Skip to content

Commit 9653a2e

Browse files
Merge pull request ReactiveX#3 from g9yuayon/remoting-local
Remoting local
2 parents 3247288 + a7257d6 commit 9653a2e

File tree

13 files changed

+435
-74
lines changed

13 files changed

+435
-74
lines changed

rxjava-contrib/rxjava-netty/src/examples/groovy/rx/netty/examples/EchoServer.groovy

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,28 @@ public class EchoServer {
1515
public static void main(String[] args) {
1616
RxNetty.createTcpServer(8181)
1717
// process each connection in parallel
18-
.parallel({ Observable<TcpConnection> o ->
18+
.parallel({ Observable<TcpConnection<String, String>> o ->
1919
// for each connection
20-
return o.flatMap({ TcpConnection connection ->
20+
return o.flatMap({ TcpConnection<String, String> connection ->
2121
// for each message we receive on the connection
22-
return connection.getChannelObservable().map({ ByteBuf bb ->
23-
String msg = bb.toString(Charset.forName("UTF8")).trim();
24-
return new ReceivedMessage(connection, msg);
22+
return connection.getChannelObservable().map({ String bb ->
23+
return new ReceivedMessage<String>(connection, msg.trim());
2524
});
2625
});
2726
})
28-
.toBlockingObservable().forEach({ ReceivedMessage receivedMessage ->
27+
.toBlockingObservable().forEach({ ReceivedMessage<String> receivedMessage ->
2928
receivedMessage.connection.write("Echo => " + receivedMessage.message + "\n");
3029
System.out.println("Received Message: " + receivedMessage.message);
3130
});
3231
}
3332

34-
def static class ReceivedMessage {
33+
def static class ReceivedMessage<I> {
3534
// I want value types
3635

37-
final TcpConnection connection;
36+
final TcpConnection<I, String> connection;
3837
final String message;
3938

40-
public ReceivedMessage(TcpConnection connection, String message) {
39+
public ReceivedMessage(TcpConnection<I, String> connection, String message) {
4140
this.connection = connection;
4241
this.message = message;
4342
}

rxjava-contrib/rxjava-netty/src/examples/groovy/rx/netty/examples/IntervalClientWithDisconnect.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class IntervalClientWithDisconnect {
1818

1919
public void run() {
2020
RxNetty.createTcpClient("localhost", 8181)
21-
.flatMap({ TcpConnection connection ->
21+
.flatMap({ TcpConnection<ByteBuf, String> connection ->
2222
System.out.println("received connection: " + connection);
2323

2424
Observable<String> subscribeMessage = connection.write("subscribe:")

rxjava-contrib/rxjava-netty/src/examples/groovy/rx/netty/examples/IntervalServer.groovy

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@ class IntervalServer {
2121
public static Observable<String> createServer(final int port) {
2222
return RxNetty.createTcpServer(port)
2323
// process each connection in parallel
24-
.parallel({ Observable<TcpConnection> o ->
24+
.parallel({ Observable<TcpConnection<String, String>> o ->
2525
// for each connection
26-
return o.flatMap({ TcpConnection connection ->
26+
return o.flatMap({ TcpConnection<String, String> connection ->
2727
// for each message we receive on the connection
28-
return connection.getChannelObservable().map({ ByteBuf bb ->
29-
String msg = bb.toString(Charset.forName("UTF8")).trim();
28+
return connection.getChannelObservable().map({ String msg ->
3029
if (msg.startsWith("subscribe:")) {
3130
System.out.println("-------------------------------------");
3231
System.out.println("Received 'subscribe' from client so starting interval ...");
@@ -48,7 +47,7 @@ class IntervalServer {
4847
});
4948
}
5049

51-
public static Subscription startInterval(final TcpConnection connection) {
50+
public static Subscription startInterval(final TcpConnection<String, String> connection) {
5251
return Observable.interval(1000, TimeUnit.MILLISECONDS)
5352
.flatMap({ Long interval ->
5453
System.out.println("Writing interval: " + interval);
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package rx.netty.examples;
2+
3+
import io.netty.buffer.ByteBuf;
4+
5+
import java.nio.charset.Charset;
6+
7+
import rx.Observable;
8+
import rx.netty.experimental.RxNetty;
9+
import rx.netty.experimental.impl.TcpConnection;
10+
import rx.util.functions.Action1;
11+
import rx.util.functions.Func1;
12+
13+
public class EchoServer {
14+
15+
public static void main(String[] args) {
16+
RxNetty.createTcpServer(8181)
17+
// process each connection in parallel
18+
.parallel(new Func1<Observable<TcpConnection<String, String>>, Observable<ReceivedMessage<String>>>() {
19+
20+
@Override
21+
public Observable<ReceivedMessage<String>> call(Observable<TcpConnection<String, String>> o) {
22+
// for each connection
23+
return o.flatMap(new Func1<TcpConnection<String, String>, Observable<ReceivedMessage<String>>>() {
24+
25+
@Override
26+
public Observable<ReceivedMessage<String>> call(final TcpConnection<String, String> connection) {
27+
// for each message we receive on the connection
28+
return connection.getChannelObservable().map(new Func1<String, ReceivedMessage<String>>() {
29+
30+
@Override
31+
public ReceivedMessage<String> call(String msg) {
32+
return new ReceivedMessage<String>(connection, msg.trim());
33+
}
34+
35+
});
36+
}
37+
38+
});
39+
40+
}
41+
})
42+
.toBlockingObservable().forEach(new Action1<ReceivedMessage<String>>() {
43+
44+
@Override
45+
public void call(ReceivedMessage<String> receivedMessage) {
46+
receivedMessage.connection.write("Echo => " + receivedMessage.message + "\n");
47+
System.out.println("Received Message: " + receivedMessage.message);
48+
}
49+
});
50+
}
51+
52+
public static class ReceivedMessage<I> {
53+
// I want tuples in java
54+
55+
final TcpConnection<I, String> connection;
56+
final String message;
57+
58+
public ReceivedMessage(TcpConnection<I, String> connection, String message) {
59+
this.connection = connection;
60+
this.message = message;
61+
}
62+
}
63+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package rx.netty.examples;
2+
3+
import io.netty.buffer.ByteBuf;
4+
5+
import java.nio.charset.Charset;
6+
7+
import rx.Observable;
8+
import rx.netty.experimental.RxNetty;
9+
import rx.netty.experimental.impl.TcpConnection;
10+
import rx.util.functions.Action1;
11+
import rx.util.functions.Func1;
12+
13+
public class IntervalClientWithDisconnect {
14+
15+
public static void main(String[] args) {
16+
new IntervalClientWithDisconnect().run();
17+
}
18+
19+
public void run() {
20+
Observable<TcpConnection<ByteBuf, String>> client = RxNetty.createTcpClient("localhost", 8181);
21+
22+
client.flatMap(new Func1<TcpConnection<ByteBuf, String>, Observable<String>>() {
23+
24+
@Override
25+
public Observable<String> call(TcpConnection<ByteBuf, String> connection) {
26+
27+
System.out.println("received connection: " + connection);
28+
29+
Observable<String> subscribeMessage = connection.write("subscribe:")
30+
// the intent of the flatMap to string is so onError can
31+
// be propagated via the concat below
32+
.flatMap(new Func1<Void, Observable<String>>() {
33+
34+
@Override
35+
public Observable<String> call(Void t1) {
36+
System.out.println("Send subscribe!");
37+
return Observable.empty();
38+
}
39+
40+
});
41+
42+
Observable<String> messageHandling = connection.getChannelObservable().map(new Func1<ByteBuf, String>() {
43+
44+
@Override
45+
public String call(ByteBuf bb) {
46+
return bb.toString(Charset.forName("UTF8")).trim();
47+
}
48+
49+
});
50+
51+
return Observable.concat(subscribeMessage, messageHandling);
52+
}
53+
})
54+
.take(10)
55+
.toBlockingObservable().forEach(new Action1<String>() {
56+
57+
@Override
58+
public void call(String v) {
59+
System.out.println("Received: " + v);
60+
}
61+
62+
});
63+
64+
}
65+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package rx.netty.examples;
2+
3+
import io.netty.buffer.ByteBuf;
4+
5+
import java.nio.charset.Charset;
6+
import java.util.concurrent.TimeUnit;
7+
8+
import rx.Notification;
9+
import rx.Observable;
10+
import rx.Subscription;
11+
import rx.netty.experimental.RxNetty;
12+
import rx.netty.experimental.impl.TcpConnection;
13+
import rx.util.functions.Action0;
14+
import rx.util.functions.Action1;
15+
import rx.util.functions.Func1;
16+
17+
public class IntervalServer {
18+
19+
public static void main(String[] args) {
20+
createServer(8181).toBlockingObservable().last();
21+
}
22+
23+
public static Observable<String> createServer(final int port) {
24+
return RxNetty.createTcpServer(port)
25+
// process each connection in parallel
26+
.parallel(new Func1<Observable<TcpConnection<String, String>>, Observable<String>>() {
27+
28+
@Override
29+
public Observable<String> call(Observable<TcpConnection<String, String>> o) {
30+
// for each connection
31+
return o.flatMap(new Func1<TcpConnection<String, String>, Observable<String>>() {
32+
33+
@Override
34+
public Observable<String> call(final TcpConnection<String, String> connection) {
35+
// for each message we receive on the connection
36+
return connection.getChannelObservable().map(new Func1<String, String>() {
37+
38+
@Override
39+
public String call(String msg) {
40+
msg = msg.trim();
41+
if (msg.startsWith("subscribe:")) {
42+
System.out.println("-------------------------------------");
43+
System.out.println("Received 'subscribe' from client so starting interval ...");
44+
// TODO how can we do this with startInterval returning an Observable instead of subscription?
45+
connection.addSubscription(startInterval(connection));
46+
} else if (msg.startsWith("unsubscribe:")) {
47+
System.out.println("Received 'unsubscribe' from client so stopping interval ...");
48+
connection.unsubscribe();
49+
} else {
50+
if (!msg.isEmpty()) {
51+
connection.write("\nERROR => Unknown command: " + msg + "\nCommands => subscribe:, unsubscribe:\n");
52+
}
53+
}
54+
55+
return msg;
56+
}
57+
58+
});
59+
}
60+
61+
});
62+
63+
}
64+
});
65+
}
66+
67+
private static Subscription startInterval(final TcpConnection<String, String> connection) {
68+
return Observable.interval(1000, TimeUnit.MILLISECONDS)
69+
.flatMap(new Func1<Long, Observable<Notification<Void>>>() {
70+
71+
@Override
72+
public Observable<Notification<Void>> call(Long interval) {
73+
System.out.println("Writing interval: " + interval);
74+
// emit the interval to the output and return the notification received from it
75+
return connection.write("interval => " + interval + "\n").materialize();
76+
}
77+
})
78+
.takeWhile(new Func1<Notification<Void>, Boolean>() {
79+
80+
@Override
81+
public Boolean call(Notification<Void> n) {
82+
// unsubscribe from interval if we receive an error
83+
return !n.isOnError();
84+
}
85+
})
86+
.subscribe(new Action1<Notification<Void>>() {
87+
88+
@Override
89+
public void call(Notification<Void> interval) {
90+
// do nothing
91+
}
92+
}, new Action1<Throwable>() {
93+
94+
@Override
95+
public void call(Throwable cause) {
96+
System.out.println("Interval stopped: " + cause);
97+
}
98+
99+
}, new Action0() {
100+
101+
@Override
102+
public void call() {
103+
System.out.println("Connection closed");
104+
}
105+
106+
});
107+
}
108+
109+
}

rxjava-contrib/rxjava-netty/src/main/java/rx/netty/experimental/RxNetty.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.netty.experimental;
1717

18+
import io.netty.buffer.ByteBuf;
1819
import io.netty.channel.EventLoopGroup;
1920
import io.netty.channel.nio.NioEventLoopGroup;
2021

@@ -25,6 +26,8 @@
2526
import rx.netty.experimental.impl.NettyClient;
2627
import rx.netty.experimental.impl.NettyServer;
2728
import rx.netty.experimental.impl.TcpConnection;
29+
import rx.netty.experimental.protocol.ProtocolHandler;
30+
import rx.netty.experimental.protocol.ProtocolHandlers;
2831

2932
public class RxNetty {
3033

@@ -45,21 +48,37 @@ public Thread newThread(Runnable r) {
4548
private static NioEventLoopGroup WORKER = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), THREAD_FACTORY);
4649
}
4750

48-
public static Observable<TcpConnection> createTcpServer(final int port, final EventLoopGroup acceptorEventLoops, final EventLoopGroup workerEventLoops) {
49-
return NettyServer.createServer(port, acceptorEventLoops, workerEventLoops);
51+
public static Observable<TcpConnection<String, String>> createTcpServer(final int port, final EventLoopGroup acceptorEventLoops, final EventLoopGroup workerEventLoops) {
52+
return NettyServer.createServer(port, acceptorEventLoops, workerEventLoops, ProtocolHandlers.stringCodec());
5053
}
5154

52-
public static Observable<TcpConnection> createTcpServer(int port) {
55+
public static Observable<TcpConnection<String, String>> createTcpServer(int port) {
5356
return createTcpServer(port, DEFAULT_EVENT_LOOPS.ACCEPTOR, DEFAULT_EVENT_LOOPS.WORKER);
5457
}
5558

56-
public static Observable<TcpConnection> createTcpClient(final String host, final int port, final EventLoopGroup eventLoops) {
57-
return NettyClient.createClient(host, port, eventLoops);
59+
public static <I, O> Observable<TcpConnection<I, O>> createTcpServer(int port, ProtocolHandler<I, O> handler) {
60+
return createTcpServer(port, DEFAULT_EVENT_LOOPS.ACCEPTOR, DEFAULT_EVENT_LOOPS.WORKER, handler);
5861
}
5962

60-
public static Observable<TcpConnection> createTcpClient(String host, int port) {
61-
return createTcpClient(host, port, DEFAULT_EVENT_LOOPS.WORKER);
63+
public static <I, O> Observable<TcpConnection<I, O>> createTcpServer(
64+
final int port,
65+
final EventLoopGroup acceptorEventLoops,
66+
final EventLoopGroup workerEventLoops,
67+
ProtocolHandler<I, O> handler) {
6268

69+
return NettyServer.createServer(port, acceptorEventLoops, workerEventLoops, handler);
6370
}
6471

72+
public static Observable<TcpConnection<ByteBuf, String>> createTcpClient(final String host, final int port, final EventLoopGroup eventLoops) {
73+
return NettyClient.createClient(host, port, eventLoops, ProtocolHandlers.commandOnlyHandler());
74+
}
75+
76+
public static Observable<TcpConnection<ByteBuf, String>> createTcpClient(String host, int port) {
77+
return RxNetty.createTcpClient(host, port, DEFAULT_EVENT_LOOPS.WORKER);
78+
79+
}
80+
81+
public static <I, O> Observable<TcpConnection<I, O>> createTcpClient(String host, int port, ProtocolHandler<I, O> handler) {
82+
return NettyClient.createClient(host, port, DEFAULT_EVENT_LOOPS.WORKER, handler);
83+
}
6584
}

0 commit comments

Comments
 (0)