Skip to content

Commit 97aaf95

Browse files
committed
Polishing RSocket module according SF changes
* Fix Checkstyle violations in the `MessageHistoryConfigurer`
1 parent e833a44 commit 97aaf95

File tree

10 files changed

+130
-131
lines changed

10 files changed

+130
-131
lines changed

spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ private void trackComponentIfAny(TrackableComponent component) {
154154
component.setShouldTrack(shouldTrack);
155155
if (shouldTrack) {
156156
this.currentlyTrackedComponents.add(component);
157-
if (this.LOGGER.isInfoEnabled()) {
158-
this.LOGGER.info("Enabling MessageHistory tracking for component '" + componentName + "'");
157+
if (LOGGER.isInfoEnabled()) {
158+
LOGGER.info("Enabling MessageHistory tracking for component '" + componentName + "'");
159159
}
160160
}
161161
}
@@ -217,8 +217,8 @@ public void stop() {
217217
if (this.running) {
218218
this.currentlyTrackedComponents.forEach(component -> {
219219
component.setShouldTrack(false);
220-
if (this.LOGGER.isInfoEnabled()) {
221-
this.LOGGER.info("Disabling MessageHistory tracking for component '"
220+
if (LOGGER.isInfoEnabled()) {
221+
LOGGER.info("Disabling MessageHistory tracking for component '"
222222
+ component.getComponentName() + "'");
223223
}
224224
});

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,20 @@
3535
* A base connector container for common RSocket client and server functionality.
3636
* <p>
3737
* It accepts {@link IntegrationRSocketEndpoint} instances for mapping registration via an internal
38-
* {@link IntegrationRSocketAcceptor} or performs an auto-detection otherwise, when all bean are ready
38+
* {@link IntegrationRSocketMessageHandler} or performs an auto-detection otherwise, when all bean are ready
3939
* in the application context.
4040
*
4141
* @author Artem Bilan
4242
*
4343
* @since 5.2
4444
*
45-
* @see IntegrationRSocketAcceptor
45+
* @see IntegrationRSocketMessageHandler
4646
*/
4747
public abstract class AbstractRSocketConnector
4848
implements ApplicationContextAware, InitializingBean, DisposableBean, SmartInitializingSingleton,
4949
SmartLifecycle {
5050

51-
protected final IntegrationRSocketAcceptor rsocketAcceptor; // NOSONAR - final
51+
protected final IntegrationRSocketMessageHandler rSocketMessageHandler; // NOSONAR - final
5252

5353
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
5454

@@ -65,8 +65,8 @@ public abstract class AbstractRSocketConnector
6565

6666
private volatile boolean running;
6767

68-
protected AbstractRSocketConnector(IntegrationRSocketAcceptor rsocketAcceptor) {
69-
this.rsocketAcceptor = rsocketAcceptor;
68+
protected AbstractRSocketConnector(IntegrationRSocketMessageHandler rSocketMessageHandler) {
69+
this.rSocketMessageHandler = rSocketMessageHandler;
7070
}
7171

7272
/**
@@ -126,25 +126,25 @@ public void setEndpoints(IntegrationRSocketEndpoint... endpoints) {
126126
* @param endpoint the {@link IntegrationRSocketEndpoint} to map.
127127
*/
128128
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
129-
this.rsocketAcceptor.addEndpoint(endpoint);
129+
this.rSocketMessageHandler.addEndpoint(endpoint);
130130
}
131131

132132
@Override
133133
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
134-
this.rsocketAcceptor.setApplicationContext(applicationContext);
134+
this.rSocketMessageHandler.setApplicationContext(applicationContext);
135135
}
136136

137137
@Override
138138
public void afterPropertiesSet() {
139-
this.rsocketAcceptor.setDefaultDataMimeType(this.dataMimeType);
140-
this.rsocketAcceptor.setDefaultMetadataMimeType(this.metadataMimeType);
141-
this.rsocketAcceptor.setRSocketStrategies(this.rsocketStrategies);
142-
this.rsocketAcceptor.afterPropertiesSet();
139+
this.rSocketMessageHandler.setDefaultDataMimeType(this.dataMimeType);
140+
this.rSocketMessageHandler.setDefaultMetadataMimeType(this.metadataMimeType);
141+
this.rSocketMessageHandler.setRSocketStrategies(this.rsocketStrategies);
142+
this.rSocketMessageHandler.afterPropertiesSet();
143143
}
144144

145145
@Override
146146
public void afterSingletonsInstantiated() {
147-
this.rsocketAcceptor.detectEndpoints();
147+
this.rSocketMessageHandler.detectEndpoints();
148148
}
149149

150150
public void setAutoStartup(boolean autoStartup) {

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
* A client {@link AbstractRSocketConnector} extension to the RSocket server.
3838
* <p>
3939
* Note: the {@link RSocketFactory.ClientRSocketFactory#acceptor(java.util.function.Function)}
40-
* in the provided {@link #factoryConfigurer} is overridden with an internal {@link IntegrationRSocketAcceptor}
40+
* in the provided {@link #factoryConfigurer} is overridden with an internal
41+
* {@link IntegrationRSocketMessageHandler#clientAcceptor()}
4142
* for the proper Spring Integration channel adapter mappings.
4243
*
4344
* @author Artem Bilan
@@ -85,7 +86,7 @@ public ClientRSocketConnector(URI uri) {
8586
* @param clientTransport the {@link ClientTransport} to use.
8687
*/
8788
public ClientRSocketConnector(ClientTransport clientTransport) {
88-
super(new IntegrationRSocketAcceptor());
89+
super(new IntegrationRSocketMessageHandler());
8990
Assert.notNull(clientTransport, "'clientTransport' must not be null");
9091
this.clientTransport = clientTransport;
9192
}
@@ -125,7 +126,7 @@ public void afterPropertiesSet() {
125126
.dataMimeType(getDataMimeType().toString())
126127
.metadataMimeType(getMetadataMimeType().toString());
127128
this.factoryConfigurer.accept(clientFactory);
128-
clientFactory.acceptor(this.rsocketAcceptor);
129+
clientFactory.acceptor(this.rSocketMessageHandler.clientAcceptor());
129130
Payload connectPayload = EmptyPayload.INSTANCE;
130131
if (this.connectRoute != null) {
131132
connectPayload = DefaultPayload.create(this.connectData, this.connectRoute);
@@ -136,7 +137,7 @@ public void afterPropertiesSet() {
136137

137138
@Override
138139
public void afterSingletonsInstantiated() {
139-
this.autoConnect = this.rsocketAcceptor.detectEndpoints();
140+
this.autoConnect = this.rSocketMessageHandler.detectEndpoints();
140141
}
141142

142143
@Override

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
import io.netty.buffer.ByteBuf;
4848
import io.rsocket.AbstractRSocket;
49+
import io.rsocket.ConnectionSetupPayload;
4950
import io.rsocket.Payload;
5051
import io.rsocket.metadata.CompositeMetadata;
5152
import reactor.core.publisher.Flux;
@@ -109,8 +110,23 @@ class IntegrationRSocket extends AbstractRSocket {
109110
this.bufferFactory = bufferFactory;
110111
}
111112

112-
public RSocketRequester getRequester() {
113-
return this.requester;
113+
/**
114+
* Wrap the {@link ConnectionSetupPayload} with a {@link Message} and
115+
* delegate to {@link #handle(Payload)} for handling.
116+
* @param payload the connection payload
117+
* @return completion handle for success or error
118+
*/
119+
Mono<Message<DataBuffer>> handleConnectionSetupPayload(ConnectionSetupPayload payload) {
120+
String destination = getDestination(payload);
121+
MessageHeaders headers = createHeaders(destination, null);
122+
DataBuffer dataBuffer = retainDataAndReleasePayload(payload);
123+
int refCount = refCount(dataBuffer);
124+
return Mono.just(MessageBuilder.createMessage(dataBuffer, headers))
125+
.doFinally(s -> {
126+
if (refCount(dataBuffer) == refCount) {
127+
DataBufferUtils.release(dataBuffer);
128+
}
129+
});
114130
}
115131

116132
@Override
Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
3232
import org.springframework.messaging.handler.invocation.reactive.SyncHandlerMethodArgumentResolver;
3333
import org.springframework.messaging.rsocket.RSocketRequester;
34-
import org.springframework.messaging.rsocket.RSocketStrategies;
3534
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
3635
import org.springframework.util.Assert;
3736
import org.springframework.util.MimeType;
@@ -45,20 +44,15 @@
4544
/**
4645
* The {@link RSocketMessageHandler} extension for Spring Integration needs.
4746
* <p>
48-
* The most of logic is copied from {@link org.springframework.messaging.rsocket.MessageHandlerAcceptor}.
49-
* That cannot be extended because it is {@link final}.
50-
* <p>
51-
* This class adds an {@link IntegrationRSocketEndpoint} beans detection and registration functionality,
52-
* as well as serves as a container over an internal {@link IntegrationRSocket} implementation.
47+
* This class adds an {@link IntegrationRSocketEndpoint} beans detection and registration functionality.
5348
*
5449
* @author Artem Bilan
5550
*
5651
* @since 5.2
5752
*
58-
* @see org.springframework.messaging.rsocket.MessageHandlerAcceptor
53+
* @see RSocketMessageHandler
5954
*/
60-
class IntegrationRSocketAcceptor extends RSocketMessageHandler
61-
implements BiFunction<ConnectionSetupPayload, RSocket, RSocket> {
55+
class IntegrationRSocketMessageHandler extends RSocketMessageHandler {
6256

6357
private static final Method HANDLE_MESSAGE_METHOD =
6458
ReflectionUtils.findMethod(ReactiveMessageHandler.class, "handleMessage", Message.class);
@@ -68,7 +62,7 @@ class IntegrationRSocketAcceptor extends RSocketMessageHandler
6862

6963
private MimeType defaultMetadataMimeType = IntegrationRSocket.COMPOSITE_METADATA;
7064

71-
IntegrationRSocketAcceptor() {
65+
IntegrationRSocketMessageHandler() {
7266
setHandlerPredicate((clazz) -> false);
7367
}
7468

@@ -84,6 +78,7 @@ public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
8478
this.defaultDataMimeType = defaultDataMimeType;
8579
}
8680

81+
8782
/**
8883
* Configure the default {@code MimeType} for payload data if the
8984
* {@code SETUP} frame did not specify one.
@@ -96,6 +91,11 @@ public void setDefaultMetadataMimeType(MimeType mimeType) {
9691
this.defaultMetadataMimeType = mimeType;
9792
}
9893

94+
@Override
95+
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientAcceptor() {
96+
return this::createRSocket;
97+
}
98+
9999
public boolean detectEndpoints() {
100100
ApplicationContext applicationContext = getApplicationContext();
101101
if (applicationContext != null && getHandlerMethods().isEmpty()) {
@@ -122,27 +122,19 @@ protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers()
122122
return Collections.singletonList(new MessageHandlerMethodArgumentResolver());
123123
}
124124

125-
@Override
126-
public RSocket apply(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
127-
return createRSocket(setupPayload, sendingRSocket);
128-
}
129-
130125
protected IntegrationRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) {
131-
RSocketStrategies rsocketStrategies = getRSocketStrategies();
132-
MimeType dataMimeType =
133-
StringUtils.hasText(setupPayload.dataMimeType())
134-
? MimeTypeUtils.parseMimeType(setupPayload.dataMimeType())
135-
: this.defaultDataMimeType;
136-
Assert.notNull(dataMimeType, "No `dataMimeType` in the ConnectionSetupPayload and no default value");
137-
138-
MimeType metadataMimeType =
139-
StringUtils.hasText(setupPayload.metadataMimeType())
140-
? MimeTypeUtils.parseMimeType(setupPayload.metadataMimeType())
141-
: this.defaultMetadataMimeType;
142-
Assert.notNull(dataMimeType, "No `metadataMimeType` in the ConnectionSetupPayload and no default value");
143-
return new IntegrationRSocket(this, getRouteMatcher(),
144-
RSocketRequester.wrap(rsocket, dataMimeType, metadataMimeType, rsocketStrategies),
145-
dataMimeType, metadataMimeType, rsocketStrategies.dataBufferFactory());
126+
String s = setupPayload.dataMimeType();
127+
MimeType dataMimeType = StringUtils.hasText(s) ? MimeTypeUtils.parseMimeType(s) : this.defaultDataMimeType;
128+
Assert.notNull(dataMimeType, "No `dataMimeType` in ConnectionSetupPayload and no default value");
129+
130+
s = setupPayload.metadataMimeType();
131+
MimeType metaMimeType = StringUtils.hasText(s) ? MimeTypeUtils.parseMimeType(s) : this.defaultMetadataMimeType;
132+
Assert.notNull(dataMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");
133+
134+
RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, metaMimeType, getRSocketStrategies());
135+
136+
return new IntegrationRSocket(this, getRouteMatcher(), requester, dataMimeType, metaMimeType,
137+
getRSocketStrategies().dataBufferFactory());
146138
}
147139

148140
private static final class MessageHandlerMethodArgumentResolver implements SyncHandlerMethodArgumentResolver {

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/RSocketConnectedEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*
3232
* @since 5.2
3333
*
34-
* @see IntegrationRSocketAcceptor
34+
* @see IntegrationRSocketMessageHandler
3535
*/
3636
@SuppressWarnings("serial")
3737
public class RSocketConnectedEvent extends IntegrationEvent {

0 commit comments

Comments
 (0)