Skip to content

Fix new Sonar smells #2768

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -219,7 +219,7 @@ protected void onInit() {
}

@Override
public void destroy() throws Exception {
public void destroy() {
if (this.connectionFactory != null) {
this.connectionFactory.removeConnectionListener(this);
this.initialized = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -240,7 +240,7 @@ public void stop(Runnable callback) {
}

@Override
public void destroy() throws Exception {
public void destroy() {
super.destroy();
if (this.container != null) {
this.container.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
Expand All @@ -31,6 +32,7 @@
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
import org.springframework.integration.support.management.PollableChannelManagement;
import org.springframework.integration.support.management.metrics.CounterFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
Expand Down Expand Up @@ -176,80 +178,43 @@ public Message<?> receive(long timeout) {
return doReceive(timeout);
}


@Nullable
protected Message<?> doReceive(Long timeout) {
ChannelInterceptorList interceptorList = getIChannelInterceptorList();
Deque<ChannelInterceptor> interceptorStack = null;
boolean counted = false;
AtomicBoolean counted = new AtomicBoolean();
boolean countsEnabled = isCountsEnabled();
boolean traceEnabled = isLoggingEnabled() && logger.isTraceEnabled();
try {
if (isLoggingEnabled() && logger.isTraceEnabled()) {
if (traceEnabled) {
logger.trace("preReceive on channel '" + this + "'");
}
if (interceptorList.getInterceptors().size() > 0) {
interceptorStack = new ArrayDeque<>();

if (!interceptorList.preReceive(this, interceptorStack)) {
return null;
}
}
Object object = performReceive(timeout);
Message<?> message = null;
if (object == null) {
if (isLoggingEnabled() && logger.isTraceEnabled()) {
logger.trace("postReceive on channel '" + this + "', message is null");
}
}
else {
if (countsEnabled) {
if (getMetricsCaptor() != null) {
incrementReceiveCounter();
}
getMetrics().afterReceive();
counted = true;
}
if (object instanceof Message<?>) {
message = (Message<?>) object;
}
else {
message = getMessageBuilderFactory()
.withPayload(object)
.build();
}
if (isLoggingEnabled() && logger.isDebugEnabled()) {
logger.debug("postReceive on channel '" + this + "', message: " + message);
}
}
Message<?> message = buildMessageFromResult(object, traceEnabled, countsEnabled ? counted : null);

if (interceptorStack != null) {
if (message != null) {
message = interceptorList.postReceive(message, this);
}
interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);

if (message != null) {
message = interceptorList.postReceive(message, this);
}
interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);
return message;
}
catch (RuntimeException e) {
if (countsEnabled && !counted) {
if (getMetricsCaptor() != null) {
getMetricsCaptor().counterBuilder(RECEIVE_COUNTER_NAME)
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
.tag("type", "channel")
.tag("result", "failure")
.tag("exception", e.getClass().getSimpleName())
.description("Messages received")
.build()
.increment();
}
getMetrics().afterError();
}
if (interceptorStack != null) {
interceptorList.afterReceiveCompletion(null, this, e, interceptorStack);
catch (RuntimeException ex) {
if (countsEnabled && !counted.get()) {
incrementReceiveErrorCounter(ex);
}
throw e;
interceptorList.afterReceiveCompletion(null, this, ex, interceptorStack);
throw ex;
}
}

@Nullable
protected Object performReceive(Long timeout) {
if (!this.declared) {
doDeclares();
Expand Down Expand Up @@ -289,17 +254,63 @@ protected Object performReceive(Long timeout) {
}
}

private Message<?> buildMessageFromResult(@Nullable Object object, boolean traceEnabled,
@Nullable AtomicBoolean counted) {

Message<?> message = null;
if (object != null) {
if (counted != null) {
incrementReceiveCounter();
getMetrics().afterReceive();
counted.set(true);
}
if (object instanceof Message<?>) {
message = (Message<?>) object;
}
else {
message = getMessageBuilderFactory()
.withPayload(object)
.build();
}
}

if (traceEnabled) {
logger.trace("postReceive on channel '" + this
+ "', message" + (message != null ? ": " + message : " is null"));
}

return message;
}

private void incrementReceiveCounter() {
if (this.receiveCounter == null) {
this.receiveCounter = getMetricsCaptor().counterBuilder(RECEIVE_COUNTER_NAME)
.tag("name", getComponentName())
.tag("type", "channel")
.tag("result", "success")
.tag("exception", "none")
.description("Messages received")
.build();
MetricsCaptor metricsCaptor = getMetricsCaptor();
if (metricsCaptor != null) {
if (this.receiveCounter == null) {
this.receiveCounter = buildReceiveCounter(metricsCaptor, null);
}
this.receiveCounter.increment();
}
this.receiveCounter.increment();
}

private void incrementReceiveErrorCounter(Exception ex) {
MetricsCaptor metricsCaptor = getMetricsCaptor();
if (metricsCaptor != null) {
buildReceiveCounter(metricsCaptor, ex).increment();
}
getMetrics().afterError();
}

private CounterFacade buildReceiveCounter(MetricsCaptor metricsCaptor, @Nullable Exception ex) {
CounterFacade counterFacade = metricsCaptor
.counterBuilder(RECEIVE_COUNTER_NAME)
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
.tag("type", "channel")
.tag("result", ex == null ? "success" : "failure")
.tag("exception", ex == null ? "none" : ex.getClass().getSimpleName())
.description("Messages received")
.build();
this.meters.add(counterFacade);
return counterFacade;
}


Expand Down Expand Up @@ -339,6 +350,7 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) {
}

@Override
@Nullable
public ChannelInterceptor removeInterceptor(int index) {
ChannelInterceptor interceptor = super.removeInterceptor(index);
if (interceptor instanceof ExecutorChannelInterceptor) {
Expand All @@ -353,7 +365,7 @@ public boolean hasExecutorInterceptors() {
}

@Override
public void destroy() throws Exception {
public void destroy() {
super.destroy();
if (this.receiveCounter != null) {
this.receiveCounter.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,10 @@ protected ReleaseStrategy getReleaseStrategy() {

@Override
public MessageChannel getDiscardChannel() {
if (this.discardChannelName != null) {
synchronized (this) {
if (this.discardChannelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName);
this.discardChannelName = null;
}
}
String channelName = this.discardChannelName;
if (channelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(channelName);
this.discardChannelName = null;
}
return this.discardChannel;
}
Expand Down Expand Up @@ -449,52 +446,62 @@ protected void handleMessageInternal(Message<?> message) throws InterruptedExcep
boolean noOutput = true;
lock.lockInterruptibly();
try {
ScheduledFuture<?> scheduledFuture = this.expireGroupScheduledFutures.remove(groupIdUuid);
if (scheduledFuture != null) {
boolean canceled = scheduledFuture.cancel(true);
if (canceled && this.logger.isDebugEnabled()) {
this.logger.debug("Cancel 'ScheduledFuture' for MessageGroup with Correlation Key [ "
+ correlationKey + "].");
}
noOutput = processMessageForGroup(message, correlationKey, groupIdUuid, lock);
}
finally {
if (noOutput || !this.releaseLockBeforeSend) {
lock.unlock();
}
MessageGroup messageGroup = this.messageStore.getMessageGroup(correlationKey);
if (this.sequenceAware) {
messageGroup = new SequenceAwareMessageGroup(messageGroup);
}
}

private boolean processMessageForGroup(Message<?> message, Object correlationKey, UUID groupIdUuid, Lock lock) {
boolean noOutput = true;
cancelScheduledFutureIfAny(correlationKey, groupIdUuid, true);
MessageGroup messageGroup = this.messageStore.getMessageGroup(correlationKey);
if (this.sequenceAware) {
messageGroup = new SequenceAwareMessageGroup(messageGroup);
}

if (!messageGroup.isComplete() && messageGroup.canAdd(message)) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Adding message to group [ " + messageGroup + "]");
}
messageGroup = store(correlationKey, message);

if (!messageGroup.isComplete() && messageGroup.canAdd(message)) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Adding message to group [ " + messageGroup + "]");
if (this.releaseStrategy.canRelease(messageGroup)) {
Collection<Message<?>> completedMessages = null;
try {
noOutput = false;
completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
}
messageGroup = this.store(correlationKey, message);

if (this.releaseStrategy.canRelease(messageGroup)) {
Collection<Message<?>> completedMessages = null;
try {
noOutput = false;
completedMessages = completeGroup(message, correlationKey, messageGroup, lock);
}
finally {
// Possible clean (implementation dependency) up
// even if there was an exception processing messages
afterRelease(messageGroup, completedMessages);
}
if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
removeEmptyGroupAfterTimeout(messageGroup, this.minimumTimeoutForEmptyGroups);
}
finally {
// Possible clean (implementation dependency) up
// even if there was an exception processing messages
afterRelease(messageGroup, completedMessages);
}
else {
scheduleGroupToForceComplete(messageGroup);
if (!isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0) {
removeEmptyGroupAfterTimeout(messageGroup, this.minimumTimeoutForEmptyGroups);
}
}
else {
noOutput = false;
discardMessage(message, lock);
scheduleGroupToForceComplete(messageGroup);
}
}
finally {
if (noOutput || !this.releaseLockBeforeSend) {
lock.unlock();
else {
noOutput = false;
discardMessage(message, lock);
}
return noOutput;
}

private void cancelScheduledFutureIfAny(Object correlationKey, UUID groupIdUuid, boolean mayInterruptIfRunning) {
ScheduledFuture<?> scheduledFuture = this.expireGroupScheduledFutures.remove(groupIdUuid);
if (scheduledFuture != null) {
boolean canceled = scheduledFuture.cancel(mayInterruptIfRunning);
if (canceled && this.logger.isDebugEnabled()) {
this.logger.debug("Cancel 'ScheduledFuture' for MessageGroup with Correlation Key [ "
+ correlationKey + "].");
}
}
}
Expand Down Expand Up @@ -606,7 +613,10 @@ private void discardMessage(Message<?> message, Lock lock) {
}

private void discardMessage(Message<?> message) {
this.messagingTemplate.send(getDiscardChannel(), message);
MessageChannel messageChannel = getDiscardChannel();
if (messageChannel != null) {
this.messagingTemplate.send(messageChannel, message);
}
}

/**
Expand All @@ -630,20 +640,14 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
protected void forceComplete(MessageGroup group) {
Object correlationKey = group.getGroupId();
// UUIDConverter is no-op if already converted
Lock lock = this.lockRegistry.obtain(UUIDConverter.getUUID(correlationKey).toString());
UUID groupId = UUIDConverter.getUUID(correlationKey);
Lock lock = this.lockRegistry.obtain(groupId.toString());
boolean removeGroup = true;
boolean noOutput = true;
try {
lock.lockInterruptibly();
try {
ScheduledFuture<?> scheduledFuture =
this.expireGroupScheduledFutures.remove(UUIDConverter.getUUID(correlationKey));
if (scheduledFuture != null) {
boolean canceled = scheduledFuture.cancel(false);
if (canceled && this.logger.isDebugEnabled()) {
this.logger.debug("Cancel 'forceComplete' scheduling for MessageGroup [ " + group + "].");
}
}
cancelScheduledFutureIfAny(correlationKey, groupId, false);
MessageGroup groupNow = group;
/*
* If the group argument is not already complete,
Expand Down
Loading