Skip to content

Commit 5e7c1ae

Browse files
artembilangaryrussell
authored andcommitted
Add RSocket Java DSL
* Add `@NonNullApi` for RSocket packages * Some code style and JavaDocs polishing
1 parent 0c32a57 commit 5e7c1ae

File tree

14 files changed

+527
-15
lines changed

14 files changed

+527
-15
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public abstract class AbstractRSocketConnector
5959
.dataBufferFactory(new DefaultDataBufferFactory())
6060
.build();
6161

62+
private boolean autoStartup = true;
63+
6264
private volatile boolean running;
6365

6466
protected AbstractRSocketConnector(IntegrationRSocketAcceptor rsocketAcceptor) {
@@ -128,6 +130,15 @@ public void afterSingletonsInstantiated() {
128130
this.rsocketAcceptor.detectEndpoints();
129131
}
130132

133+
public void setAutoStartup(boolean autoStartup) {
134+
this.autoStartup = autoStartup;
135+
}
136+
137+
@Override
138+
public boolean isAutoStartup() {
139+
return this.autoStartup;
140+
}
141+
131142
@Override
132143
public void start() {
133144
if (!this.running) {

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void connect() {
161161

162162
public Mono<RSocketRequester> getRSocketRequester() {
163163
return this.rsocketMono
164-
.map(rsocket -> RSocketRequester.wrap(rsocket, getDataMimeType(), getRSocketStrategies()))
164+
.map((rsocket) -> RSocketRequester.wrap(rsocket, getDataMimeType(), getRSocketStrategies()))
165165
.cache();
166166
}
167167

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private Mono<Void> handle(Payload payload) {
129129
int refCount = refCount(dataBuffer);
130130
Message<?> message = MessageBuilder.createMessage(dataBuffer, headers);
131131
return Mono.defer(() -> this.handler.apply(message))
132-
.doFinally(s -> {
132+
.doFinally((signal) -> {
133133
if (refCount(dataBuffer) == refCount) {
134134
DataBufferUtils.release(dataBuffer);
135135
}
@@ -147,19 +147,22 @@ private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payload
147147
MessageHeaders headers = createHeaders(destination, replyMono);
148148

149149
AtomicBoolean read = new AtomicBoolean();
150-
Flux<DataBuffer> buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true));
150+
Flux<DataBuffer> buffers =
151+
payloads.map(this::retainDataAndReleasePayload)
152+
.doOnSubscribe((subscription) -> read.set(true));
151153
Message<Flux<DataBuffer>> message = MessageBuilder.createMessage(buffers, headers);
152154

153155
return Mono.defer(() -> this.handler.apply(message))
154-
.doFinally(s -> {
156+
.doFinally((signal) -> {
155157
// Subscription should have happened by now due to ChannelSendOperator
156158
if (!read.get()) {
157159
buffers.subscribe(DataBufferUtils::release);
158160
}
159161
})
160-
.thenMany(Flux.defer(() -> replyMono.isTerminated() ?
161-
replyMono.flatMapMany(Function.identity()) :
162-
Mono.error(new IllegalStateException("Something went wrong: reply Mono not set"))));
162+
.thenMany(Flux.defer(() ->
163+
replyMono.isTerminated()
164+
? replyMono.flatMapMany(Function.identity())
165+
: Mono.error(new IllegalStateException("Something went wrong: reply Mono not set"))));
163166
}
164167

165168
private DataBuffer retainDataAndReleasePayload(Payload payload) {
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes for RSocket XML namespace parsing and configuration support.
33
*/
4+
@org.springframework.lang.NonNullApi
45
package org.springframework.integration.rsocket.config;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.rsocket.dsl;
18+
19+
import org.springframework.core.ResolvableType;
20+
import org.springframework.integration.dsl.MessagingGatewaySpec;
21+
import org.springframework.integration.rsocket.AbstractRSocketConnector;
22+
import org.springframework.integration.rsocket.inbound.RSocketInboundGateway;
23+
import org.springframework.messaging.rsocket.RSocketStrategies;
24+
25+
/**
26+
* The {@link MessagingGatewaySpec} implementation for the {@link RSocketInboundGateway}.
27+
*
28+
* @author Artem Bilan
29+
*
30+
* @since 5.2
31+
*/
32+
public class RSocketInboundGatewaySpec extends MessagingGatewaySpec<RSocketInboundGatewaySpec, RSocketInboundGateway> {
33+
34+
RSocketInboundGatewaySpec(String... path) {
35+
super(new RSocketInboundGateway(path));
36+
}
37+
38+
/**
39+
* Configure {@link RSocketStrategies} instead of a default one.
40+
* @param rsocketStrategies the {@link RSocketStrategies} to use.
41+
* @return the spec
42+
* @see RSocketInboundGateway#setRSocketStrategies(RSocketStrategies)
43+
*/
44+
public RSocketInboundGatewaySpec rsocketStrategies(RSocketStrategies rsocketStrategies) {
45+
this.target.setRSocketStrategies(rsocketStrategies);
46+
return this;
47+
}
48+
49+
/**
50+
* Provide an {@link AbstractRSocketConnector} reference for an explicit endpoint mapping.
51+
* @param rsocketConnector the {@link AbstractRSocketConnector} to use.
52+
* @return the spec
53+
* @see RSocketInboundGateway#setRSocketConnector(AbstractRSocketConnector)
54+
*/
55+
public RSocketInboundGatewaySpec rsocketConnector(AbstractRSocketConnector rsocketConnector) {
56+
this.target.setRSocketConnector(rsocketConnector);
57+
return this;
58+
}
59+
60+
/**
61+
* Specify the type of payload to be generated when the inbound RSocket request
62+
* content is read by the converters/encoders.
63+
* @param requestElementType The payload type.
64+
* @return the spec
65+
* @see RSocketInboundGateway#setRequestElementType(ResolvableType)
66+
*/
67+
public RSocketInboundGatewaySpec requestElementType(ResolvableType requestElementType) {
68+
this.target.setRequestElementType(requestElementType);
69+
return this;
70+
}
71+
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.rsocket.dsl;
18+
19+
import java.util.function.Function;
20+
21+
import org.reactivestreams.Publisher;
22+
23+
import org.springframework.expression.Expression;
24+
import org.springframework.integration.dsl.MessageHandlerSpec;
25+
import org.springframework.integration.expression.FunctionExpression;
26+
import org.springframework.integration.expression.ValueExpression;
27+
import org.springframework.integration.rsocket.ClientRSocketConnector;
28+
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
29+
import org.springframework.messaging.Message;
30+
import org.springframework.messaging.rsocket.RSocketRequester;
31+
import org.springframework.messaging.rsocket.RSocketRequesterMethodArgumentResolver;
32+
33+
/**
34+
* The {@link MessageHandlerSpec} implementation for the {@link RSocketOutboundGateway}.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 5.2
39+
*/
40+
public class RSocketOutboundGatewaySpec extends MessageHandlerSpec<RSocketOutboundGatewaySpec, RSocketOutboundGateway> {
41+
42+
RSocketOutboundGatewaySpec(Expression routeExpression) {
43+
this.target = new RSocketOutboundGateway(routeExpression);
44+
}
45+
46+
/**
47+
* Configure a {@link ClientRSocketConnector} for client side requests based on the connection
48+
* provided by the {@link ClientRSocketConnector#getRSocketRequester()}.
49+
* In case of server side, an {@link RSocketRequester} must be provided in the
50+
* {@link RSocketRequesterMethodArgumentResolver#RSOCKET_REQUESTER_HEADER} header of request message.
51+
* @param clientRSocketConnector the {@link ClientRSocketConnector} to use.
52+
* @return the spec
53+
* @see RSocketOutboundGateway#setClientRSocketConnector(ClientRSocketConnector)
54+
*/
55+
public RSocketOutboundGatewaySpec clientRSocketConnector(ClientRSocketConnector clientRSocketConnector) {
56+
this.target.setClientRSocketConnector(clientRSocketConnector);
57+
return this;
58+
}
59+
60+
/**
61+
* Configure a {@link RSocketOutboundGateway.Command} for RSocket request type.
62+
* @param command the {@link RSocketOutboundGateway.Command} to use.
63+
* @return the spec
64+
* @see RSocketOutboundGateway#setCommand(RSocketOutboundGateway.Command)
65+
*/
66+
public RSocketOutboundGatewaySpec command(RSocketOutboundGateway.Command command) {
67+
return command(new ValueExpression<>(command));
68+
}
69+
70+
/**
71+
* Configure a {@code Function} to evaluate a {@link RSocketOutboundGateway.Command}
72+
* for RSocket request type at runtime against a request message.
73+
* @param commandFunction the {@code Function} to use.
74+
* @param <P> the expected request message payload type.
75+
* @return the spec
76+
* @see RSocketOutboundGateway#setCommandExpression(Expression)
77+
*/
78+
public <P> RSocketOutboundGatewaySpec command(Function<Message<P>, ?> commandFunction) {
79+
return command(new FunctionExpression<>(commandFunction));
80+
}
81+
82+
/**
83+
* Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command}
84+
* for RSocket request type at runtime against a request message.
85+
* @param commandExpression the SpEL expression to use.
86+
* @return the spec
87+
* @see RSocketOutboundGateway#setCommandExpression(Expression)
88+
*/
89+
public RSocketOutboundGatewaySpec command(String commandExpression) {
90+
return command(PARSER.parseExpression(commandExpression));
91+
}
92+
93+
/**
94+
* Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command}
95+
* for RSocket request type at runtime against a request message.
96+
* @param commandExpression the SpEL expression to use.
97+
* @return the spec
98+
* @see RSocketOutboundGateway#setCommandExpression(Expression)
99+
*/
100+
public RSocketOutboundGatewaySpec command(Expression commandExpression) {
101+
this.target.setCommandExpression(commandExpression);
102+
return this;
103+
}
104+
105+
/**
106+
* Configure a type for a request {@link Publisher} elements.
107+
* @param publisherElementType the type of the request {@link Publisher} elements.
108+
* @return the spec
109+
* @see RSocketOutboundGateway#setPublisherElementType(Class)
110+
*/
111+
public RSocketOutboundGatewaySpec publisherElementType(Class<?> publisherElementType) {
112+
return publisherElementType(new ValueExpression<>(publisherElementType));
113+
}
114+
115+
/**
116+
* Configure a {@code Function} to evaluate a request {@link Publisher} elements type at runtime against
117+
* a request message.
118+
* @param publisherElementTypeFunction the {@code Function} to evaluate a type for the request
119+
* {@link Publisher} elements.
120+
* @param <P> the expected request message payload type.
121+
* @return the spec
122+
* @see RSocketOutboundGateway#setPublisherElementTypeExpression(Expression)
123+
*/
124+
public <P> RSocketOutboundGatewaySpec publisherElementType(Function<Message<P>, ?> publisherElementTypeFunction) {
125+
return publisherElementType(new FunctionExpression<>(publisherElementTypeFunction));
126+
}
127+
128+
/**
129+
* Configure a SpEL expression to evaluate a request {@link Publisher} elements type at runtime against
130+
* a request message.
131+
* @param publisherElementTypeExpression the expression to evaluate a type for the request
132+
* {@link Publisher} elements.
133+
* @return the spec
134+
* @see RSocketOutboundGateway#setPublisherElementTypeExpression(Expression)
135+
*/
136+
public RSocketOutboundGatewaySpec publisherElementType(String publisherElementTypeExpression) {
137+
return publisherElementType(PARSER.parseExpression(publisherElementTypeExpression));
138+
}
139+
140+
/**
141+
* Configure a SpEL expression to evaluate a request {@link Publisher} elements type at runtime against
142+
* a request message.
143+
* @param publisherElementTypeExpression the expression to evaluate a type for the request
144+
* {@link Publisher} elements.
145+
* @return the spec
146+
* @see RSocketOutboundGateway#setPublisherElementTypeExpression(Expression)
147+
*/
148+
public RSocketOutboundGatewaySpec publisherElementType(Expression publisherElementTypeExpression) {
149+
this.target.setPublisherElementTypeExpression(publisherElementTypeExpression);
150+
return this;
151+
}
152+
153+
/**
154+
* Specify the expected response type for the RSocket response.
155+
* @param expectedResponseType The expected type.
156+
* @return the spec
157+
* @see RSocketOutboundGateway#setExpectedResponseType(Class)
158+
*/
159+
public RSocketOutboundGatewaySpec expectedResponseType(Class<?> expectedResponseType) {
160+
return expectedResponseType(new ValueExpression<>(expectedResponseType));
161+
}
162+
163+
/**
164+
* Specify the {@code Function} to determine the type for the RSocket response.
165+
* @param expectedResponseTypeFunction The expected response type {@code Function}.
166+
* @param <P> the expected request message payload type.
167+
* @return the spec
168+
* @see RSocketOutboundGateway#setExpectedResponseTypeExpression(Expression)
169+
*/
170+
public <P> RSocketOutboundGatewaySpec expectedResponseType(Function<Message<P>, ?> expectedResponseTypeFunction) {
171+
return expectedResponseType(new FunctionExpression<>(expectedResponseTypeFunction));
172+
}
173+
174+
/**
175+
* Specify the {@link Expression} to determine the type for the RSocket response.
176+
* @param expectedResponseTypeExpression The expected response type expression.
177+
* @return the spec
178+
* @see RSocketOutboundGateway#setExpectedResponseTypeExpression(Expression)
179+
*/
180+
public RSocketOutboundGatewaySpec expectedResponseType(String expectedResponseTypeExpression) {
181+
return expectedResponseType(PARSER.parseExpression(expectedResponseTypeExpression));
182+
}
183+
184+
/**
185+
* Specify the {@link Expression} to determine the type for the RSocket response.
186+
* @param expectedResponseTypeExpression The expected response type expression.
187+
* @return the spec
188+
* @see RSocketOutboundGateway#setExpectedResponseTypeExpression(Expression)
189+
*/
190+
public RSocketOutboundGatewaySpec expectedResponseType(Expression expectedResponseTypeExpression) {
191+
this.target.setExpectedResponseTypeExpression(expectedResponseTypeExpression);
192+
return this;
193+
}
194+
195+
}

0 commit comments

Comments
 (0)