Skip to content

Commit 1b59b4e

Browse files
committed
Add Reactor 11 support
This change adds support for Reactor 1.1 in spring-messaging in addition to Reactor 1.0.1 -- whichever is present on the classpath is used. Note also the module name change: reactor-tcp:1.0.1 -> reactor-net:1.1.0 Issue: SPR-11636
1 parent f6b89c2 commit 1b59b4e

File tree

11 files changed

+455
-24
lines changed

11 files changed

+455
-24
lines changed

build.gradle

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ configure(allprojects) { project ->
7878
repositories {
7979
maven { url "http://repo.spring.io/libs-release" }
8080
maven { url "http://repo.spring.io/milestone" } // for AspectJ 1.8.0.RC2
81+
maven { url "http://repo.spring.io/snapshot" } // temporarily until Reactor 1.1.0.RC1
8182
}
8283

8384
dependencies {
@@ -396,7 +397,8 @@ project("spring-messaging") {
396397
compile(project(":spring-beans"))
397398
compile(project(":spring-core"))
398399
compile(project(":spring-context"))
399-
optional("org.projectreactor:reactor-core:1.0.1.RELEASE")
400+
optional("org.projectreactor:reactor-core:1.1.0.BUILD-SNAPSHOT")
401+
optional("org.projectreactor:reactor-net:1.1.0.BUILD-SNAPSHOT")
400402
optional("org.projectreactor:reactor-tcp:1.0.1.RELEASE")
401403
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
402404
exclude group: "javax.servlet", module: "javax.servlet-api"
@@ -623,8 +625,9 @@ project("spring-websocket") {
623625
testCompile("org.apache.tomcat.embed:tomcat-embed-core:8.0.3")
624626
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:8.0.3")
625627
testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:8.0.3")
626-
testCompile("org.projectreactor:reactor-core:1.0.1.RELEASE")
627-
testCompile("org.projectreactor:reactor-tcp:1.0.1.RELEASE")
628+
629+
testCompile("org.projectreactor:reactor-core:1.1.0.BUILD-SNAPSHOT")
630+
testCompile("org.projectreactor:reactor-net:1.1.0.BUILD-SNAPSHOT")
628631
testCompile("log4j:log4j:1.2.17")
629632
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
630633
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2002-2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.stomp;
18+
19+
import org.springframework.messaging.Message;
20+
21+
import org.springframework.util.Assert;
22+
import reactor.function.Consumer;
23+
import reactor.function.Function;
24+
import reactor.io.Buffer;
25+
import reactor.io.encoding.Codec;
26+
27+
import java.nio.ByteBuffer;
28+
29+
/**
30+
* A Reactor TCP {@link reactor.io.encoding.Codec} for sending and receiving STOMP messages.
31+
*
32+
* @author Andy Wilkinson
33+
* @author Rossen Stoyanchev
34+
* @since 4.0
35+
*/
36+
public class Reactor11StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]>> {
37+
38+
private final StompDecoder stompDecoder;
39+
40+
private final StompEncoder stompEncoder;
41+
42+
private final Function<Message<byte[]>, Buffer> encodingFunction;
43+
44+
45+
public Reactor11StompCodec() {
46+
this(new StompEncoder(), new StompDecoder());
47+
}
48+
49+
public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) {
50+
Assert.notNull(encoder, "'encoder' is required");
51+
Assert.notNull(decoder, "'decoder' is required");
52+
this.stompEncoder = encoder;
53+
this.stompDecoder = decoder;
54+
this.encodingFunction = new EncodingFunction(this.stompEncoder);
55+
}
56+
57+
@Override
58+
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
59+
return new DecodingFunction(this.stompDecoder, messageConsumer);
60+
}
61+
62+
@Override
63+
public Function<Message<byte[]>, Buffer> encoder() {
64+
return this.encodingFunction;
65+
}
66+
67+
68+
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
69+
70+
private final StompEncoder encoder;
71+
72+
private EncodingFunction(StompEncoder encoder) {
73+
this.encoder = encoder;
74+
}
75+
76+
@Override
77+
public Buffer apply(Message<byte[]> message) {
78+
byte[] bytes = this.encoder.encode(message);
79+
return new Buffer(ByteBuffer.wrap(bytes));
80+
}
81+
}
82+
83+
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
84+
85+
private final StompDecoder decoder;
86+
87+
private final Consumer<Message<byte[]>> messageConsumer;
88+
89+
public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) {
90+
this.decoder = decoder;
91+
this.messageConsumer = next;
92+
}
93+
94+
@Override
95+
public Message<byte[]> apply(Buffer buffer) {
96+
for (Message<byte[]> message : this.decoder.decode(buffer.byteBuffer())) {
97+
this.messageConsumer.accept(message);
98+
}
99+
return null;
100+
}
101+
}
102+
}

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.messaging.simp.stomp;
1818

