Skip to content

Commit 01b0e1c

Browse files
committed
INT-4366: Fix MulticastSendingMH race condition
JIRA: https://jira.spring.io/browse/INT-4366 The `MulticastSendingMessageHandler.getSocket()` doesn't guard around `this.multicastSocket` property causing `NPE` and other inconsistency in the multi-threaded environment * Make the whole `MulticastSendingMessageHandler.getSocket()` as `synchronized` like it is with the super method * Reuse `closeSocketIfNeeded()` in the `UnicastSendingMessageHandler.handleMessageInternal()` * Fix type in the `UnicastSendingMessageHandler` logging message * Fix `UdpChannelAdapterTests` for missed `BeanFactory` for the SpEL and also `MulticastSendingMessageHandler.stop()` in one missed places **Cherry-pick to 4.3.x**
1 parent 170292a commit 01b0e1c

File tree

3 files changed

+17
-19
lines changed

3 files changed

+17
-19
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,11 @@ public MulticastSendingMessageHandler(String destinationExpression) {
125125
}
126126

127127
@Override
128-
protected DatagramSocket getSocket() throws IOException {
129-
if (this.getTheSocket() == null) {
130-
synchronized (this) {
131-
createSocket();
132-
}
128+
protected synchronized DatagramSocket getSocket() throws IOException {
129+
if (getTheSocket() == null) {
130+
createSocket();
133131
}
134-
return this.getTheSocket();
132+
return getTheSocket();
135133
}
136134

137135
private void createSocket() throws IOException {
@@ -174,7 +172,6 @@ private void createSocket() throws IOException {
174172

175173
/**
176174
* If acknowledge = true; how many acks needed for success.
177-
*
178175
* @param minAcksForSuccess The minimum number of acks that will represent success.
179176
*/
180177
public void setMinAcksForSuccess(int minAcksForSuccess) {
@@ -183,7 +180,6 @@ public void setMinAcksForSuccess(int minAcksForSuccess) {
183180

184181
/**
185182
* Set the underlying {@link MulticastSocket} time to live property.
186-
*
187183
* @param timeToLive {@link MulticastSocket#setTimeToLive(int)}
188184
*/
189185
public void setTimeToLive(int timeToLive) {

spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2017 the original author or authors.
2+
* Copyright 2001-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,8 +83,8 @@ public class UnicastSendingMessageHandler extends
8383

8484
private volatile int ackCounter = 1;
8585

86-
private volatile Map<String, CountDownLatch> ackControl = Collections
87-
.synchronizedMap(new HashMap<String, CountDownLatch>());
86+
private volatile Map<String, CountDownLatch> ackControl =
87+
Collections.synchronizedMap(new HashMap<String, CountDownLatch>());
8888

8989
private volatile int soReceiveBufferSize = -1;
9090

@@ -282,12 +282,7 @@ public void handleMessageInternal(Message<?> message) throws MessageHandlingExce
282282
throw e;
283283
}
284284
catch (Exception e) {
285-
try {
286-
this.socket.close();
287-
}
288-
catch (Exception e1) {
289-
}
290-
this.socket = null;
285+
closeSocketIfNeeded();
291286
throw new MessageHandlingException(message, "failed to send UDP packet", e);
292287
}
293288
finally {
@@ -512,7 +507,7 @@ public void run() {
512507
}
513508
catch (IOException e) {
514509
if (this.socket != null && !this.socket.isClosed()) {
515-
logger.error("Error on UDP Acknowledge thread:" + e.getMessage());
510+
logger.error("Error on UDP Acknowledge thread: " + e.getMessage());
516511
}
517512
}
518513
finally {
@@ -530,7 +525,11 @@ public void restartAckThread() {
530525

531526
private void closeSocketIfNeeded() {
532527
if (this.socket != null) {
533-
this.socket.close();
528+
try {
529+
this.socket.close();
530+
}
531+
catch (Exception e) {
532+
}
534533
this.socket = null;
535534
}
536535
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/UdpChannelAdapterTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ public void testMulticastSender() throws Exception {
294294
assertNotNull(receivedMessage);
295295
assertEquals(new String(message.getPayload()), new String(receivedMessage.getPayload()));
296296
adapter.stop();
297+
handler.stop();
297298
}
298299

299300
@Test
@@ -302,6 +303,8 @@ public void testUnicastReceiverException() throws Exception {
302303
UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(0);
303304
adapter.setOutputChannel(channel);
304305
ServiceActivatingHandler handler = new ServiceActivatingHandler(new FailingService());
306+
handler.setBeanFactory(mock(BeanFactory.class));
307+
handler.afterPropertiesSet();
305308
channel.subscribe(handler);
306309
QueueChannel errorChannel = new QueueChannel();
307310
adapter.setErrorChannel(errorChannel);

0 commit comments

Comments
 (0)