Skip to content

Commit 2d6984b

Browse files
committed
Revert "INT-4366: Fix MulticastSendingMH
This reverts commit c3b64dc.
1 parent c3b64dc commit 2d6984b

13 files changed

+235
-261
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2018 the original author or authors.
2+
* Copyright 2001-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,8 +38,6 @@
3838
* determine success.
3939
*
4040
* @author Gary Russell
41-
* @author Artem Bilan
42-
*
4341
* @since 2.0
4442
*/
4543
public class MulticastSendingMessageHandler extends UnicastSendingMessageHandler {
@@ -128,45 +126,49 @@ public MulticastSendingMessageHandler(String destinationExpression) {
128126

129127
@Override
130128
protected DatagramSocket getSocket() throws IOException {
131-
if (this.multicastSocket == null) {
129+
if (this.getTheSocket() == null) {
132130
synchronized (this) {
133-
if (this.multicastSocket == null) {
134-
createSocket();
135-
}
131+
createSocket();
136132
}
137133
}
138-
return getTheSocket();
134+
return this.getTheSocket();
139135
}
140136

141137
private void createSocket() throws IOException {
142-
MulticastSocket socket;
143-
if (isAcknowledge()) {
144-
int ackPort = getAckPort();
145-
if (this.localAddress == null) {
146-
socket = ackPort == 0 ? new MulticastSocket() : new MulticastSocket(ackPort);
138+
if (this.getTheSocket() == null) {
139+
MulticastSocket socket;
140+
if (this.isAcknowledge()) {
141+
int ackPort = this.getAckPort();
142+
if (this.localAddress == null) {
143+
socket = ackPort == 0 ? new MulticastSocket() : new MulticastSocket(ackPort);
144+
}
145+
else {
146+
InetAddress whichNic = InetAddress.getByName(this.localAddress);
147+
socket = new MulticastSocket(new InetSocketAddress(whichNic, ackPort));
148+
}
149+
if (getSoReceiveBufferSize() > 0) {
150+
socket.setReceiveBufferSize(this.getSoReceiveBufferSize());
151+
}
152+
if (logger.isDebugEnabled()) {
153+
logger.debug("Listening for acks on port: " + socket.getLocalPort());
154+
}
155+
setSocket(socket);
156+
updateAckAddress();
147157
}
148158
else {
149-
InetAddress whichNic = InetAddress.getByName(this.localAddress);
150-
socket = new MulticastSocket(new InetSocketAddress(whichNic, ackPort));
159+
socket = new MulticastSocket();
160+
setSocket(socket);
151161
}
152-
if (getSoReceiveBufferSize() > 0) {
153-
socket.setReceiveBufferSize(getSoReceiveBufferSize());
162+
if (this.timeToLive >= 0) {
163+
socket.setTimeToLive(this.timeToLive);
154164
}
155-
if (logger.isDebugEnabled()) {
156-
logger.debug("Listening for acks on port: " + socket.getLocalPort());
165+
setSocketAttributes(socket);
166+
if (this.localAddress != null) {
167+
InetAddress whichNic = InetAddress.getByName(this.localAddress);
168+
socket.setInterface(whichNic);
157169
}
158-
setSocket(socket);
159-
updateAckAddress();
160-
}
161-
else {
162-
socket = new MulticastSocket();
163-
setSocket(socket);
164-
}
165-
if (this.timeToLive >= 0) {
166-
socket.setTimeToLive(this.timeToLive);
170+
this.multicastSocket = socket;
167171
}
168-
setSocketAttributes(socket);
169-
this.multicastSocket = socket;
170172
}
171173

172174

@@ -176,7 +178,7 @@ private void createSocket() throws IOException {
176178
* @param minAcksForSuccess The minimum number of acks that will represent success.
177179
*/
178180
public void setMinAcksForSuccess(int minAcksForSuccess) {
179-
setAckCounter(minAcksForSuccess);
181+
this.setAckCounter(minAcksForSuccess);
180182
}
181183

182184
/**

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpInboundGatewayTests.java

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import java.util.HashSet;
2929
import java.util.Set;
3030
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.Executors;
3132
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,7 +39,6 @@
3839
import org.junit.Test;
3940

4041
import org.springframework.beans.factory.BeanFactory;
41-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4242
import org.springframework.integration.channel.DirectChannel;
4343
import org.springframework.integration.channel.QueueChannel;
4444
import org.springframework.integration.handler.ServiceActivatingHandler;
@@ -56,8 +56,6 @@
5656

5757
/**
5858
* @author Gary Russell
59-
* @author Artem Bilan
60-
*
6159
* @since 2.0
6260
*/
6361
public class TcpInboundGatewayTests {
@@ -121,31 +119,30 @@ public void testNetClientMode() throws Exception {
121119
final CountDownLatch latch2 = new CountDownLatch(1);
122120
final CountDownLatch latch3 = new CountDownLatch(1);
123121
final AtomicBoolean done = new AtomicBoolean();
124-
new SimpleAsyncTaskExecutor()
125-
.execute(() -> {
126-
try {
127-
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
128-
port.set(server.getLocalPort());
129-
latch1.countDown();
130-
Socket socket = server.accept();
131-
socket.getOutputStream().write("Test1\r\nTest2\r\n".getBytes());
132-
byte[] bytes = new byte[12];
133-
readFully(socket.getInputStream(), bytes);
134-
assertEquals("Echo:Test1\r\n", new String(bytes));
135-
readFully(socket.getInputStream(), bytes);
136-
assertEquals("Echo:Test2\r\n", new String(bytes));
137-
latch2.await();
138-
socket.close();
139-
server.close();
140-
done.set(true);
141-
latch3.countDown();
142-
}
143-
catch (Exception e) {
144-
if (!done.get()) {
145-
e.printStackTrace();
146-
}
147-
}
148-
});
122+
Executors.newSingleThreadExecutor().execute(() -> {
123+
try {
124+
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
125+
port.set(server.getLocalPort());
126+
latch1.countDown();
127+
Socket socket = server.accept();
128+
socket.getOutputStream().write("Test1\r\nTest2\r\n".getBytes());
129+
byte[] bytes = new byte[12];
130+
readFully(socket.getInputStream(), bytes);
131+
assertEquals("Echo:Test1\r\n", new String(bytes));
132+
readFully(socket.getInputStream(), bytes);
133+
assertEquals("Echo:Test2\r\n", new String(bytes));
134+
latch2.await();
135+
socket.close();
136+
server.close();
137+
done.set(true);
138+
latch3.countDown();
139+
}
140+
catch (Exception e) {
141+
if (!done.get()) {
142+
e.printStackTrace();
143+
}
144+
}
145+
});
149146
assertTrue(latch1.await(10, TimeUnit.SECONDS));
150147
AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", port.get());
151148
ccf.setSingleUse(false);

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpOutboundGatewayTests.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.Set;
4444
import java.util.concurrent.CountDownLatch;
4545
import java.util.concurrent.ExecutionException;
46+
import java.util.concurrent.Executors;
4647
import java.util.concurrent.Future;
4748
import java.util.concurrent.TimeUnit;
4849
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,8 +61,6 @@
6061
import org.springframework.beans.factory.BeanFactory;
6162
import org.springframework.core.serializer.DefaultDeserializer;
6263
import org.springframework.core.serializer.DefaultSerializer;
63-
import org.springframework.core.task.AsyncTaskExecutor;
64-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
6564
import org.springframework.expression.EvaluationContext;
6665
import org.springframework.expression.Expression;
6766
import org.springframework.expression.spel.standard.SpelExpressionParser;
@@ -91,8 +90,6 @@ public class TcpOutboundGatewayTests {
9190

9291
private static final Log logger = LogFactory.getLog(TcpOutboundGatewayTests.class);
9392

94-
private AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
95-
9693
@ClassRule
9794
public static LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();
9895

@@ -104,13 +101,13 @@ public class TcpOutboundGatewayTests {
104101
public void testGoodNetSingle() throws Exception {
105102
final CountDownLatch latch = new CountDownLatch(1);
106103
final AtomicBoolean done = new AtomicBoolean();
107-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
108-
this.executor.execute(() -> {
104+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
105+
Executors.newSingleThreadExecutor().execute(() -> {
109106
try {
110107
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 100);
111108
serverSocket.set(server);
112109
latch.countDown();
113-
List<Socket> sockets = new ArrayList<>();
110+
List<Socket> sockets = new ArrayList<Socket>();
114111
int i = 0;
115112
while (true) {
116113
Socket socket = server.accept();
@@ -168,8 +165,8 @@ public void testGoodNetSingle() throws Exception {
168165
public void testGoodNetMultiplex() throws Exception {
169166
final CountDownLatch latch = new CountDownLatch(1);
170167
final AtomicBoolean done = new AtomicBoolean();
171-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
172-
this.executor.execute(() -> {
168+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
169+
Executors.newSingleThreadExecutor().execute(() -> {
173170
try {
174171
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 10);
175172
serverSocket.set(server);
@@ -223,8 +220,8 @@ public void testGoodNetMultiplex() throws Exception {
223220
public void testGoodNetTimeout() throws Exception {
224221
final CountDownLatch latch = new CountDownLatch(1);
225222
final AtomicBoolean done = new AtomicBoolean();
226-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
227-
this.executor.execute(() -> {
223+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
224+
Executors.newSingleThreadExecutor().execute(() -> {
228225
try {
229226
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0);
230227
serverSocket.set(server);
@@ -263,12 +260,12 @@ public void testGoodNetTimeout() throws Exception {
263260
Future<Integer>[] results = (Future<Integer>[]) new Future<?>[2];
264261
for (int i = 0; i < 2; i++) {
265262
final int j = i;
266-
results[j] = (this.executor.submit(() -> {
263+
results[j] = (Executors.newSingleThreadExecutor().submit(() -> {
267264
gateway.handleMessage(MessageBuilder.withPayload("Test" + j).build());
268265
return 0;
269266
}));
270267
}
271-
Set<String> replies = new HashSet<>();
268+
Set<String> replies = new HashSet<String>();
272269
int timeouts = 0;
273270
for (int i = 0; i < 2; i++) {
274271
try {
@@ -347,7 +344,7 @@ private void testGoodNetGWTimeoutGuts(final int port, AbstractClientConnectionFa
347344
final AtomicReference<String> lastReceived = new AtomicReference<String>();
348345
final CountDownLatch serverLatch = new CountDownLatch(2);
349346

350-
this.executor.execute(() -> {
347+
Executors.newSingleThreadExecutor().execute(() -> {
351348
try {
352349
latch.countDown();
353350
int i = 0;
@@ -401,7 +398,7 @@ private void testGoodNetGWTimeoutGuts(final int port, AbstractClientConnectionFa
401398

402399
for (int i = 0; i < 2; i++) {
403400
final int j = i;
404-
results[j] = (this.executor.submit(() -> {
401+
results[j] = (Executors.newSingleThreadExecutor().submit(() -> {
405402
gateway.handleMessage(MessageBuilder.withPayload("Test" + j).build());
406403
return j;
407404
}));
@@ -445,7 +442,7 @@ public void testCachingFailover() throws Exception {
445442
final AtomicBoolean done = new AtomicBoolean();
446443
final CountDownLatch serverLatch = new CountDownLatch(1);
447444

448-
this.executor.execute(() -> {
445+
Executors.newSingleThreadExecutor().execute(() -> {
449446
try {
450447
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0);
451448
serverSocket.set(server);
@@ -520,12 +517,12 @@ public void testCachingFailover() throws Exception {
520517

521518
@Test
522519
public void testFailoverCached() throws Exception {
523-
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<>();
520+
final AtomicReference<ServerSocket> serverSocket = new AtomicReference<ServerSocket>();
524521
final CountDownLatch latch = new CountDownLatch(1);
525522
final AtomicBoolean done = new AtomicBoolean();
526523
final CountDownLatch serverLatch = new CountDownLatch(1);
527524

528-
this.executor.execute(() -> {
525+
Executors.newSingleThreadExecutor().execute(() -> {
529526
try {
530527
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0);
531528
serverSocket.set(server);
@@ -670,11 +667,11 @@ private void testGWPropagatesSocketCloseGuts(final int port, AbstractClientConne
670667
final ServerSocket server) throws Exception {
671668
final CountDownLatch latch = new CountDownLatch(1);
672669
final AtomicBoolean done = new AtomicBoolean();
673-
final AtomicReference<String> lastReceived = new AtomicReference<>();
670+
final AtomicReference<String> lastReceived = new AtomicReference<String>();
674671
final CountDownLatch serverLatch = new CountDownLatch(1);
675672

676-
this.executor.execute(() -> {
677-
List<Socket> sockets = new ArrayList<>();
673+
Executors.newSingleThreadExecutor().execute(() -> {
674+
List<Socket> sockets = new ArrayList<Socket>();
678675
try {
679676
latch.countDown();
680677
while (!done.get()) {
@@ -796,8 +793,8 @@ private void testGWPropagatesSocketTimeoutGuts(final int port, AbstractClientCon
796793
final CountDownLatch latch = new CountDownLatch(1);
797794
final AtomicBoolean done = new AtomicBoolean();
798795

799-
this.executor.execute(() -> {
800-
List<Socket> sockets = new ArrayList<>();
796+
Executors.newSingleThreadExecutor().execute(() -> {
797+
List<Socket> sockets = new ArrayList<Socket>();
801798
try {
802799
latch.countDown();
803800
while (!done.get()) {

0 commit comments

Comments
 (0)