From bd51857931feb6689edf97d5603588ac81436012 Mon Sep 17 00:00:00 2001 From: Nicola Turato Date: Thu, 28 Sep 2017 13:49:18 +0100 Subject: [PATCH 1/4] moved memory checks into maybeStopConsumer --- RabbitMq/BatchConsumer.php | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 54fd535f..00c20a3d 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -85,10 +85,6 @@ public function execute(AMQPMessage $msg) $this->addMessage($msg); $this->maybeStopConsumer(); - - if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { - $this->stopConsuming(); - } } public function consume() @@ -189,10 +185,6 @@ protected function handleProcessMessages($processFlags = null) $this->consumed++; $this->maybeStopConsumer(); - - if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { - $this->stopConsuming(); - } } /** @@ -390,6 +382,10 @@ protected function maybeStopConsumer() if ($this->forceStop) { $this->stopConsuming(); } + + if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { + $this->stopConsuming(); + } } /** From 758956cfa5a1664d89dc4417349cb436fc8c6642 Mon Sep 17 00:00:00 2001 From: Nicola Turato Date: Thu, 28 Sep 2017 14:00:42 +0100 Subject: [PATCH 2/4] removed "dead" methods and unnecessary exception handling - start only pliles messages until out of memory. - no exception are thrown by addMessage or maybeStopConsumer --- RabbitMq/BatchConsumer.php | 51 ++------------------------------------ 1 file changed, 2 insertions(+), 49 deletions(-) diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 00c20a3d..e64fbec6 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -71,22 +71,6 @@ public function setCallback($callback) return $this; } - public function start() - { - $this->setupConsumer(); - - while (count($this->getChannel()->callbacks)) { - $this->getChannel()->wait(); - } - } - - public function execute(AMQPMessage $msg) - { - $this->addMessage($msg); - - $this->maybeStopConsumer(); - } - public function consume() { $this->setupConsumer(); @@ -230,40 +214,9 @@ protected function isCompleteBatch($isConsuming) */ public function processMessage(AMQPMessage $msg) { - try { - call_user_func(array($this, 'execute'), $msg); - } catch (Exception\StopConsumerException $e) { - $this->logger->info('Consumer requested restart', array( - 'amqp' => array( - 'queue' => $this->queueOptions['name'], - 'message' => $msg, - 'stacktrace' => $e->getTraceAsString() - ) - )); - $this->stopConsuming(); - } catch (\Exception $e) { - $this->logger->error($e->getMessage(), array( - 'amqp' => array( - 'queue' => $this->queueOptions['name'], - 'message' => $msg, - 'stacktrace' => $e->getTraceAsString() - ) - )); - $this->batchConsume(); - - throw $e; - } catch (\Error $e) { - $this->logger->error($e->getMessage(), array( - 'amqp' => array( - 'queue' => $this->queueOptions['name'], - 'message' => $msg, - 'stacktrace' => $e->getTraceAsString() - ) - )); - $this->batchConsume(); + $this->addMessage($msg); - throw $e; - } + $this->maybeStopConsumer(); } /** From 7f4aeb6df3e33edc7805fadc10ac24a3858a49d0 Mon Sep 17 00:00:00 2001 From: Nicola Turato Date: Thu, 28 Sep 2017 14:04:14 +0100 Subject: [PATCH 3/4] prevent process to loop when stopConsuming is called --- RabbitMq/BatchConsumer.php | 74 +++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index e64fbec6..b32d8fae 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -9,11 +9,6 @@ final class BatchConsumer extends BaseAmqp implements DequeuerInterface { - /** - * @var int - */ - private $consumed = 0; - /** * @var \Closure|callable */ @@ -75,43 +70,32 @@ public function consume() { $this->setupConsumer(); - $isConsuming = false; - $timeoutWanted = $this->getTimeoutWait(); while (count($this->getChannel()->callbacks)) { - $this->maybeStopConsumer(); - if (!$this->forceStop) { - try { - $this->getChannel()->wait(null, false, $timeoutWanted); - $isConsuming = true; - } catch (AMQPTimeoutException $e) { - $this->batchConsume(); - if ($isConsuming) { - $isConsuming = false; - } elseif (null !== $this->getIdleTimeoutExitCode()) { - return $this->getIdleTimeoutExitCode(); - } else { - throw $e; - } - } - } else { + if ($this->isCompleteBatch()) { $this->batchConsume(); } - if ($this->isCompleteBatch($isConsuming)) { - $this->batchConsume(); - } + $this->maybeStopConsumer(); + + $timeout = $this->isEmptyBatch() ? $this->getIdleTimeout() : $this->getTimeoutWait(); - $timeoutWanted = $isConsuming ? $this->getTimeoutWait() : $this->getIdleTimeout(); + try { + $this->getChannel()->wait(null, false, $timeout); + } catch (AMQPTimeoutException $e) { + if (!$this->isEmptyBatch()) { + $this->batchConsume(); + } elseif (null !== $this->getIdleTimeoutExitCode()) { + return $this->getIdleTimeoutExitCode(); + } else { + throw $e; + } + } } } - public function batchConsume() + private function batchConsume() { - if ($this->batchCounter === 0) { - return; - } - - try { + try { $processFlags = call_user_func($this->callback, $this->messages); $this->handleProcessMessages($processFlags); $this->logger->debug('Queue message processed', array( @@ -129,6 +113,7 @@ public function batchConsume() 'stacktrace' => $e->getTraceAsString() ) )); + $this->resetBatch(); $this->stopConsuming(); } catch (\Exception $e) { $this->logger->error($e->getMessage(), array( @@ -166,9 +151,6 @@ protected function handleProcessMessages($processFlags = null) foreach ($processFlags as $deliveryTag => $processFlag) { $this->handleProcessFlag($deliveryTag, $processFlag); } - - $this->consumed++; - $this->maybeStopConsumer(); } /** @@ -177,7 +159,7 @@ protected function handleProcessMessages($processFlags = null) * * @return void */ - private function handleProcessFlag ($deliveryTag, $processFlag) + private function handleProcessFlag($deliveryTag, $processFlag) { if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { // Reject and requeue message to RabbitMQ @@ -195,13 +177,19 @@ private function handleProcessFlag ($deliveryTag, $processFlag) } /** - * @param bool $isConsuming - * * @return bool */ - protected function isCompleteBatch($isConsuming) + protected function isCompleteBatch() { - return $isConsuming && $this->batchCounter === $this->prefetchCount; + return $this->batchCounter === $this->prefetchCount; + } + + /** + * @return bool + */ + protected function isEmptyBatch() + { + return $this->batchCounter === 0; } /** @@ -300,7 +288,9 @@ private function getMessageChannel($deliveryTag) */ public function stopConsuming() { - $this->batchConsume(); + if (!$this->isEmptyBatch()) { + $this->batchConsume(); + } $this->getChannel()->basic_cancel($this->getConsumerTag()); } From 87198f7dea306ecccf7781cfe67cc49881ab0030 Mon Sep 17 00:00:00 2001 From: nturato Date: Thu, 28 Sep 2017 15:27:55 +0100 Subject: [PATCH 4/4] Update .travis.yml --- .travis.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8761f8d5..62eda9df 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,11 +10,14 @@ matrix: env: SYMFONY_VERSION=2.8.* include: - - php: 5.3 + - dist: precise + php: 5.3 env: SYMFONY_VERSION=2.3.* - - php: 5.3 + - dist: precise + php: 5.3 env: SYMFONY_VERSION=2.7.* - - php: 5.3 + - dist: precise + php: 5.3 env: SYMFONY_VERSION=2.8.* - php: 5.4