Skip to content

Commit 18e1d59

Browse files
committed
* Don't use BytyBuffer in the RSocketConnectedEvent.toString()
* Always `payload.retain()` when we convert `Payload` to `DataBuffer` * Fix `IntegrationRSocketAcceptor.detectEndpoints()` stream logic to really iterate over all the `IntegrationRSocketEndpoint` beans * Fix test to use an explicit `ClientConfig` class for the `@SpringJUnitConfig`: looks like JUnit 5 is OK to scan for `package protected` classes as well * Add request/reply tests into the `RSocketInboundGatewayIntegrationTests` for both server and client sides
1 parent ce80cc9 commit 18e1d59

File tree

6 files changed

+77
-31
lines changed

6 files changed

+77
-31
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ static String getDestination(Payload payload) {
187187
}
188188

189189
static DataBuffer payloadToDataBuffer(Payload payload, DataBufferFactory bufferFactory) {
190+
payload.retain();
190191
try {
191192
if (bufferFactory instanceof NettyDataBufferFactory) {
192193
ByteBuf byteBuf = payload.sliceData().retain();

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketAcceptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ public boolean detectEndpoints() {
8282
.values()
8383
.stream()
8484
.peek(this::addEndpoint)
85-
.findAny()
86-
.isPresent();
85+
.count() > 0;
8786
}
8887
else {
8988
return false;

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/RSocketConnectedEvent.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.integration.rsocket;
1818

19-
import java.util.Arrays;
20-
2119
import org.springframework.core.io.buffer.DataBuffer;
2220
import org.springframework.integration.events.IntegrationEvent;
2321
import org.springframework.messaging.rsocket.RSocketRequester;
@@ -67,7 +65,7 @@ public RSocketRequester getRequester() {
6765
public String toString() {
6866
return "RSocketConnectedEvent{" +
6967
"destination='" + this.destination + '\'' +
70-
", data=" + Arrays.toString(this.data.asByteBuffer().array()) +
68+
", data length=" + this.data.asByteBuffer().array().length +
7169
", requester=" + this.requester +
7270
'}';
7371
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ServerRSocketConnector.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,9 @@ private static class ServerRSocketAcceptor extends IntegrationRSocketAcceptor im
148148

149149
@Override
150150
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
151-
setupPayload.retain();
152151
String destination = IntegrationRSocket.getDestination(setupPayload);
153-
DataBuffer dataBuffer = IntegrationRSocket.payloadToDataBuffer(setupPayload,
154-
getRSocketStrategies().dataBufferFactory());
152+
DataBuffer dataBuffer =
153+
IntegrationRSocket.payloadToDataBuffer(setupPayload, getRSocketStrategies().dataBufferFactory());
155154
int refCount = IntegrationRSocket.refCount(dataBuffer);
156155
return Mono.just(sendingRSocket)
157156
.map(this::createRSocket)

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.rsocket.inbound;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
1921
import java.time.Duration;
2022

2123
import org.junit.jupiter.api.AfterAll;
@@ -32,12 +34,15 @@
3234
import org.springframework.core.codec.CharSequenceEncoder;
3335
import org.springframework.core.codec.StringDecoder;
3436
import org.springframework.core.io.buffer.NettyDataBufferFactory;
37+
import org.springframework.integration.annotation.Transformer;
3538
import org.springframework.integration.channel.FluxMessageChannel;
39+
import org.springframework.integration.channel.QueueChannel;
3640
import org.springframework.integration.config.EnableIntegration;
3741
import org.springframework.integration.rsocket.ClientRSocketConnector;
3842
import org.springframework.integration.rsocket.RSocketConnectedEvent;
3943
import org.springframework.integration.rsocket.ServerRSocketConnector;
4044
import org.springframework.messaging.Message;
45+
import org.springframework.messaging.PollableChannel;
4146
import org.springframework.messaging.rsocket.RSocketRequester;
4247
import org.springframework.messaging.rsocket.RSocketStrategies;
4348
import org.springframework.test.annotation.DirtiesContext;
@@ -47,6 +52,7 @@
4752
import io.rsocket.frame.decoder.PayloadDecoder;
4853
import io.rsocket.transport.netty.server.TcpServerTransport;
4954
import reactor.core.publisher.Flux;
55+
import reactor.core.publisher.Mono;
5056
import reactor.core.publisher.MonoProcessor;
5157
import reactor.netty.tcp.TcpServer;
5258
import reactor.test.StepVerifier;
@@ -56,7 +62,7 @@
5662
*
5763
* @since 5.2
5864
*/
59-
@SpringJUnitConfig
65+
@SpringJUnitConfig(RSocketInboundGatewayIntegrationTests.ClientConfig.class)
6066
@DirtiesContext
6167
public class RSocketInboundGatewayIntegrationTests {
6268

@@ -66,13 +72,15 @@ public class RSocketInboundGatewayIntegrationTests {
6672

6773
private static ServerConfig serverConfig;
6874

69-
private static FluxMessageChannel serverInputChannel;
75+
private static PollableChannel serverFireAndForgetChannelChannel;
76+
77+
;
7078

7179
@Autowired
7280
private ClientRSocketConnector clientRSocketConnector;
7381

7482
@Autowired
75-
private FluxMessageChannel inputChannel;
83+
private PollableChannel fireAndForgetChannelChannel;
7684

7785
private RSocketRequester serverRsocketRequester;
7886

@@ -82,7 +90,7 @@ public class RSocketInboundGatewayIntegrationTests {
8290
static void setup() {
8391
serverContext = new AnnotationConfigApplicationContext(ServerConfig.class);
8492
serverConfig = serverContext.getBean(ServerConfig.class);
85-
serverInputChannel = serverContext.getBean("inputChannel", FluxMessageChannel.class);
93+
serverFireAndForgetChannelChannel = serverContext.getBean("fireAndForgetChannelChannel", PollableChannel.class);
8694
}
8795

8896
@AfterAll
@@ -103,29 +111,52 @@ void setupTest(TestInfo testInfo) {
103111

104112
@Test
105113
void clientFireAndForget() {
106-
fireAndForget(serverInputChannel, this.clientRsocketRequester);
114+
fireAndForget(serverFireAndForgetChannelChannel, this.clientRsocketRequester);
107115
}
108116

109117
@Test
110118
void serverFireAndForget() {
111-
fireAndForget(this.inputChannel, this.serverRsocketRequester);
119+
fireAndForget(this.fireAndForgetChannelChannel, this.serverRsocketRequester);
120+
}
121+
122+
private void fireAndForget(PollableChannel inputChannel, RSocketRequester rsocketRequester) {
123+
rsocketRequester.route("receive")
124+
.data("Hello")
125+
.send()
126+
.subscribe();
127+
128+
Message<?> receive = inputChannel.receive(10_000);
129+
assertThat(receive)
130+
.isNotNull()
131+
.extracting(Message::getPayload)
132+
.isEqualTo("Hello");
112133
}
113134

114-
private void fireAndForget(FluxMessageChannel inputChannel, RSocketRequester rsocketRequester) {
115-
StepVerifier.create(
116-
Flux.from(inputChannel)
117-
.map(Message::getPayload)
118-
.cast(String.class))
119-
.then(() ->
120-
rsocketRequester.route("receive")
121-
.data("Hello")
122-
.send()
123-
.subscribe())
124-
.expectNext("Hello")
125-
.thenCancel()
126-
.verify();
135+
@Test
136+
void clientEcho() {
137+
echo(this.clientRsocketRequester);
127138
}
128139

140+
@Test
141+
void serverEcho() {
142+
echo(this.serverRsocketRequester);
143+
}
144+
145+
private void echo(RSocketRequester rsocketRequester) {
146+
Flux<String> result =
147+
Flux.range(1, 3)
148+
.concatMap(i ->
149+
rsocketRequester.route("echo")
150+
.data("hello " + i)
151+
.retrieveMono(String.class));
152+
153+
StepVerifier.create(result)
154+
.expectNext("HELLO 1", "HELLO 2", "HELLO 3")
155+
.expectComplete()
156+
.verify(Duration.ofSeconds(10));
157+
}
158+
159+
129160
private abstract static class CommonConfig {
130161

131162
@Bean
@@ -138,18 +169,36 @@ public RSocketStrategies rsocketStrategies() {
138169
}
139170

140171
@Bean
141-
public RSocketInboundGateway rsocketInboundGateway() {
172+
public PollableChannel fireAndForgetChannelChannel() {
173+
return new QueueChannel();
174+
}
175+
176+
@Bean
177+
public RSocketInboundGateway rsocketInboundGatewayFireAndForget() {
142178
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("receive");
143179
rsocketInboundGateway.setRSocketStrategies(rsocketStrategies());
144-
rsocketInboundGateway.setRequestChannel(inputChannel());
180+
rsocketInboundGateway.setRequestChannel(fireAndForgetChannelChannel());
181+
return rsocketInboundGateway;
182+
}
183+
184+
@Bean
185+
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
186+
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
187+
rsocketInboundGateway.setRSocketStrategies(rsocketStrategies());
188+
rsocketInboundGateway.setRequestChannel(requestReplyChannel());
145189
return rsocketInboundGateway;
146190
}
147191

148192
@Bean
149-
public FluxMessageChannel inputChannel() {
193+
public FluxMessageChannel requestReplyChannel() {
150194
return new FluxMessageChannel();
151195
}
152196

197+
@Transformer(inputChannel = "requestReplyChannel")
198+
public Mono<String> echoTransformation(Flux<String> payload) {
199+
return payload.next().map(String::toUpperCase);
200+
}
201+
153202
}
154203

155204
@Configuration

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
*
8080
* @since 5.2
8181
*/
82-
@SpringJUnitConfig
82+
@SpringJUnitConfig(RSocketOutboundGatewayIntegrationTests.ClientConfig.class)
8383
@DirtiesContext
8484
public class RSocketOutboundGatewayIntegrationTests {
8585

0 commit comments

Comments
 (0)