Skip to content

Commit 466daa8

Browse files
garyrussellartembilan
authored andcommitted
TCP: Connect Timeout; Close Stream
- Add `connectTimeout` to client connection factories - Add `closeStreamAfterSend` to outbound gateway * Polishing - PR Comments.
1 parent cd07572 commit 466daa8

22 files changed

+417
-51
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/IpAdapterParserUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ public abstract class IpAdapterParserUtils {
131131

132132
public static final String SSL_HANDSHAKE_TIMEOUT = "ssl-handshake-timeout";
133133

134+
public static final String CONNECT_TIMEOUT = "connect-timeout";
135+
134136
private IpAdapterParserUtils() {
135137
}
136138

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryFactoryBean.java

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -59,73 +59,75 @@
5959
public class TcpConnectionFactoryFactoryBean extends AbstractFactoryBean<AbstractConnectionFactory>
6060
implements Lifecycle, BeanNameAware, ApplicationEventPublisherAware {
6161

62-
private volatile AbstractConnectionFactory connectionFactory;
62+
private AbstractConnectionFactory connectionFactory;
6363

64-
private volatile String type;
64+
private String type;
6565

66-
private volatile String host;
66+
private String host;
6767

68-
private volatile int port;
68+
private int port;
6969

70-
private volatile int soTimeout;
70+
private int soTimeout;
7171

72-
private volatile int soSendBufferSize;
72+
private int soSendBufferSize;
7373

74-
private volatile int soReceiveBufferSize;
74+
private int soReceiveBufferSize;
7575

76-
private volatile boolean soTcpNoDelay;
76+
private boolean soTcpNoDelay;
7777

78-
private volatile int soLinger = -1; // don't set by default
78+
private int soLinger = -1; // don't set by default
7979

80-
private volatile boolean soKeepAlive;
80+
private boolean soKeepAlive;
8181

82-
private volatile int soTrafficClass = -1; // don't set by default
82+
private int soTrafficClass = -1; // don't set by default
8383

84-
private volatile Executor taskExecutor;
84+
private Executor taskExecutor;
8585

86-
private volatile Deserializer<?> deserializer = new ByteArrayCrLfSerializer();
86+
private Deserializer<?> deserializer = new ByteArrayCrLfSerializer();
8787

88-
private volatile Serializer<?> serializer = new ByteArrayCrLfSerializer();
88+
private Serializer<?> serializer = new ByteArrayCrLfSerializer();
8989

90-
private volatile TcpMessageMapper mapper = new TcpMessageMapper();
90+
private TcpMessageMapper mapper = new TcpMessageMapper();
9191

92-
private volatile boolean mapperSet;
92+
private boolean mapperSet;
9393

94-
private volatile boolean singleUse;
94+
private boolean singleUse;
9595

96-
private volatile int backlog = 5;
96+
private int backlog = 5;
9797

98-
private volatile TcpConnectionInterceptorFactoryChain interceptorFactoryChain;
98+
private TcpConnectionInterceptorFactoryChain interceptorFactoryChain;
9999

100-
private volatile boolean lookupHost = true;
100+
private boolean lookupHost = true;
101101

102-
private volatile String localAddress;
102+
private String localAddress;
103103

104-
private volatile boolean usingNio;
104+
private boolean usingNio;
105105

106-
private volatile boolean usingDirectBuffers;
106+
private boolean usingDirectBuffers;
107107

108-
private volatile String beanName;
108+
private String beanName;
109109

110-
private volatile boolean applySequence;
110+
private boolean applySequence;
111111

112-
private volatile Long readDelay;
112+
private Long readDelay;
113113

114-
private volatile TcpSSLContextSupport sslContextSupport;
114+
private TcpSSLContextSupport sslContextSupport;
115115

116-
private volatile Integer sslHandshakeTimeout;
116+
private Integer sslHandshakeTimeout;
117117

118-
private volatile TcpSocketSupport socketSupport = new DefaultTcpSocketSupport();
118+
private TcpSocketSupport socketSupport = new DefaultTcpSocketSupport();
119119

120-
private volatile TcpNioConnectionSupport nioConnectionSupport;
120+
private TcpNioConnectionSupport nioConnectionSupport;
121121

122-
private volatile TcpNetConnectionSupport netConnectionSupport;
122+
private TcpNetConnectionSupport netConnectionSupport;
123123

124-
private volatile TcpSocketFactorySupport socketFactorySupport;
124+
private TcpSocketFactorySupport socketFactorySupport;
125125

126-
private volatile ApplicationEventPublisher applicationEventPublisher;
126+
private ApplicationEventPublisher applicationEventPublisher;
127127

128-
private volatile BeanFactory beanFactory;
128+
private BeanFactory beanFactory;
129+
130+
private Integer connectTimeout;
129131

130132

131133
public TcpConnectionFactoryFactoryBean() {
@@ -189,6 +191,9 @@ protected AbstractConnectionFactory createInstance() {
189191
this.setCommonAttributes(factory);
190192
factory.setTcpSocketFactorySupport(this.obtainSocketFactorySupport());
191193
factory.setTcpNetConnectionSupport(this.obtainNetConnectionSupport());
194+
if (this.connectTimeout != null) {
195+
factory.setConnectTimeout(this.connectTimeout);
196+
}
192197
this.connectionFactory = factory;
193198
}
194199
}
@@ -501,6 +506,10 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
501506
this.applicationEventPublisher = applicationEventPublisher;
502507
}
503508

509+
public void setConnectTimeout(int connectTimeout) {
510+
this.connectTimeout = connectTimeout;
511+
}
512+
504513
/**
505514
* Set the SSL handshake timeout (only used with SSL and NIO).
506515
* @param sslHandshakeTimeout the timeout.

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpConnectionFactoryParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ else if (!"server".equals(type) && !"client".equals(type)) {
100100
IpAdapterParserUtils.SSL_HANDSHAKE_TIMEOUT);
101101
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
102102
IpAdapterParserUtils.READ_DELAY);
103+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
104+
IpAdapterParserUtils.CONNECT_TIMEOUT);
103105

104106
return builder.getBeanDefinition();
105107
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpOutboundGatewayParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
5555
}
5656
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
5757
IpAdapterParserUtils.REPLY_TIMEOUT, "sendTimeout");
58+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "close-stream-after-send");
5859
return builder;
5960
}
6061

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractConnectionFactorySpec.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,15 @@ public S tcpSocketSupport(TcpSocketSupport tcpSocketSupport) {
219219
return _this();
220220
}
221221

222+
/**
223+
* This connection factory uses a new connection for each operation.
224+
* @param single true for a new connection for each operation.
225+
* @return the spec.
226+
* @since 5.2
227+
*/
228+
public S singleUseConnections(boolean single) {
229+
this.target.setSingleUse(single);
230+
return _this();
231+
}
232+
222233
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpClientConnectionFactorySpec.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,15 @@ public class TcpClientConnectionFactorySpec
3838
super(nio ? new TcpNioClientConnectionFactory(host, port) : new TcpNetClientConnectionFactory(host, port));
3939
}
4040