19-
import java.io.IOException;
2019
import java.util.Collection;
2120
import java.util.Map;
2221
import java.util.concurrent.Callable;
@@ -34,8 +33,8 @@
3433
import org.springframework.messaging.tcp.TcpConnection;
3534
import org.springframework.messaging.tcp.TcpConnectionHandler;
3635
import org.springframework.messaging.tcp.TcpOperations;
37-
import org.springframework.messaging.tcp.reactor.ReactorTcpClient;
3836
import org.springframework.util.Assert;
37+
import org.springframework.util.ClassUtils;
3938
import org.springframework.util.concurrent.ListenableFuture;
4039
import org.springframework.util.concurrent.ListenableFutureCallback;
4140
import org.springframework.util.concurrent.ListenableFutureTask;
@@ -69,6 +68,12 @@
6968
*/
7069
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
7170

71+
private static final boolean reactor10Present =
72+
ClassUtils.isPresent("reactor.tcp.TcpClient", StompBrokerRelayMessageHandler.class.getClassLoader());
73+
74+
private static final boolean reactor11Present =
75+
ClassUtils.isPresent("reactor.net.tcp.TcpClient", StompBrokerRelayMessageHandler.class.getClassLoader());
76+
7277
private static final byte[] EMPTY_PAYLOAD = new byte[0];
7378

7479
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<Void>(new VoidCallable());
@@ -331,7 +336,15 @@ protected void startInternal() {
331336
this.brokerChannel.subscribe(this);
332337

333338
if (this.tcpClient == null) {
334-
this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort);
339+
if (reactor11Present) {
340+
this.tcpClient = new Reactor11TcpClientFactory().create(this.relayHost, this.relayPort);
341+
}
342+
else if (reactor10Present) {
343+
this.tcpClient = new Reactor10TcpClientFactory().create(this.relayHost, this.relayPort);
344+
}
345+
else {
346+
throw new IllegalStateException("Please add the \"org.projectreactor:reactor-net\" dependency");
347+
}
335348
}
336349

337350
if (logger.isDebugEnabled()) {
@@ -606,6 +619,14 @@ public void run() {
606619
}
607620
}
608621

622+
@Override
623+
public void handleFailure(Throwable ex) {
624+
if (this.tcpConnection == null) {
625+
return;
626+
}
627+
handleTcpConnectionFailure("Closing connection after TCP failure", ex);
628+
}
629+
609630
@Override
610631
public void afterConnectionClosed() {
611632
if (this.tcpConnection == null) {
@@ -753,10 +774,19 @@ public ListenableFuture<Void> forward(Message<?> message) {
753774
}
754775
}
755776

756-
private static class StompTcpClientFactory {
777+
private static class Reactor11TcpClientFactory {
778+
779+
public TcpOperations<byte[]> create(String host, int port) {
780+
return new org.springframework.messaging.tcp.reactor.Reactor11TcpClient<byte[]>(
781+
host, port, new Reactor11StompCodec());
782+
}
783+
}
784+
785+
private static class Reactor10TcpClientFactory {
757786

758-
public TcpOperations<byte[]> create(String relayHost, int relayPort) {
759-
return new ReactorTcpClient<byte[]>(relayHost, relayPort, new StompCodec());
787+
public TcpOperations<byte[]> create(String host, int port) {
788+
return new org.springframework.messaging.tcp.reactor.ReactorTcpClient<byte[]>(
789+
host, port, new StompCodec());
760790
}
761791
}
762792

spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ public interface TcpConnectionHandler<P> {
4747
*/
4848
void handleMessage(Message<P> message);
4949

50+
/**
51+
* Handle a failure on the connection.
52+
* @param ex the exception
53+
*/
54+
void handleFailure(Throwable ex);
55+
5056
/**
5157
* Invoked after the connection is closed.
5258
*/

spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,6 @@ public interface TcpOperations<P> {
5050
* @return a ListenableFuture that can be used to determine when and if the
5151
* connection is successfully closed
5252
*/
53-
ListenableFuture<Void> shutdown();
53+
ListenableFuture<Boolean> shutdown();
5454

5555
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* @author Rossen Stoyanchev
3939
* @since 4.0
4040
*/
41-
abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
41+
public abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
4242

4343
private final Promise<S> promise;
4444

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* @author Rossen Stoyanchev
2626
* @since 4.0
2727
*/
28-
class PassThroughPromiseToListenableFutureAdapter<T> extends AbstractPromiseToListenableFutureAdapter<T, T> {
28+
public class PassThroughPromiseToListenableFutureAdapter<T> extends AbstractPromiseToListenableFutureAdapter<T, T> {
2929

3030

3131
public PassThroughPromiseToListenableFutureAdapter(Promise<T> promise) {

0 commit comments

Comments
 (0)