Skip to content

Commit 2dc9c56

Browse files
artembilanspring-builds
authored andcommitted
GH-3032: Fix BlockingQueueConsumer for in-flight draining
Fixes: #3032 Issue link: #3032 The fix for #2941 has missed "in-flight draining" for non-transactional consumers. (cherry picked from commit 993e94a)
1 parent 2029e77 commit 2dc9c56

File tree

1 file changed

+22
-16
lines changed

1 file changed

+22
-16
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -536,10 +536,7 @@ private Message handle(@Nullable Delivery delivery) {
536536
*/
537537
@Nullable
538538
public Message nextMessage() throws InterruptedException, ShutdownSignalException {
539-
if (logger.isTraceEnabled()) {
540-
logger.trace("Retrieving delivery for " + this);
541-
}
542-
return handle(this.queue.take());
539+
return nextMessage(-1);
543540
}
544541

545542
/**
@@ -558,26 +555,35 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
558555
if (!this.missingQueues.isEmpty()) {
559556
checkMissingQueues();
560557
}
561-
if (!cancelled()) {
562-
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
563-
if (message != null && cancelled()) {
564-
this.activeObjectCounter.release(this);
565-
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
566-
rollbackOnExceptionIfNecessary(consumerCancelledException,
567-
message.getMessageProperties().getDeliveryTag());
568-
throw consumerCancelledException;
558+
559+
if (this.transactional && cancelled()) {
560+
throw consumerCancelledException(null);
561+
}
562+
else {
563+
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
564+
if (cancelled() && (message == null || this.transactional)) {
565+
Long deliveryTagToNack = null;
566+
if (message != null) {
567+
deliveryTagToNack = message.getMessageProperties().getDeliveryTag();
568+
}
569+
throw consumerCancelledException(deliveryTagToNack);
569570
}
570571
else {
571572
return message;
572573
}
573574
}
575+
}
576+
577+
private ConsumerCancelledException consumerCancelledException(@Nullable Long deliveryTagToNack) {
578+
this.activeObjectCounter.release(this);
579+
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
580+
if (deliveryTagToNack != null) {
581+
rollbackOnExceptionIfNecessary(consumerCancelledException, deliveryTagToNack);
582+
}
574583
else {
575584
this.deliveryTags.clear();
576-
this.activeObjectCounter.release(this);
577-
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
578-
rollbackOnExceptionIfNecessary(consumerCancelledException);
579-
throw consumerCancelledException;
580585
}
586+
return consumerCancelledException;
581587
}
582588

583589
/*

0 commit comments

Comments
 (0)