Skip to content

Commit 36e7041

Browse files
artembilangaryrussell
authored andcommitted
Add RSocketInboundGateway; refactoring (#2923)
* Add `RSocketInboundGateway`; refactoring * Extract an `AbstractRSocketConnector` for common client and server connectors logic * Introduce an `IntegrationRSocketAcceptor` and `IntegrationRSocket` for the mapping and handling logic of RSockets and messages in between * Introduce an `IntegrationRSocketEndpoint` marker interface for Inbound Gateway mappings * Add `RSocketInboundGateway` implementation, which is called from the `IntegrationRSocketAcceptor` by the `IntegrationRSocketEndpoint` mapping * Add `RSocketConnectedEvent` to emit when the client is connected to the server. It does not make sense in Spring Integration to delegate such a logic into the `RSocketInboundGateway` * * Add `ServerRSocketConnector` to represent an RSocket server and container for connected `RSocketRequester`s from clients * Extract `accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket)` server logic into an internal `ServerRSocketAcceptor` extension for the `IntegrationRSocketAcceptor` * Address PR comments: - `RSocketConnectedEvent.toString()` - `ApplicationEventPublisherAware` into the `ServerRSocketConnector` - Log RSocket connection if no `this.applicationEventPublisher` * Remove `handleConnectionSetupPayload()` from the `IntegrationRSocket` since it is not delegated to the target handler * Provide reasonable default `RSocketStrategies` for the `AbstractRSocketConnector` and `RSocketInboundGateway` * Add initial `RSocketInboundGatewayIntegrationTests` * * Don't use `BytyBuffer` in the `RSocketConnectedEvent.toString()` * Always `payload.retain()` when we convert `Payload` to `DataBuffer` * Fix `IntegrationRSocketAcceptor.detectEndpoints()` stream logic to really iterate over all the `IntegrationRSocketEndpoint` beans * Fix test to use an explicit `ClientConfig` class for the `@SpringJUnitConfig`: looks like JUnit 5 is OK to scan for `package protected` classes as well * Add request/reply tests into the `RSocketInboundGatewayIntegrationTests` for both server and client sides * * Remove `DataBuffer` from `RSocketConnectedEvent.toString()` * * Fix Checkstyle violation in the RSocketInboundGatewayIntegrationTests
1 parent 4b58bf0 commit 36e7041

File tree

10 files changed

+1364
-39
lines changed

10 files changed

+1364
-39
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.rsocket;
18+
19+
import org.springframework.beans.BeansException;
20+
import org.springframework.beans.factory.DisposableBean;
21+
import org.springframework.beans.factory.InitializingBean;
22+
import org.springframework.beans.factory.SmartInitializingSingleton;
23+
import org.springframework.context.ApplicationContext;
24+
import org.springframework.context.ApplicationContextAware;
25+
import org.springframework.context.SmartLifecycle;
26+
import org.springframework.core.codec.CharSequenceEncoder;
27+
import org.springframework.core.codec.StringDecoder;
28+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
29+
import org.springframework.messaging.rsocket.RSocketStrategies;
30+
import org.springframework.util.Assert;
31+
import org.springframework.util.MimeType;
32+
import org.springframework.util.MimeTypeUtils;
33+
34+
/**
35+
* A base connector container for common RSocket client and server functionality.
36+
* <p>
37+
* It accepts {@link IntegrationRSocketEndpoint} instances for mapping registration via an internal
38+
* {@link IntegrationRSocketAcceptor} or performs an auto-detection otherwise, when all bean are ready
39+
* in the application context.
40+
*
41+
* @author Artem Bilan
42+
*
43+
* @since 5.2
44+
*
45+
* @see IntegrationRSocketAcceptor
46+
*/
47+
public abstract class AbstractRSocketConnector
48+
implements ApplicationContextAware, InitializingBean, DisposableBean, SmartInitializingSingleton,
49+
SmartLifecycle {
50+
51+
protected final IntegrationRSocketAcceptor rsocketAcceptor; // NOSONAR - final
52+
53+
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
54+
55+
private RSocketStrategies rsocketStrategies =
56+
RSocketStrategies.builder()
57+
.decoder(StringDecoder.allMimeTypes())
58+
.encoder(CharSequenceEncoder.allMimeTypes())
59+
.dataBufferFactory(new DefaultDataBufferFactory())
60+
.build();
61+
62+
private volatile boolean running;
63+
64+
private ApplicationContext applicationContext;
65+
66+
protected AbstractRSocketConnector(IntegrationRSocketAcceptor rsocketAcceptor) {
67+
this.rsocketAcceptor = rsocketAcceptor;
68+
}
69+
70+
public void setDataMimeType(MimeType dataMimeType) {
71+
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
72+
this.dataMimeType = dataMimeType;
73+
}
74+
75+
protected MimeType getDataMimeType() {
76+
return this.dataMimeType;
77+
}
78+
79+
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
80+
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
81+
this.rsocketStrategies = rsocketStrategies;
82+
}
83+
84+
public RSocketStrategies getRSocketStrategies() {
85+
return this.rsocketStrategies;
86+
}
87+
88+
public void setEndpoints(IntegrationRSocketEndpoint... endpoints) {
89+
Assert.notNull(endpoints, "'endpoints' must not be null");
90+
for (IntegrationRSocketEndpoint endpoint : endpoints) {
91+
addEndpoint(endpoint);
92+
}
93+
}
94+
95+
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
96+
this.rsocketAcceptor.addEndpoint(endpoint);
97+
}
98+
99+
@Override
100+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
101+
this.applicationContext = applicationContext;
102+
this.rsocketAcceptor.setApplicationContext(applicationContext);
103+
}
104+
105+
protected ApplicationContext getApplicationContext() {
106+
return this.applicationContext;
107+
}
108+
109+
@Override
110+
public void afterPropertiesSet() {
111+
this.rsocketAcceptor.setDefaultDataMimeType(this.dataMimeType);
112+
this.rsocketAcceptor.setRSocketStrategies(this.rsocketStrategies);
113+
this.rsocketAcceptor.afterPropertiesSet();
114+
}
115+
116+
@Override
117+
public void afterSingletonsInstantiated() {
118+
this.rsocketAcceptor.detectEndpoints();
119+
}
120+
121+
@Override
122+
public void start() {
123+
if (!this.running) {
124+
this.running = true;
125+
doStart();
126+
}
127+
}
128+
129+
protected abstract void doStart();
130+
131+
@Override
132+
public void stop() {
133+
this.running = false;
134+
}
135+
136+
@Override
137+
public boolean isRunning() {
138+
return this.running;
139+
}
140+
141+
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@
1818

