Skip to content

Commit d2e974a

Browse files
artembilangaryrussell
authored andcommitted
Fix new Sonar smells (#2768)
* Fix new Sonar smells * Fix some old Sonar smells as well * Fix Micrometer leaks in the `PollableChannel` when we register meters, but don't remove them. * * Fix NPE around `MetricsCaptor` in channels * * Fix new smells according test report * * Further Sonar smell fixes * * More smell fixes for `MessagingMethodInvokerHelper` * Remove `throws Exception` from `AbstractMessageHandler.destroy()` * * Fix complexity in the `MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny()`
1 parent f741724 commit d2e974a

File tree

33 files changed

+797
-719
lines changed

33 files changed

+797
-719
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/AbstractAmqpChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -219,7 +219,7 @@ protected void onInit() {
219219
}
220220

221221
@Override
222-
public void destroy() throws Exception {
222+
public void destroy() {
223223
if (this.connectionFactory != null) {
224224
this.connectionFactory.removeConnectionListener(this);
225225
this.initialized = false;

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/AbstractSubscribableAmqpChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ public void stop(Runnable callback) {
240240
}
241241

242242
@Override
243-
public void destroy() throws Exception {
243+
public void destroy() {
244244
super.destroy();
245245
if (this.container != null) {
246246
this.container.destroy();

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java

Lines changed: 74 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Deque;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
import org.springframework.amqp.core.AmqpAdmin;
2526
import org.springframework.amqp.core.AmqpTemplate;
@@ -31,6 +32,7 @@
3132
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
3233
import org.springframework.integration.support.management.PollableChannelManagement;
3334
import org.springframework.integration.support.management.metrics.CounterFacade;
35+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
3436
import org.springframework.lang.Nullable;
3537
import org.springframework.messaging.Message;
3638
import org.springframework.messaging.PollableChannel;
@@ -176,80 +178,43 @@ public Message<?> receive(long timeout) {
176178
return doReceive(timeout);
177179
}
178180

179-
181+
@Nullable
180182
protected Message<?> doReceive(Long timeout) {
181183
ChannelInterceptorList interceptorList = getIChannelInterceptorList();
182184
Deque<ChannelInterceptor> interceptorStack = null;
183-
boolean counted = false;
185+
AtomicBoolean counted = new AtomicBoolean();
184186
boolean countsEnabled = isCountsEnabled();
187+
boolean traceEnabled = isLoggingEnabled() && logger.isTraceEnabled();
185188
try {
186-
if (isLoggingEnabled() && logger.isTraceEnabled()) {
189+
if (traceEnabled) {
187190
logger.trace("preReceive on channel '" + this + "'");
188191
}
189192
if (interceptorList.getInterceptors().size() > 0) {
190193
interceptorStack = new ArrayDeque<>();
191-
192194
if (!interceptorList.preReceive(this, interceptorStack)) {
193195
return null;
194196
}
195197
}
196198
Object object = performReceive(timeout);
197-
Message<?> message = null;
198-
if (object == null) {
199-
if (isLoggingEnabled() && logger.isTraceEnabled()) {
200-
logger.trace("postReceive on channel '" + this + "', message is null");
201-
}
202-
}
203-
else {
204-
if (countsEnabled) {
205-
if (getMetricsCaptor() != null) {
206-
incrementReceiveCounter();
207-
}
208-
getMetrics().afterReceive();
209-
counted = true;
210-
}
211-
if (object instanceof Message<?>) {
212-
message = (Message<?>) object;
213-
}
214-
else {
215-
message = getMessageBuilderFactory()
216-
.withPayload(object)
217-
.build();
218-
}
219-
if (isLoggingEnabled() && logger.isDebugEnabled()) {
220-
logger.debug("postReceive on channel '" + this + "', message: " + message);
221-
}
222-
}
199+
Message<?> message = buildMessageFromResult(object, traceEnabled, countsEnabled ? counted : null);
223200

224-
if (interceptorStack != null) {
225-
if (message != null) {
226-
message = interceptorList.postReceive(message, this);
227-
}
228-
interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);
201+
202+
if (message != null) {
203+
message = interceptorList.postReceive(message, this);
229204
}
205+
interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);
230206
return message;
231207
}
232-
catch (RuntimeException e) {
233-
if (countsEnabled && !counted) {
234-
if (getMetricsCaptor() != null) {
235-
getMetricsCaptor().counterBuilder(RECEIVE_COUNTER_NAME)
236-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
237-
.tag("type", "channel")
238-
.tag("result", "failure")
239-
.tag("exception", e.getClass().getSimpleName())
240-
.description("Messages received")
241-
.build()
242-
.increment();
243-
}
244-
getMetrics().afterError();
245-
}
246-
if (interceptorStack != null) {
247-
interceptorList.afterReceiveCompletion(null, this, e, interceptorStack);
208+
catch (RuntimeException ex) {
209+
if (countsEnabled && !counted.get()) {
210+
incrementReceiveErrorCounter(ex);
248211
}
249-
throw e;
212+
interceptorList.afterReceiveCompletion(null, this, ex, interceptorStack);
213+
throw ex;
250214
}
251215
}
252216

217+
@Nullable
253218
protected Object performReceive(Long timeout) {
254219
if (!this.declared) {
255220
doDeclares();
@@ -289,17 +254,63 @@ protected Object performReceive(Long timeout) {
289254
}
290255
}
291256

257+
private Message<?> buildMessageFromResult(@Nullable Object object, boolean traceEnabled,
258+
@Nullable AtomicBoolean counted) {
259+
260+
Message<?> message = null;
261+
if (object != null) {
262+
if (counted != null) {
263+
incrementReceiveCounter();
264+
getMetrics().afterReceive();
265+
counted.set(true);
266+
}
267+
if (object instanceof Message<?>) {
268+
message = (Message<?>) object;
269+
}
270+
else {
271+
message = getMessageBuilderFactory()
272+
.withPayload(object)
273+
.build();
274+
}
275+
}
276+
277+
if (traceEnabled) {
278+
logger.trace("postReceive on channel '" + this
279+
+ "', message" + (message != null ? ": " + message : " is null"));
280+
}
281+
282+
return message;
283+
}
284+
292285
private void incrementReceiveCounter() {
293-
if (this.receiveCounter == null) {
294-
this.receiveCounter = getMetricsCaptor().counterBuilder(RECEIVE_COUNTER_NAME)
295-
.tag("name", getComponentName())
296-
.tag("type", "channel")
297-
.tag("result", "success")
298-
.tag("exception", "none")
299-
.description("Messages received")
300-
.build();
286+
MetricsCaptor metricsCaptor = getMetricsCaptor();
287+
if (metricsCaptor != null) {
288+
if (this.receiveCounter == null) {
289+
this.receiveCounter = buildReceiveCounter(metricsCaptor, null);
290+
}
291+
this.receiveCounter.increment();
301292
}
302-
this.receiveCounter.increment();
293+
}
294+
295+
private void incrementReceiveErrorCounter(Exception ex) {
296+
MetricsCaptor metricsCaptor = getMetricsCaptor();
297+
if (metricsCaptor != null) {
298+
buildReceiveCounter(metricsCaptor, ex).increment();
299+
}
300+
getMetrics().afterError();
301+
}
302+
303+
private CounterFacade buildReceiveCounter(MetricsCaptor metricsCaptor, @Nullable Exception ex) {
304+
CounterFacade counterFacade = metricsCaptor
305+
.counterBuilder(RECEIVE_COUNTER_NAME)
306+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
307+
.tag("type", "channel")
308+
.tag("result", ex == null ? "success" : "failure")
309+
.tag("exception", ex == null ? "none" : ex.getClass().getSimpleName())
310+
.description("Messages received")
311+
.build();
312+
this.meters.add(counterFacade);
313+
return counterFacade;
303314
}
304315

305316

@@ -339,6 +350,7 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) {
339350
}
340351

341352
@Override
353+
@Nullable
342354
public ChannelInterceptor removeInterceptor(int index) {
343355
ChannelInterceptor interceptor = super.removeInterceptor(index);
344356
if (interceptor instanceof ExecutorChannelInterceptor) {
@@ -353,7 +365,7 @@ public boolean hasExecutorInterceptors() {
353365
}
354366

355367
@Override
356-
public void destroy() throws Exception {
368+
public void destroy() {
357369
super.destroy();
358370
if (this.receiveCounter != null) {
359371
this.receiveCounter.remove();

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 57 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -386,13 +386,10 @@ protected ReleaseStrategy getReleaseStrategy() {
386386

387387
@Override
388388
public MessageChannel getDiscardChannel() {
389-
if (this.discardChannelName != null) {
390-
synchronized (this) {
391-
if (this.discardChannelName != null) {
392-
this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName);
393-
this.discardChannelName = null;
394-
}
395-
}
389+
String channelName = this.discardChannelName;
390+
if (channelName != null) {
391+
this.discardChannel = getChannelResolver().resolveDestination(channelName);
392+
this.discardChannelName = null;
396393
}
397394
return this.discardChannel;
398395
}
@@ -449,52 +446,62 @@ protected void handleMessageInternal(Message<?> message) throws InterruptedExcep
449446
boolean noOutput = true;
450447
lock.lockInterruptibly();
451448
try {
452-
ScheduledFuture<?> scheduledFuture = this.expireGroupScheduledFutures.remove(groupIdUuid);
453-
if (scheduledFuture != null) {
454-
boolean canceled = scheduledFuture.cancel(true);
455-
if (canceled && this.logger.isDebugEnabled()) {
456-
this.logger.debug("Cancel 'ScheduledFuture' for MessageGroup with Correlation Key [ "
457-
+ correlationKey + "].");
458-
}
449+
noOutput = processMessageForGroup(message, correlationKey, groupIdUuid, lock);
450+
}
451+
finally {
452+
if (noOutput || !this.releaseLockBeforeSend) {
453+
lock.unlock();
459454
}
460-
MessageGroup messageGroup = this.messageStore.getMessageGroup(correlationKey);
461-
if (this.sequenceAware) {
462-
messageGroup = new SequenceAwareMessageGroup(messageGroup);
455+
}
456+
}
457+
458+
private boolean processMessageForGroup(Message<?> message, Object correlationKey, UUID groupIdUuid, Lock lock) {
459+
boolean noOutput = true;
460+
cancelScheduledFutureIfAny(correlationKey, groupIdUuid, true);
461+
MessageGroup messageGroup = this.messageStore.getMessageGroup(correlationKey);
462+
if (this.sequenceAware) {
463+
messageGroup = new SequenceAwareMessageGroup(messageGroup);
464+
}
465+
466+
if (!messageGroup.isComplete() && messageGroup.canAdd(message)) {
467+
if (this.logger.isTraceEnabled()) {
468+
this.logger.trace("Adding message to group [ " + messageGroup + "]");
463469
}
470+
messageGroup = store(correlationKey, message);
464471

465-
if (!messageGroup.isComplete() && messageGroup.canAdd(message)) {
466-
if (this.logger.isTraceEnabled()) {
467-
this.logger.trace("Adding message to group [ " + messageGroup + "]");
472+
if (this.releaseStrategy.canRelease(messageGroup)) {
473+
Collection<Message<?>> completedMessages = null;
474+
try {
475+
noOutput = false;
476+
completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
468477
}
469-
messageGroup = this.store(correlationKey, message);
470-
471-
if (this.releaseStrategy.canRelease(messageGroup)) {
472-
Collection<Message<?>> completedMessages = null;
473-
try {
474-
noOutput = false;
475-
completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
476-
}
477-
finally {
478-
// Possible clean (implementation dependency) up
479-
// even if there was an exception processing messages
480-
afterRelease(messageGroup, completedMessages);
481-
}
482-
if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
483-
removeEmptyGroupAfterTimeout(messageGroup, this.minimumTimeoutForEmptyGroups);
484-
}
478+
finally {
479+
// Possible clean (implementation dependency) up
480+
// even if there was an exception processing messages
481+
afterRelease(messageGroup, completedMessages);
485482
}
486-
else {
487-
scheduleGroupToForceComplete(messageGroup);
483+
if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
484+
removeEmptyGroupAfterTimeout(messageGroup, this.minimumTimeoutForEmptyGroups);
488485
}
489486
}
490487
else {
491-
noOutput = false;
492-
discardMessage(message, lock);
488+
scheduleGroupToForceComplete(messageGroup);
493489
}
494490
}
495-
finally {
496-
if (noOutput || !this.releaseLockBeforeSend) {
497-
lock.unlock();
491+
else {
492+
noOutput = false;
493+
discardMessage(message, lock);
494+
}
495+
return noOutput;
496+
}
497+
498+
private void cancelScheduledFutureIfAny(Object correlationKey, UUID groupIdUuid, boolean mayInterruptIfRunning) {
499+
ScheduledFuture<?> scheduledFuture = this.expireGroupScheduledFutures.remove(groupIdUuid);
500+
if (scheduledFuture != null) {
501+
boolean canceled = scheduledFuture.cancel(mayInterruptIfRunning);
502+
if (canceled && this.logger.isDebugEnabled()) {
503+
this.logger.debug("Cancel 'ScheduledFuture' for MessageGroup with Correlation Key [ "
504+
+ correlationKey + "].");
498505
}
499506
}
500507
}
@@ -606,7 +613,10 @@ private void discardMessage(Message<?> message, Lock lock) {
606613
}
607614

608615
private void discardMessage(Message<?> message) {
609-
this.messagingTemplate.send(getDiscardChannel(), message);
616+
MessageChannel messageChannel = getDiscardChannel();
617+
if (messageChannel != null) {
618+
this.messagingTemplate.send(messageChannel, message);
619+
}
610620
}
611621

612622
/**
@@ -630,20 +640,14 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
630640
protected void forceComplete(MessageGroup group) {
631641
Object correlationKey = group.getGroupId();
632642
// UUIDConverter is no-op if already converted
633-
Lock lock = this.lockRegistry.obtain(UUIDConverter.getUUID(correlationKey).toString());
643+
UUID groupId = UUIDConverter.getUUID(correlationKey);
644+
Lock lock = this.lockRegistry.obtain(groupId.toString());
634645
boolean removeGroup = true;
635646
boolean noOutput = true;
636647
try {
637648
lock.lockInterruptibly();
638649
try {
639-
ScheduledFuture<?> scheduledFuture =
640-
this.expireGroupScheduledFutures.remove(UUIDConverter.getUUID(correlationKey));
641-
if (scheduledFuture != null) {
642-
boolean canceled = scheduledFuture.cancel(false);
643-
if (canceled && this.logger.isDebugEnabled()) {
644-
this.logger.debug("Cancel 'forceComplete' scheduling for MessageGroup [ " + group + "].");
645-
}
646-
}
650+
cancelScheduledFutureIfAny(correlationKey, groupId, false);
647651
MessageGroup groupNow = group;
648652
/*
649653
* If the group argument is not already complete,

0 commit comments

Comments
 (0)