Skip to content

Commit ce37dd2

Browse files
authored
Merge branch 'master' into actor_example_update
2 parents 216ec35 + 838224f commit ce37dd2

File tree

7 files changed

+124
-193
lines changed

7 files changed

+124
-193
lines changed

examples/src/main/java/io/dapr/examples/invoke/grpc/HelloWorldClient.java

Lines changed: 18 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -5,140 +5,37 @@
55

66
package io.dapr.examples.invoke.grpc;
77

8-
import static io.dapr.examples.DaprExamplesProtos.SayRequest;
9-
import static io.dapr.examples.DaprExamplesProtos.SayResponse;
10-
11-
import com.google.common.util.concurrent.ListenableFuture;
12-
import com.google.protobuf.Any;
13-
import com.google.protobuf.InvalidProtocolBufferException;
14-
import io.dapr.DaprGrpc;
15-
import io.dapr.DaprProtos.InvokeServiceEnvelope;
16-
import io.dapr.DaprProtos.InvokeServiceResponseEnvelope;
17-
import io.grpc.ManagedChannel;
18-
import io.grpc.ManagedChannelBuilder;
19-
import java.util.ArrayList;
20-
import java.util.List;
21-
import java.util.concurrent.ExecutionException;
22-
import java.util.concurrent.TimeUnit;
23-
import org.apache.commons.cli.CommandLine;
24-
import org.apache.commons.cli.CommandLineParser;
25-
import org.apache.commons.cli.DefaultParser;
26-
import org.apache.commons.cli.Options;
27-
8+
import io.dapr.client.DaprClient;
9+
import io.dapr.client.DaprClientBuilder;
10+
import io.dapr.client.domain.Verb;
2811

