Skip to content

Commit b25ae82

Browse files
artembilanspring-builds
authored andcommitted
GH-3039: Move Recovery in BlockingQueueConsumer into stop()
Fixes: #3039 Issue link: #3039 Currently, the `BlockingQueueConsumer` initiates a Basic Recovery command on the channel for transactional consumer immediately after Basic Cancel. However, it is possible still to try to handle in-flight messages during `shutdownTimeout` in the listener container * Leave only Basic Cancel command in the `BlockingQueueConsumer.basicCancel()` API * Revert `BlockingQueueConsumer.nextMessage(timeout)` method logic to normal loop until message pulled from the in-memory cache is `null` * Call `basicCancel(true)` from the `stop()` is not cancelled yet * Perform `channel.basicRecover()` for transactional channel in the `stop()`. This `stop()` is usually called from the listener container when in-flight messages have not been processed during `shutdownTimeout` (cherry picked from commit 14fe215)
1 parent 2dc9c56 commit b25ae82

File tree

1 file changed

+26
-40
lines changed

1 file changed

+26
-40
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -461,18 +461,19 @@ int getQueueCount() {
461461
}
462462

463463
protected void basicCancel() {
464-
basicCancel(false);
464+
basicCancel(true);
465465
}
466466

467467
protected void basicCancel(boolean expected) {
468468
this.normalCancel = expected;
469+
getConsumerTags()
470+
.forEach(consumerTag -> {
471+
if (this.channel.isOpen()) {
472+
RabbitUtils.cancel(this.channel, consumerTag);
473+
}
474+
});
469475
this.cancelled.set(true);
470476
this.abortStarted = System.currentTimeMillis();
471-
472-
Collection<String> consumerTags = getConsumerTags();
473-
if (!CollectionUtils.isEmpty(consumerTags)) {
474-
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
475-
}
476477
}
477478

478479
protected boolean hasDelivery() {
@@ -555,35 +556,12 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
555556
if (!this.missingQueues.isEmpty()) {
556557
checkMissingQueues();
557558
}
558-
559-
if (this.transactional && cancelled()) {
560-
throw consumerCancelledException(null);
561-
}
562-
else {
563-
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
564-
if (cancelled() && (message == null || this.transactional)) {
565-
Long deliveryTagToNack = null;
566-
if (message != null) {
567-
deliveryTagToNack = message.getMessageProperties().getDeliveryTag();
568-
}
569-
throw consumerCancelledException(deliveryTagToNack);
570-
}
571-
else {
572-
return message;
573-
}
574-
}
575-
}
576-
577-
private ConsumerCancelledException consumerCancelledException(@Nullable Long deliveryTagToNack) {
578-
this.activeObjectCounter.release(this);
579-
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
580-
if (deliveryTagToNack != null) {
581-
rollbackOnExceptionIfNecessary(consumerCancelledException, deliveryTagToNack);
582-
}
583-
else {
584-
this.deliveryTags.clear();
559+
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
560+
if (message == null && this.cancelled.get()) {
561+
this.activeObjectCounter.release(this);
562+
throw new ConsumerCancelledException();
585563
}
586-
return consumerCancelledException;
564+
return message;
587565
}
588566

589567
/*
@@ -809,13 +787,21 @@ public void stop() {
809787
this.abortStarted = System.currentTimeMillis();
810788
}
811789
if (!cancelled()) {
812-
try {
813-
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
790+
basicCancel(true);
791+
}
792+
try {
793+
if (this.transactional) {
794+
/*
795+
* Re-queue in-flight messages if any
796+
* (after the consumer is cancelled to prevent the broker from simply sending them back to us).
797+
* Does not require a tx.commit.
798+
*/
799+
this.channel.basicRecover(true);
814800
}
815-
catch (Exception e) {
816-
if (logger.isDebugEnabled()) {
817-
logger.debug("Error closing consumer " + this, e);
818-
}
801+
}
802+
catch (Exception e) {
803+
if (logger.isDebugEnabled()) {
804+
logger.debug("Error closing consumer " + this, e);
819805
}
820806
}
821807
if (logger.isDebugEnabled()) {

0 commit comments

Comments
 (0)