41+
/**
42+
* Set the connection timeout in seconds. Defaults to 60.
43+
* @param connectTimeout the timeout.
44+
* @return the spec.
45+
* @since 5.2
46+
*/
47+
public TcpClientConnectionFactorySpec connectTimeout(int connectTimeout) {
48+
this.target.setConnectTimeout(connectTimeout);
49+
return _this();
50+
}
51+
4152
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,20 @@ public <P> TcpOutboundGatewaySpec remoteTimeout(Function<Message<P>, ?> remoteTi
8989
return _this();
9090
}
9191

92+
/**
93+
* Set to true to close the connection ouput stream after sending without
94+
* closing the connection. Use to signal EOF to the server, such as when using
95+
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
96+
* Requires a single-use connection factory.
97+
* @param closeStreamAfterSend true to close.
98+
* @return the spec.
99+
* @since 5.2
100+
*/
101+
public TcpOutboundGatewaySpec closeStreamAfterSend(boolean closeStreamAfterSend) {
102+
this.target.setCloseStreamAfterSend(closeStreamAfterSend);
103+
return _this();
104+
}
105+
92106
@Override
93107
public Map<Object, String> getComponentsToRegister() {
94108
return this.connectionFactory != null

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.ip.tcp;
1818

19+
import java.io.IOException;
1920
import java.util.Map;
2021
import java.util.concurrent.ConcurrentHashMap;
2122
import java.util.concurrent.CountDownLatch;
@@ -84,6 +85,8 @@ public class TcpOutboundGateway extends AbstractReplyProducingMessageHandler
8485

8586
private int secondChanceDelay = DEFAULT_SECOND_CHANCE_DELAY;
8687

88+
private boolean closeStreamAfterSend;
89+
8790
/**
8891
* @param requestTimeout the requestTimeout to set
8992
*/
@@ -117,6 +120,8 @@ protected void doInit() {
117120
if (!this.evaluationContextSet) {
118121
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
119122
}
123+
Assert.state(!this.closeStreamAfterSend || this.isSingleUse,
124+
"Single use connection needed with closeStreamAfterSend");
120125
}
121126

122127
/**
@@ -149,9 +154,12 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
149154
logger.debug("Added pending reply " + connectionId);
150155
}
151156
connection.send(requestMessage);
157+
if (this.closeStreamAfterSend) {
158+
connection.shutdownOutput();
159+
}
152160
return getReply(requestMessage, connection, connectionId, reply);
153161
}
154-
catch (RuntimeException e) {
162+
catch (RuntimeException | IOException e) {
155163
logger.error("Tcp Gateway exception", e);
156164
if (e instanceof MessagingException) {
157165
throw (MessagingException) e;
@@ -305,6 +313,18 @@ public void setReplyChannelName(String replyChannel) {
305313
this.setOutputChannelName(replyChannel);
306314
}
307315

316+
/**
317+
* Set to true to close the connection ouput stream after sending without
318+
* closing the connection. Use to signal EOF to the server, such as when using
319+
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
320+
* Requires a single-use connection factory.
321+
* @param closeStreamAfterSend true to close.
322+
* @since 5.2
323+
*/
324+
public void setCloseStreamAfterSend(boolean closeStreamAfterSend) {
325+
this.closeStreamAfterSend = closeStreamAfterSend;
326+
}
327+
308328
@Override
309329
public String getComponentType() {
310330
return "ip:tcp-outbound-gateway";

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.ip.tcp.connection;
1818

1919
import java.net.Socket;
20+
import java.time.Duration;
2021
import java.util.concurrent.locks.ReadWriteLock;
2122
import java.util.concurrent.locks.ReentrantReadWriteLock;
2223

@@ -33,11 +34,15 @@
3334
*/
3435
public abstract class AbstractClientConnectionFactory extends AbstractConnectionFactory {
3536

37+
private static final long DEFAULT_CONNECT_TIMEOUT = 60L;
38+
3639
private final ReadWriteLock theConnectionLock = new ReentrantReadWriteLock();
3740

38-
private volatile TcpConnectionSupport theConnection;
41+
private boolean manualListenerRegistration;
3942

40-
private volatile boolean manualListenerRegistration;
43+
private Duration connectTimeout = Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT);
44+
45+
private volatile TcpConnectionSupport theConnection;
4146

4247
/**
4348
* Constructs a factory that will established connections to the host and port.
@@ -48,6 +53,19 @@ public AbstractClientConnectionFactory(String host, int port) {
4853
super(host, port);
4954
}
5055

56+
/**
57+
* Set the connection timeout in seconds. Defaults to 60.
58+
* @param connectTimeout the timeout.
59+
* @since 5.2
60+
*/
61+
public void setConnectTimeout(int connectTimeout) {
62+
this.connectTimeout = Duration.ofSeconds(connectTimeout);
63+
}
64+
65+
protected Duration getConnectTimeout() {
66+
return this.connectTimeout;
67+
}
68+
5169
/**
5270
* Set whether to automatically (default) or manually add a {@link TcpListener} to the
5371
* connections created by this factory. By default, the factory automatically configures

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import java.io.IOException;
20+
1921
import javax.net.ssl.SSLSession;
2022

2123
import org.springframework.core.serializer.Deserializer;
@@ -132,4 +134,24 @@ public interface TcpConnection extends Runnable {
132134
*/
133135
SocketInfo getSocketInfo();
134136

137+
/**
138+
* Set the connection's input stream to end of stream.
139+
* @throws IOException an IO Exception.
140+
* @since 5.2
141+
*/
142+
@SuppressWarnings("unused")
143+
default void shutdownInput() throws IOException {
144+
throw new UnsupportedOperationException("This connection does not support shutDownInput()");
145+
}
146+
147+
/**
148+
* Disable the socket's output stream.
149+
* @throws IOException an IO Exception
150+
* @since 5.2
151+
*/
152+
@SuppressWarnings("unused")
153+
default void shutdownOutput() throws IOException {
154+
throw new UnsupportedOperationException("This connection does not support shutDownOutput()");
155+
}
156+
135157
}

0 commit comments

Comments
 (0)