2912
/**
3013
* 1. Build and install jars:
3114
* mvn clean install
3215
* 2. Send messages to the server:
33-
* dapr run --protocol grpc --grpc-port 50001 -- mvn exec:java -pl=examples \
34-
* -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient \
35-
* -Dexec.args="-p 50001 'message one' 'message two'"
16+
* dapr run -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient
3617
*/
3718
public class HelloWorldClient {
3819

3920
/**
40-
* Client mode: class representing a client-side logic for calling HelloWorld over Dapr.
41-
*/
42-
private static class GrpcHelloWorldDaprClient {
43-
44-
/**
45-
* Client communication channel: host, port and tls(on/off).
46-
*/
47-
private final ManagedChannel channel;
48-
49-
/**
50-
* Calls will be done asynchronously.
51-
*/
52-
private final DaprGrpc.DaprFutureStub client;
53-
54-
/**
55-
* Creates a Grpc client for the DaprGrpc service.
56-
*
57-
* @param host host for the remote service endpoint
58-
* @param port port for the remote service endpoint
59-
*/
60-
public GrpcHelloWorldDaprClient(String host, int port) {
61-
this(ManagedChannelBuilder
62-
.forAddress("localhost", port)
63-
.usePlaintext() // SSL/TLS is default, we turn it off just because this is a sample and not prod.
64-
.build());
65-
}
66-
67-
/**
68-
* Helper constructor to build client from channel.
69-
*
70-
* @param channel The ManagedChannel.
71-
*/
72-
private GrpcHelloWorldDaprClient(ManagedChannel channel) {
73-
this.channel = channel;
74-
this.client = DaprGrpc.newFutureStub(channel);
75-
}
76-
77-
/**
78-
* Client mode: sends messages, one per second.
79-
*
80-
* @param messages The messages to send.
81-
*/
82-
private void sendMessages(String... messages)
83-
throws ExecutionException, InterruptedException, InvalidProtocolBufferException {
84-
List<ListenableFuture<InvokeServiceResponseEnvelope>> futureResponses = new ArrayList<>();
85-
for (String message : messages) {
86-
SayRequest request = SayRequest
87-
.newBuilder()
88-
.setMessage(message)
89-
.build();
90-
91-
// Now, wrap the request with Dapr's envelope.
92-
InvokeServiceEnvelope requestEnvelope = InvokeServiceEnvelope
93-
.newBuilder()
94-
.setId("hellogrpc") // Service's identifier.
95-
.setData(Any.pack(request))
96-
.setMethod("say") // The service's method to be invoked by Dapr.
97-
.build();
98-
99-
futureResponses.add(client.invokeService(requestEnvelope));
100-
System.out.println("Client: sent => " + message);
101-
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
102-
}
103-
104-
for (ListenableFuture<InvokeServiceResponseEnvelope> future : futureResponses) {
105-
Any data = future.get().getData(); // Blocks waiting for response.
106-
// IMPORTANT: do not use Any.unpack(), use Type.ParseFrom() instead.
107-
SayResponse response = SayResponse.parseFrom(data.getValue());
108-
System.out.println("Client: got response => " + response.getTimestamp());
109-
}
110-
}
111-
112-
/**
113-
* Client mode: gracefully shutdown client within 1 min, otherwise force it.
114-
*
115-
* @throws InterruptedException Propagated interrupted exception.
116-
*/
117-
private void shutdown() throws InterruptedException {
118-
this.channel.shutdown().awaitTermination(1, TimeUnit.MINUTES);
119-
System.out.println("Client: Bye.");
120-
}
121-
122-
}
123-
124-
/**
125-
* The main method of this app.
21+
* The main method of the client app.
12622
*
127-
* @param args Args representing the port the app will listen on.
128-
* @throws Exception An Exception.
23+
* @param args Array of messages to be sent.
12924
*/
130-
public static void main(String[] args) throws Exception {
131-
Options options = new Options();
132-
options.addRequiredOption("p", "port", true, "Port to listen or send event to.");
25+
public static void main(String[] args) throws InterruptedException {
26+
DaprClient client = new DaprClientBuilder().build();
13327

134-
CommandLineParser parser = new DefaultParser();
135-
CommandLine cmd = parser.parse(options, args);
28+
String serviceAppId = "hellogrpc";
29+
String method = "say";
13630

137-
// If port string is not valid, it will throw an exception.
138-
int port = Integer.parseInt(cmd.getOptionValue("port"));
31+
int count = 0;
32+
while (true) {
33+
String message = "Message #" + (count++);
34+
System.out.println("Sending message: " + message);
35+
client.invokeService(Verb.POST, serviceAppId, method, message).block();
36+
System.out.println("Message sent: " + message);
13937

140-
GrpcHelloWorldDaprClient helloWorldClient = new GrpcHelloWorldDaprClient("localhost", port);
141-
helloWorldClient.sendMessages(cmd.getArgs());
142-
helloWorldClient.shutdown();
38+
Thread.sleep(1000);
39+
}
14340
}
14441
}

examples/src/main/java/io/dapr/examples/invoke/grpc/HelloWorldService.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
* mvn clean install
3131
* 2. Run in server mode:
3232
* dapr run --app-id hellogrpc --app-port 5000 --protocol grpc \
33-
* -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService \
34-
* -Dexec.args="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009"
33+
* -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService \
34+
* -D exec.args="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009"
3535
*/
3636
public class HelloWorldService {
3737

@@ -97,14 +97,11 @@ private void awaitTermination() throws InterruptedException {
9797
public void onInvoke(DaprClientProtos.InvokeEnvelope request, StreamObserver<Any> responseObserver) {
9898
try {
9999
if ("say".equals(request.getMethod())) {
100-
// IMPORTANT: do not use Any.unpack(), use Type.ParseFrom() instead.
101-
SayRequest sayRequest = SayRequest.parseFrom(request.getData().getValue());
100+
SayRequest sayRequest =
101+
SayRequest.newBuilder().setMessage(request.getData().getValue().toStringUtf8()).build();
102102
SayResponse sayResponse = this.say(sayRequest);
103103
responseObserver.onNext(Any.pack(sayResponse));
104104
}
105-
} catch (InvalidProtocolBufferException e) {
106-
e.printStackTrace();
107-
responseObserver.onError(e);
108105
} finally {
109106
responseObserver.onCompleted();
110107
}

examples/src/main/java/io/dapr/examples/invoke/grpc/README.md

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -70,61 +70,49 @@ In the `GrpcHelloWorldDaprService` class, the `onInvoke` method is the most impo
7070
Now run the service code:
7171

7272
```sh
73-
dapr run --app-id hellogrpc --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService -Dexec.args="-p 5000"
73+
dapr run --app-id hellogrpc --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService -D exec.args="-p 5000"
7474
```
7575

76-
The `app-id` argument is used to identify this service in Dapr's runtime. The `app-port` determines which port Dapr's runtime should call into this service. The `protocol` argument informs Dapr which protocol it should use: `grpc` or `http`(default).
76+
The `app-id` argument is used to identify this service in Dapr's runtime. The `app-port` determines which port Dapr's runtime should call into this service. The `protocol` argument informs Dapr which protocol it should use to invoke the application: `grpc` or `http`(default).
7777

7878
### Running the example's client
7979

80-
The other component is the client. It will take in the messages via command line arguments and send each one to the service via Dapr's invoke API over Grpc. Open the `HelloWorldClient.java` file, it contains the `HelloWorldClient` class with the main method and also the `GrpcHelloWorldDaprClient` class. The `GrpcHelloWorldDaprClient` encapsulates an instance of the `DaprFutureStub` class because it is calling Dapr's API. Creating a client to call `HelloWorldService` directly can be an exercise for the reader. In the `GrpcHelloWorldDaprClient` class, the most important method is `sendMessages`. See the code snippet below:
80+
The other component is the client. It will send one message per second to the service via Dapr's invoke API using Dapr's SDK. Open the `HelloWorldClient.java` file, it uses the Dapr's Java SDK to invoke the `say` method on the service above:
8181

8282
```java
83-
private static class GrpcHelloWorldDaprClient {
83+
private static class HelloWorldClient {
8484
///...
85-
private void sendMessages(String... messages) throws ExecutionException, InterruptedException, InvalidProtocolBufferException {
86-
List<ListenableFuture<InvokeServiceResponseEnvelope>> futureResponses = new ArrayList<>();
87-
for (String message : messages)
88-
{
89-
SayRequest request = SayRequest
90-
.newBuilder()
91-
.setMessage(message)
92-
.build();
93-
94-
// Now, wrap the request with Dapr's envelope.
95-
InvokeServiceEnvelope requestEnvelope = InvokeServiceEnvelope
96-
.newBuilder()
97-
.setId("hellogrpc") // Service's identifier.
98-
.setData(Any.pack(request))
99-
.setMethod("say") // The service's method to be invoked by Dapr.
100-
.build();
101-
102-
futureResponses.add(client.invokeService(requestEnvelope));
103-
System.out.println("Client: sent => " + message);
104-
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
105-
}
106-
107-
for (ListenableFuture<InvokeServiceResponseEnvelope> future : futureResponses) {
108-
Any data = future.get().getData(); // Blocks waiting for response.
109-
// IMPORTANT: do not use Any.unpack(), use Type.ParseFrom() instead.
110-
SayResponse response = SayResponse.parseFrom(data.getValue());
111-
System.out.println("Client: got response => " + response.getTimestamp());
112-
}
113-
}
85+
public static void main(String[] args) {
86+
DaprClient client = new DaprClientBuilder().build();
87+
88+
String serviceAppId = "hellogrpc";
89+
String method = "say";
90+
91+
int count = 0;
92+
while (true) {
93+
String message = "Message #" + (count++);
94+
System.out.println("Sending message: " + message);
95+
client.invokeService(Verb.POST, serviceAppId, method, message).block();
96+
System.out.println("Message sent: " + message);
97+
98+
Thread.sleep(1000);
99+
}
100+
}
101+
}
114102
///...
115103
}
116104
```
117105

118-
First, it goes through each message and creates a corresponding `SayRequest` object as if it would call the `HelloWorld` service directly. Then, the request object is wrapped into an instance of `InvokeServiceEnvelope`. As expected, the enveloped request is sent to Dapr's `invokeService` method. Once all responses are completed, they are unwrapped into `SayResponse` objects.
106+
First, it creates an instance of `DaprClient` via `DaprClientBuilder`. The protocol used by DaprClient is transparent to the application. The HTTP and GRPC ports used by Dapr's sidecar are automatically chosen and exported as environment variables: `DAPR_HTTP_PORT` and `DAPR_GRPC_PORT`. Dapr's Java SDK references these environment variables when communicating to Dapr's sidecar.
107+
108+
Finally, it will go through in an infinite loop and invoke the `say` method every second. Notice the use of `block()` on the return from `invokeService` - it is required to actually make the service invocation via a [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) object.
119109

120-
Finally, open a new command line terminal and run the client code to send some messages. Feel free to play with the command line to send different messages:
110+
Finally, open a new command line terminal and run the client code to send some messages.
121111

122112
```sh
123-
dapr run --protocol grpc --grpc-port 50001 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient -Dexec.args="-p 50001 'message one' 'message two'"
113+
dapr run -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient
124114
```
125115

126116
Once the messages are sent, use `CTRL+C` to exit Dapr.
127117

128-
The `protocol` argument tells Dapr which protocol to use. In this command, `grpc-port` is specified so Dapr does not pick a random port and uses the requested port instead. The same port is passed in the client executable via the `p` argument. The last arguments into the Java's main method are the messages to be sent.
129-
130118
Thanks for playing.

sdk/src/main/java/io/dapr/client/DaprClient.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,20 @@ public interface DaprClient {
5656
<T, R> Mono<T> invokeService(
5757
Verb verb, String appId, String method, R request, Map<String, String> metadata, Class<T> clazz);
5858

59+
/**
60+
* Invoke a service without metadata, using serialization.
61+
*
62+
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
63+
* @param appId The Application ID where the service is.
64+
* @param method The actual Method to be call in the application.
65+
* @param request The request to be sent to invoke the service.
66+
* @param clazz the Type needed as return for the call.
67+
* @param <T> the Type of the return, use byte[] to skip serialization.
68+
* @param <R> The Type of the request, use byte[] to skip serialization.
69+
* @return A Mono Plan of type clazz.
70+
*/
71+
<T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Class<T> clazz);
72+
5973
/**
6074
* Invoke a service without input, using serialization for response.
6175
*
@@ -82,6 +96,18 @@ <T, R> Mono<T> invokeService(
8296
*/
8397
<R> Mono<Void> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata);
8498

99+
/**
100+
* Invoke a service with void response, no metadata and using serialization.
101+
*
102+
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
103+
* @param appId The Application ID where the service is.
104+
* @param method The actual Method to be call in the application.
105+
* @param request The request to be sent to invoke the service.
106+
* @param <R> The Type of the request, use byte[] to skip serialization.
107+
* @return A Mono plan for Void.
108+
*/
109+
<R> Mono<Void> invokeService(Verb verb, String appId, String method, R request);
110+
85111
/**
86112
* Invoke a service without input and void response.
87113
*

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,45 +125,50 @@ public <T, R> Mono<T> invokeService(
125125
*/
126126
@Override
127127
public <T> Mono<T> invokeService(
128-
Verb verb,
129-
String appId,
130-
String method,
131-
Map<String, String> metadata,
132-
Class<T> clazz) {
133-
return this.invokeService(verb, appId, method, null, null, clazz);
128+
Verb verb, String appId, String method, Map<String, String> metadata, Class<T> clazz) {
129+
return this.invokeService(verb, appId, method, null, metadata, clazz);
130+
}
131+
132+
/**
133+
* {@inheritDoc}
134+
*/
135+
@Override
136+
public <T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Class<T> clazz) {
137+
return this.invokeService(verb, appId, method, request, null, clazz);
138+
}
139+
140+
/**
141+
* {@inheritDoc}
142+
*/
143+
@Override
144+
public <R> Mono<Void> invokeService(Verb verb, String appId, String method, R request) {
145+
return this.invokeService(verb, appId, method, request, null, byte[].class).then();
134146
}
135147

136148
/**
137149
* {@inheritDoc}
138150
*/
139151
@Override
140152
public <R> Mono<Void> invokeService(
141-
Verb verb,
142-
String appId,
143-
String method,
144-
R request,
145-
Map<String, String> metadata) {
153+
Verb verb, String appId, String method, R request, Map<String, String> metadata) {
146154
return this.invokeService(verb, appId, method, request, metadata, byte[].class).then();
147155
}
148156

149157
/**
150158
* {@inheritDoc}
151159
*/
152160
@Override
153-
public Mono<Void> invokeService(Verb verb, String appId, String method, Map<String, String> metadata) {
154-
return this.invokeService(verb, appId, method, null, metadata, Void.class).then();
161+
public Mono<Void> invokeService(
162+
Verb verb, String appId, String method, Map<String, String> metadata) {
163+
return this.invokeService(verb, appId, method, null, metadata, byte[].class).then();
155164
}
156165

157166
/**
158167
* {@inheritDoc}
159168
*/
160169
@Override
161170
public Mono<byte[]> invokeService(
162-
Verb verb,
163-
String appId,
164-
String method,
165-
byte[] request,
166-
Map<String, String> metadata) {
171+
Verb verb, String appId, String method, byte[] request, Map<String, String> metadata) {
167172
return this.invokeService(verb, appId, method, request, metadata, byte[].class);
168173
}
169174

0 commit comments

Comments
 (0)