Skip to content

Commit f367bae

Browse files
committed
GH-1338: Polishing
- Add a default ack listener - Use lambdas in tests - NOSONAR tags
1 parent b5f85b0 commit f367bae

File tree

8 files changed

+76
-82
lines changed

8 files changed

+76
-82
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,10 @@ public void setGlobalQos(boolean globalQos) {
327327
}
328328

329329
/**
330-
* Set a {@link MessageAckListener} to use when ack a message(messages) in {@link AcknowledgeMode#AUTO} mode.
330+
* Set a {@link MessageAckListener} to use when ack a message(messages) in
331+
* {@link AcknowledgeMode#AUTO} mode.
331332
* @param messageAckListener the messageAckListener.
332-
* @see AbstractMessageListenerContainer#setMessageAckListener(MessageAckListener)
333+
* @since 2.4.6
333334
*/
334335
public void setMessageAckListener(MessageAckListener messageAckListener) {
335336
this.messageAckListener = messageAckListener;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
252252

253253
private boolean asyncReplies;
254254

255-
private MessageAckListener messageAckListener;
255+
private MessageAckListener messageAckListener = (success, deliveryTag, cause) -> { };
256256

257257
@Override
258258
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
@@ -1191,8 +1191,10 @@ public void setjavaLangErrorHandler(JavaLangErrorHandler javaLangErrorHandler) {
11911191
}
11921192

11931193
/**
1194-
* Set a {@link MessageAckListener} to use when ack a message(messages) in {@link AcknowledgeMode#AUTO} mode.
1194+
* Set a {@link MessageAckListener} to use when ack a message(messages) in
1195+
* {@link AcknowledgeMode#AUTO} mode.
11951196
* @param messageAckListener the messageAckListener.
1197+
* @since 2.4.6
11961198
* @see MessageAckListener
11971199
* @see AcknowledgeMode
11981200
*/

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.springframework.context.ApplicationEventPublisher;
6666
import org.springframework.lang.Nullable;
6767
import org.springframework.transaction.support.TransactionSynchronizationManager;
68+
import org.springframework.util.Assert;
6869
import org.springframework.util.ObjectUtils;
6970
import org.springframework.util.backoff.BackOffExecution;
7071

@@ -173,6 +174,8 @@ public class BlockingQueueConsumer {
173174

174175
volatile boolean declaring; // NOSONAR package protected
175176

177+
private MessageAckListener messageAckListener;
178+
176179
/**
177180
* Create a consumer. The consumer must not attempt to use
178181
* the connection factory or communicate with the broker
@@ -189,6 +192,7 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
189192
MessagePropertiesConverter messagePropertiesConverter,
190193
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
191194
boolean transactional, int prefetchCount, String... queues) {
195+
192196
this(connectionFactory, messagePropertiesConverter, activeObjectCounter,
193197
acknowledgeMode, transactional, prefetchCount, true, queues);
194198
}
@@ -399,6 +403,17 @@ public void setConsumeDelay(long consumeDelay) {
399403
this.consumeDelay = consumeDelay;
400404
}
401405

406+
/**
407+
* Set a {@link MessageAckListener} to use when ack a message(messages) in
408+
* {@link AcknowledgeMode#AUTO} mode.
409+
* @param messageAckListener the messageAckListener.
410+
* @since 2.4.6
411+
*/
412+
public void setMessageAckListener(MessageAckListener messageAckListener) {
413+
Assert.notNull(messageAckListener, "'messageAckListener' cannot be null");
414+
this.messageAckListener = messageAckListener;
415+
}
416+
402417
/**
403418
* Clear the delivery tags when rolling back with an external transaction
404419
* manager.
@@ -841,7 +856,7 @@ public void rollbackOnExceptionIfNecessary(Throwable ex, long tag) {
841856
* @return true if at least one delivery tag exists.
842857
* @throws IOException Any IOException.
843858
*/
844-
public boolean commitIfNecessary(boolean localTx, MessageAckListener messageAckListener) throws IOException {
859+
public boolean commitIfNecessary(boolean localTx) throws IOException {
845860
if (this.deliveryTags.isEmpty()) {
846861
return false;
847862
}
@@ -860,11 +875,11 @@ public boolean commitIfNecessary(boolean localTx, MessageAckListener messageAckL
860875
long deliveryTag = new ArrayList<Long>(this.deliveryTags).get(this.deliveryTags.size() - 1);
861876
try {
862877
this.channel.basicAck(deliveryTag, true);
863-
notifyMessageAckListener(messageAckListener, true, deliveryTag, null);
878+
notifyMessageAckListener(true, deliveryTag, null);
864879
}
865880
catch (Exception e) {
866881
logger.error("Error acking.", e);
867-
notifyMessageAckListener(messageAckListener, false, deliveryTag, e);
882+
notifyMessageAckListener(false, deliveryTag, e);
868883
}
869884
}
870885

@@ -883,24 +898,18 @@ public boolean commitIfNecessary(boolean localTx, MessageAckListener messageAckL
883898
}
884899

885900
/**
886-
* Notify MessageAckListener set on the relevant message listener.
887-
* @param messageAckListener MessageAckListener set on the message listener.
901+
* Notify MessageAckListener set on message listener.
888902
* @param success Whether ack succeeded.
889903
* @param deliveryTag The deliveryTag of ack.
890904
* @param cause If an exception occurs.
905+
* @since 2.4.6
891906
*/
892-
private void notifyMessageAckListener(@Nullable MessageAckListener messageAckListener,
893-
boolean success,
894-
long deliveryTag,
895-
@Nullable Throwable cause) {
896-
if (messageAckListener == null) {
897-
return;
898-
}
907+
private void notifyMessageAckListener(boolean success, long deliveryTag, @Nullable Throwable cause) {
899908
try {
900-
messageAckListener.onComplete(success, deliveryTag, cause);
909+
this.messageAckListener.onComplete(success, deliveryTag, cause);
901910
}
902911
catch (Exception e) {
903-
logger.error("An exception occured on MessageAckListener.", e);
912+
logger.error("An exception occured in MessageAckListener.", e);
904913
}
905914
}
906915

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,7 +1195,7 @@ else if (!isChannelTransacted() || isLocallyTransacted) {
11951195
* @param now the current time.
11961196
* @throws IOException if one occurs.
11971197
*/
1198-
synchronized void ackIfNecessary(long now) throws Exception {
1198+
synchronized void ackIfNecessary(long now) throws Exception { // NOSONAR
11991199
if (this.pendingAcks >= this.messagesPerAck || (
12001200
this.pendingAcks > 0 && (now - this.lastAck > this.ackTimeout || this.canceled))) {
12011201
sendAck(now);
@@ -1227,7 +1227,7 @@ private void rollback(long deliveryTag, Exception e) {
12271227
}
12281228
}
12291229

1230-
protected synchronized void sendAck(long now) throws Exception {
1230+
protected synchronized void sendAck(long now) throws Exception { // NOSONAR
12311231
sendAckWithNotify(this.latestDeferredDeliveryTag, true);
12321232
this.lastAck = now;
12331233
this.pendingAcks = 0;
@@ -1238,14 +1238,15 @@ protected synchronized void sendAck(long now) throws Exception {
12381238
* @param deliveryTag DeliveryTag of this ack.
12391239
* @param multiple Whether multiple ack.
12401240
* @throws Exception Occured when ack.
1241+
* @Since 2.4.6
12411242
*/
1242-
private void sendAckWithNotify(long deliveryTag, boolean multiple) throws Exception {
1243+
private void sendAckWithNotify(long deliveryTag, boolean multiple) throws Exception { // NOSONAR
12431244
try {
12441245
getChannel().basicAck(deliveryTag, multiple);
1245-
notifyMessageAckListener(getMessageAckListener(), true, deliveryTag, null);
1246+
notifyMessageAckListener(true, deliveryTag, null);
12461247
}
12471248
catch (Exception e) {
1248-
notifyMessageAckListener(getMessageAckListener(), false, deliveryTag, e);
1249+
notifyMessageAckListener(false, deliveryTag, e);
12491250
throw e;
12501251
}
12511252
}
@@ -1256,16 +1257,11 @@ private void sendAckWithNotify(long deliveryTag, boolean multiple) throws Except
12561257
* @param success Whether ack succeeded.
12571258
* @param deliveryTag The deliveryTag of ack.
12581259
* @param cause If an exception occurs.
1260+
* @since 2.4.6
12591261
*/
1260-
private void notifyMessageAckListener(@Nullable MessageAckListener messageAckListener,
1261-
boolean success,
1262-
long deliveryTag,
1263-
@Nullable Throwable cause) {
1264-
if (messageAckListener == null) {
1265-
return;
1266-
}
1262+
private void notifyMessageAckListener(boolean success, long deliveryTag, @Nullable Throwable cause) {
12671263
try {
1268-
messageAckListener.onComplete(success, deliveryTag, cause);
1264+
getMessageAckListener().onComplete(success, deliveryTag, cause);
12691265
}
12701266
catch (Exception e) {
12711267
this.logger.error("An exception occured on MessageAckListener.", e);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/MessageAckListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* A listener for message ack when using {@link AcknowledgeMode#AUTO}.
2424
*
2525
* @author Cao Weibo
26-
* @see AbstractMessageListenerContainer#setMessageAckListener(MessageAckListener)
26+
* @since 2.4.6
2727
*/
2828
@FunctionalInterface
2929
public interface MessageAckListener {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,7 @@ this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefet
865865
consumer.setBackOffExecution(getRecoveryBackOff().start());
866866
consumer.setShutdownTimeout(getShutdownTimeout());
867867
consumer.setApplicationEventPublisher(getApplicationEventPublisher());
868+
consumer.setMessageAckListener(getMessageAckListener());
868869
return consumer;
869870
}
870871

@@ -1044,7 +1045,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
10441045
executeWithList(channel, messages, deliveryTag, consumer);
10451046
}
10461047

1047-
return consumer.commitIfNecessary(isChannelLocallyTransacted(), getMessageAckListener());
1048+
return consumer.commitIfNecessary(isChannelLocallyTransacted());
10481049

10491050
}
10501051

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -739,15 +739,12 @@ public void testMessageAckListenerWithSuccessfulAck() throws Exception {
739739
public void onMessage(Message message) {
740740
}
741741
});
742-
container.setMessageAckListener(new MessageAckListener() {
743-
@Override
744-
public void onComplete(boolean success, long deliveryTag, Throwable cause) throws Exception {
745-
calledTimes.incrementAndGet();
746-
ackDeliveryTag.set(deliveryTag);
747-
ackSuccess.set(success);
748-
ackCause.set(cause);
749-
latch.countDown();
750-
}
742+
container.setMessageAckListener((success, deliveryTag, cause) -> {
743+
calledTimes.incrementAndGet();
744+
ackDeliveryTag.set(deliveryTag);
745+
ackSuccess.set(success);
746+
ackCause.set(cause);
747+
latch.countDown();
751748
});
752749
container.start();
753750
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
@@ -779,15 +776,12 @@ public void onMessage(Message message) {
779776
cf.resetConnection();
780777
}
781778
});
782-
container.setMessageAckListener(new MessageAckListener() {
783-
@Override
784-
public void onComplete(boolean success, long deliveryTag, Throwable cause) throws Exception {
785-
called.set(true);
786-
ackDeliveryTag.set(deliveryTag);
787-
ackSuccess.set(success);
788-
ackCause.set(cause);
789-
latch.countDown();
790-
}
779+
container.setMessageAckListener((success, deliveryTag, cause) -> {
780+
called.set(true);
781+
ackDeliveryTag.set(deliveryTag);
782+
ackSuccess.set(success);
783+
ackCause.set(cause);
784+
latch.countDown();
791785
});
792786
container.start();
793787
new RabbitTemplate(cf).convertAndSend(Q1, "foo");
@@ -813,15 +807,12 @@ public void testMessageAckListenerWithBatchAck() throws Exception {
813807
container.setMessagesPerAck(messageCount);
814808
container.setMessageListener(message -> {
815809
});
816-
container.setMessageAckListener(new MessageAckListener() {
817-
@Override
818-
public void onComplete(boolean success, long deliveryTag, Throwable cause) throws Exception {
819-
calledTimes.incrementAndGet();
820-
ackDeliveryTag.set(deliveryTag);
821-
ackSuccess.set(success);
822-
ackCause.set(cause);
823-
latch.countDown();
824-
}
810+
container.setMessageAckListener((success, deliveryTag, cause) -> {
811+
calledTimes.incrementAndGet();
812+
ackDeliveryTag.set(deliveryTag);
813+
ackSuccess.set(success);
814+
ackCause.set(cause);
815+
latch.countDown();
825816
});
826817
container.start();
827818
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -693,18 +693,15 @@ public void testMessageAckListenerWithSuccessfulAck() throws Exception {
693693
this.container = createContainer((ChannelAwareMessageListener) (m, c) -> {
694694
}, false, this.queue.getName());
695695
this.container.setAcknowledgeMode(AcknowledgeMode.AUTO);
696+
this.container.setMessageAckListener((success, deliveryTag, cause) -> {
697+
calledTimes.incrementAndGet();
698+
ackDeliveryTag.set(deliveryTag);
699+
ackSuccess.set(success);
700+
ackCause.set(cause);
701+
latch.countDown();
702+
});
696703
this.container.afterPropertiesSet();
697704
this.container.start();
698-
this.container.setMessageAckListener(new MessageAckListener() {
699-
@Override
700-
public void onComplete(boolean success, long deliveryTag, Throwable cause) throws Exception {
701-
calledTimes.incrementAndGet();
702-
ackDeliveryTag.set(deliveryTag);
703-
ackSuccess.set(success);
704-
ackCause.set(cause);
705-
latch.countDown();
706-
}
707-
});
708705
int messageCount = 5;
709706
for (int i = 0; i < messageCount; i++) {
710707
this.template.convertAndSend(this.queue.getName(), "foo");
@@ -729,18 +726,15 @@ public void testMessageAckListenerWithBatchAck() throws Exception {
729726
this.container.setBatchSize(5);
730727
this.container.setConsumerBatchEnabled(true);
731728
this.container.setAcknowledgeMode(AcknowledgeMode.AUTO);
729+
this.container.setMessageAckListener((success, deliveryTag, cause) -> {
730+
calledTimes.incrementAndGet();
731+
ackDeliveryTag.set(deliveryTag);
732+
ackSuccess.set(success);
733+
ackCause.set(cause);
734+
latch.countDown();
735+
});
732736
this.container.afterPropertiesSet();
733737
this.container.start();
734-
this.container.setMessageAckListener(new MessageAckListener() {
735-
@Override
736-
public void onComplete(boolean success, long deliveryTag, Throwable cause) throws Exception {
737-
calledTimes.incrementAndGet();
738-
ackDeliveryTag.set(deliveryTag);
739-
ackSuccess.set(success);
740-
ackCause.set(cause);
741-
latch.countDown();
742-
}
743-
});
744738
int messageCount = 5;
745739
for (int i = 0; i < messageCount; i++) {
746740
this.template.convertAndSend(this.queue.getName(), "foo");

0 commit comments

Comments
 (0)