|
2 | 2 |
|
3 | 3 | namespace Frosh\Tools\Components\Elasticsearch; |
4 | 4 |
|
| 5 | +use Doctrine\DBAL\Connection; |
5 | 6 | use Elasticsearch\Client; |
| 7 | +use Shopware\Core\Framework\Increment\Exception\IncrementGatewayNotFoundException; |
| 8 | +use Shopware\Core\Framework\Increment\IncrementGatewayRegistry; |
| 9 | +use Shopware\Elasticsearch\Framework\ElasticsearchOutdatedIndexDetector; |
| 10 | +use Shopware\Elasticsearch\Framework\Indexing\CreateAliasTaskHandler; |
| 11 | +use Shopware\Elasticsearch\Framework\Indexing\ElasticsearchIndexer; |
| 12 | +use Shopware\Elasticsearch\Framework\Indexing\ElasticsearchIndexingMessage; |
| 13 | +use Symfony\Component\Messenger\MessageBusInterface; |
6 | 14 |
|
7 | 15 | class ElasticsearchManager |
8 | 16 | { |
| 17 | + protected ElasticsearchIndexer $indexer; |
| 18 | + protected MessageBusInterface $messageBus; |
| 19 | + protected CreateAliasTaskHandler $createAliasTaskHandler; |
| 20 | + protected ElasticsearchOutdatedIndexDetector $outdatedIndexDetector; |
| 21 | + protected Connection $connection; |
| 22 | + protected IncrementGatewayRegistry $gatewayRegistry; |
9 | 23 | private Client $client; |
10 | 24 | private bool $enabled; |
11 | 25 |
|
12 | | - public function __construct(Client $client, bool $enabled) |
13 | | - { |
| 26 | + public function __construct( |
| 27 | + Client $client, |
| 28 | + bool $enabled, |
| 29 | + ElasticsearchIndexer $indexer, |
| 30 | + MessageBusInterface $messageBus, |
| 31 | + CreateAliasTaskHandler $createAliasTaskHandler, |
| 32 | + ElasticsearchOutdatedIndexDetector $outdatedIndexDetector, |
| 33 | + Connection $connection, |
| 34 | + IncrementGatewayRegistry $gatewayRegistry |
| 35 | + ) { |
14 | 36 | $this->client = $client; |
15 | 37 | $this->enabled = $enabled; |
| 38 | + $this->indexer = $indexer; |
| 39 | + $this->messageBus = $messageBus; |
| 40 | + $this->createAliasTaskHandler = $createAliasTaskHandler; |
| 41 | + $this->outdatedIndexDetector = $outdatedIndexDetector; |
| 42 | + $this->connection = $connection; |
| 43 | + $this->gatewayRegistry = $gatewayRegistry; |
16 | 44 | } |
17 | 45 |
|
18 | 46 | public function isEnabled(): bool |
@@ -53,4 +81,59 @@ public function deleteIndex(string $name): array |
53 | 81 | { |
54 | 82 | return $this->client->indices()->delete(['index' => $name]); |
55 | 83 | } |
| 84 | + |
| 85 | + public function proxy(string $method, string $path, array $params, array $body): array |
| 86 | + { |
| 87 | + $response = $this->client->transport->performRequest($method, $path, $params, $body); |
| 88 | + |
| 89 | + return $this->client->transport->resultOrFuture($response); |
| 90 | + } |
| 91 | + |
| 92 | + public function flushAll(): void |
| 93 | + { |
| 94 | + $this->client->indices()->flushSynced(); |
| 95 | + } |
| 96 | + |
| 97 | + public function reindex(): void |
| 98 | + { |
| 99 | + $offset = null; |
| 100 | + while ($message = $this->indexer->iterate($offset)) { |
| 101 | + $this->messageBus->dispatch($message); |
| 102 | + $offset = $message->getOffset(); |
| 103 | + } |
| 104 | + } |
| 105 | + |
| 106 | + public function switchAlias(): void |
| 107 | + { |
| 108 | + $this->createAliasTaskHandler->run(); |
| 109 | + } |
| 110 | + |
| 111 | + public function deleteUnusedIndices(): void |
| 112 | + { |
| 113 | + $indices = $this->outdatedIndexDetector->get(); |
| 114 | + |
| 115 | + foreach ($indices as $index) { |
| 116 | + $this->client->indices()->delete(['index' => $index]); |
| 117 | + } |
| 118 | + } |
| 119 | + |
| 120 | + public function reset(): void |
| 121 | + { |
| 122 | + $indices = $this->outdatedIndexDetector->getAllUsedIndices(); |
| 123 | + |
| 124 | + foreach ($indices as $index) { |
| 125 | + $this->client->indices()->delete(['index' => $index]); |
| 126 | + } |
| 127 | + |
| 128 | + $this->connection->executeStatement('TRUNCATE elasticsearch_index_task'); |
| 129 | + |
| 130 | + try { |
| 131 | + $gateway = $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL); |
| 132 | + $gateway->reset('message_queue_stats', ElasticsearchIndexingMessage::class); |
| 133 | + } catch (IncrementGatewayNotFoundException $exception) { |
| 134 | + // In case message_queue pool is disabled |
| 135 | + } |
| 136 | + |
| 137 | + $this->connection->executeStatement('DELETE FROM enqueue WHERE body LIKE "%ElasticsearchIndexingMessage%"'); |
| 138 | + } |
56 | 139 | } |
0 commit comments