diff --git a/.travis.yml b/.travis.yml index 8761f8d5..62eda9df 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,11 +10,14 @@ matrix: env: SYMFONY_VERSION=2.8.* include: - - php: 5.3 + - dist: precise + php: 5.3 env: SYMFONY_VERSION=2.3.* - - php: 5.3 + - dist: precise + php: 5.3 env: SYMFONY_VERSION=2.7.* - - php: 5.3 + - dist: precise + php: 5.3 env: SYMFONY_VERSION=2.8.* - php: 5.4 diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 54fd535f..b32d8fae 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -9,11 +9,6 @@ final class BatchConsumer extends BaseAmqp implements DequeuerInterface { - /** - * @var int - */ - private $consumed = 0; - /** * @var \Closure|callable */ @@ -71,67 +66,36 @@ public function setCallback($callback) return $this; } - public function start() + public function consume() { $this->setupConsumer(); while (count($this->getChannel()->callbacks)) { - $this->getChannel()->wait(); - } - } - - public function execute(AMQPMessage $msg) - { - $this->addMessage($msg); - - $this->maybeStopConsumer(); + if ($this->isCompleteBatch()) { + $this->batchConsume(); + } - if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { - $this->stopConsuming(); - } - } + $this->maybeStopConsumer(); - public function consume() - { - $this->setupConsumer(); + $timeout = $this->isEmptyBatch() ? $this->getIdleTimeout() : $this->getTimeoutWait(); - $isConsuming = false; - $timeoutWanted = $this->getTimeoutWait(); - while (count($this->getChannel()->callbacks)) { - $this->maybeStopConsumer(); - if (!$this->forceStop) { - try { - $this->getChannel()->wait(null, false, $timeoutWanted); - $isConsuming = true; - } catch (AMQPTimeoutException $e) { + try { + $this->getChannel()->wait(null, false, $timeout); + } catch (AMQPTimeoutException $e) { + if (!$this->isEmptyBatch()) { $this->batchConsume(); - if ($isConsuming) { - $isConsuming = false; - } elseif (null !== $this->getIdleTimeoutExitCode()) { - return $this->getIdleTimeoutExitCode(); - } else { - throw $e; - } + } elseif (null !== $this->getIdleTimeoutExitCode()) { + return $this->getIdleTimeoutExitCode(); + } else { + throw $e; } - } else { - $this->batchConsume(); } - - if ($this->isCompleteBatch($isConsuming)) { - $this->batchConsume(); - } - - $timeoutWanted = $isConsuming ? $this->getTimeoutWait() : $this->getIdleTimeout(); } } - public function batchConsume() + private function batchConsume() { - if ($this->batchCounter === 0) { - return; - } - - try { + try { $processFlags = call_user_func($this->callback, $this->messages); $this->handleProcessMessages($processFlags); $this->logger->debug('Queue message processed', array( @@ -149,6 +113,7 @@ public function batchConsume() 'stacktrace' => $e->getTraceAsString() ) )); + $this->resetBatch(); $this->stopConsuming(); } catch (\Exception $e) { $this->logger->error($e->getMessage(), array( @@ -186,13 +151,6 @@ protected function handleProcessMessages($processFlags = null) foreach ($processFlags as $deliveryTag => $processFlag) { $this->handleProcessFlag($deliveryTag, $processFlag); } - - $this->consumed++; - $this->maybeStopConsumer(); - - if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { - $this->stopConsuming(); - } } /** @@ -201,7 +159,7 @@ protected function handleProcessMessages($processFlags = null) * * @return void */ - private function handleProcessFlag ($deliveryTag, $processFlag) + private function handleProcessFlag($deliveryTag, $processFlag) { if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { // Reject and requeue message to RabbitMQ @@ -219,13 +177,19 @@ private function handleProcessFlag ($deliveryTag, $processFlag) } /** - * @param bool $isConsuming - * * @return bool */ - protected function isCompleteBatch($isConsuming) + protected function isCompleteBatch() { - return $isConsuming && $this->batchCounter === $this->prefetchCount; + return $this->batchCounter === $this->prefetchCount; + } + + /** + * @return bool + */ + protected function isEmptyBatch() + { + return $this->batchCounter === 0; } /** @@ -238,40 +202,9 @@ protected function isCompleteBatch($isConsuming) */ public function processMessage(AMQPMessage $msg) { - try { - call_user_func(array($this, 'execute'), $msg); - } catch (Exception\StopConsumerException $e) { - $this->logger->info('Consumer requested restart', array( - 'amqp' => array( - 'queue' => $this->queueOptions['name'], - 'message' => $msg, - 'stacktrace' => $e->getTraceAsString() - ) - )); - $this->stopConsuming(); - } catch (\Exception $e) { - $this->logger->error($e->getMessage(), array( - 'amqp' => array( - 'queue' => $this->queueOptions['name'], - 'message' => $msg, - 'stacktrace' => $e->getTraceAsString() - ) - )); - $this->batchConsume(); - - throw $e; - } catch (\Error $e) { - $this->logger->error($e->getMessage(), array( - 'amqp' => array( - 'queue' => $this->queueOptions['name'], - 'message' => $msg, - 'stacktrace' => $e->getTraceAsString() - ) - )); - $this->batchConsume(); + $this->addMessage($msg); - throw $e; - } + $this->maybeStopConsumer(); } /** @@ -355,7 +288,9 @@ private function getMessageChannel($deliveryTag) */ public function stopConsuming() { - $this->batchConsume(); + if (!$this->isEmptyBatch()) { + $this->batchConsume(); + } $this->getChannel()->basic_cancel($this->getConsumerTag()); } @@ -390,6 +325,10 @@ protected function maybeStopConsumer() if ($this->forceStop) { $this->stopConsuming(); } + + if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { + $this->stopConsuming(); + } } /**