1919
import java.net.URI;
2020
import java.util.function.Consumer;
21+
import java.util.function.Function;
2122

22-
import org.springframework.beans.factory.DisposableBean;
23-
import org.springframework.beans.factory.InitializingBean;
2423
import org.springframework.messaging.rsocket.RSocketRequester;
25-
import org.springframework.messaging.rsocket.RSocketStrategies;
2624
import org.springframework.util.Assert;
27-
import org.springframework.util.MimeType;
28-
import org.springframework.util.MimeTypeUtils;
2925

3026
import io.rsocket.Payload;
3127
import io.rsocket.RSocket;
@@ -39,7 +35,11 @@
3935
import reactor.core.publisher.Mono;
4036

4137
/**
42-
* A client connector to the RSocket server.
38+
* A client {@link AbstractRSocketConnector} extension to the RSocket server.
39+
* <p>
40+
* Note: the {@link RSocketFactory.ClientRSocketFactory#acceptor(Function)}
41+
* in the provided {@link #factoryConfigurer} is overridden with an internal {@link IntegrationRSocketAcceptor}
42+
* for the proper Spring Integration channel adapter mappings.
4343
*
4444
* @author Artem Bilan
4545
*
@@ -48,17 +48,17 @@
4848
* @see RSocketFactory.ClientRSocketFactory
4949
* @see RSocketRequester
5050
*/
51-
public class ClientRSocketConnector implements InitializingBean, DisposableBean {
51+
public class ClientRSocketConnector extends AbstractRSocketConnector {
5252

5353
private final ClientTransport clientTransport;
5454

55-
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
55+
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
5656

57-
private Payload connectPayload = EmptyPayload.INSTANCE;
57+
private String connectRoute;
5858

59-
private RSocketStrategies rsocketStrategies = RSocketStrategies.builder().build();
59+
private String connectData = "";
6060

61-
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
61+
private boolean autoConnect;
6262

6363
private Mono<RSocket> rsocketMono;
6464

@@ -71,47 +71,51 @@ public ClientRSocketConnector(URI uri) {
7171
}
7272

7373
public ClientRSocketConnector(ClientTransport clientTransport) {
74+
super(new IntegrationRSocketAcceptor());
7475
Assert.notNull(clientTransport, "'clientTransport' must not be null");
7576
this.clientTransport = clientTransport;
7677
}
7778

78-
public void setDataMimeType(MimeType dataMimeType) {
79-
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
80-
this.dataMimeType = dataMimeType;
81-
}
82-
8379
public void setFactoryConfigurer(Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer) {
8480
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
8581
this.factoryConfigurer = factoryConfigurer;
8682
}
8783

88-
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
89-
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
90-
this.rsocketStrategies = rsocketStrategies;
84+
public void setConnectRoute(String connectRoute) {
85+
this.connectRoute = connectRoute;
9186
}
9287

93-
public void setConnectRoute(String connectRoute) {
94-
this.connectPayload = DefaultPayload.create("", connectRoute);
88+
public void setConnectData(String connectData) {
89+
Assert.notNull(connectData, "'connectData' must not be null");
90+
this.connectData = connectData;
9591
}
9692

9793
@Override
9894
public void afterPropertiesSet() {
95+
super.afterPropertiesSet();
9996
RSocketFactory.ClientRSocketFactory clientFactory =
10097
RSocketFactory.connect()
101-
.dataMimeType(this.dataMimeType.toString());
98+
.dataMimeType(getDataMimeType().toString());
10299
this.factoryConfigurer.accept(clientFactory);
103-
clientFactory.setupPayload(this.connectPayload);
100+
clientFactory.acceptor(this.rsocketAcceptor);
101+
Payload connectPayload = EmptyPayload.INSTANCE;
102+
if (this.connectRoute != null) {
103+
connectPayload = DefaultPayload.create(this.connectData, this.connectRoute);
104+
}
105+
clientFactory.setupPayload(connectPayload);
104106
this.rsocketMono = clientFactory.transport(this.clientTransport).start().cache();
105107
}
106108

107-
public void connect() {
108-
this.rsocketMono.subscribe();
109+
@Override
110+
public void afterSingletonsInstantiated() {
111+
this.autoConnect = this.rsocketAcceptor.detectEndpoints();
109112
}
110113

111-
public Mono<RSocketRequester> getRSocketRequester() {
112-
return this.rsocketMono
113-
.map(rsocket -> RSocketRequester.wrap(rsocket, this.dataMimeType, this.rsocketStrategies))
114-
.cache();
114+
@Override
115+
protected void doStart() {
116+
if (this.autoConnect) {
117+
connect();
118+
}
115119
}
116120

117121
@Override
@@ -121,4 +125,17 @@ public void destroy() {
121125
.subscribe();
122126
}
123127

128+
/**
129+
* Perform subscription into the RSocket server for incoming requests.
130+
*/
131+
public void connect() {
132+
this.rsocketMono.subscribe();
133+
}
134+
135+
public Mono<RSocketRequester> getRSocketRequester() {
136+
return this.rsocketMono
137+
.map(rsocket -> RSocketRequester.wrap(rsocket, getDataMimeType(), getRSocketStrategies()))
138+
.cache();
139+
}
140+
124141
}

0 commit comments

Comments
 (0)