Skip to content

#815 Upgrade and fix issues for new Reactor version. #828

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.subscriberContext(getReactorContext()).block();
.contextWrite(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
message).subscriberContext(getReactorContext()).block();
message).contextWrite(getReactorContext()).block();
System.out.println("Published message: " + message);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(sleepRequest, TypeRef.get(Void.class));
}).subscriberContext(getReactorContext()).block();
}).contextWrite(getReactorContext()).block();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Mono<byte[]> echo(
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo")
.setBody(body)
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context));
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context));
}

/**
Expand All @@ -71,7 +71,7 @@ public Mono<byte[]> echo(
public Mono<Void> sleep(@RequestAttribute(name = "opentelemetry-context") Context context) {
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then();
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -65,7 +66,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
.setMethod(methodName)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeActorResponse>createMono(
it -> intercept(context, client).invokeActor(req, it)
)
Expand Down Expand Up @@ -109,7 +110,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
}

Expand Down
2 changes: 1 addition & 1 deletion sdk-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.4.26</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
50 changes: 21 additions & 29 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -181,11 +182,8 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
envelopeBuilder.putAllMetadata(metadata);
}

return Mono.subscriberContext().flatMap(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
)
return Mono.deferContextual(context -> this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it))
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -254,8 +252,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
for (BulkPublishEntry<T> entry: request.getEntries()) {
entryMap.put(entry.getEntryId(), entry);
}
return Mono.subscriberContext().flatMap(
context ->
return Mono.deferContextual(context ->
this.<DaprProtos.BulkPublishResponse>createMono(
it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it)
)
Expand Down Expand Up @@ -298,8 +295,8 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.subscriberContext().flatMap(
context -> this.<CommonProtos.InvokeResponse>createMono(
return Mono.deferContextual(context ->
this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
)
).flatMap(
Expand Down Expand Up @@ -345,8 +342,7 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
return Mono.deferContextual(context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
)
).flatMap(
Expand Down Expand Up @@ -392,8 +388,7 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {

DaprProtos.GetStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
context ->
return Mono.deferContextual(context ->
this.<DaprProtos.GetStateResponse>createMono(
it -> intercept(context, asyncStub).getState(envelope, it)
)
Expand Down Expand Up @@ -441,8 +436,8 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe

DaprProtos.GetBulkStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
return Mono.deferContextual(context ->
this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
.getBulkState(envelope, it)
)
).map(
Expand Down Expand Up @@ -525,7 +520,7 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request
}
DaprProtos.ExecuteStateTransactionRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
).then();
} catch (Exception e) {
Expand All @@ -551,7 +546,7 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
}
DaprProtos.SaveStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).saveState(req, it))
).then();
} catch (Exception ex) {
Expand Down Expand Up @@ -635,8 +630,8 @@ public Mono<Void> deleteState(DeleteStateRequest request) {

DaprProtos.DeleteStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
return Mono.deferContextual(context ->
this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -713,7 +708,7 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
}
DaprProtos.GetSecretRequest req = requestBuilder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(context, asyncStub).getSecret(req, it))
).map(DaprProtos.GetSecretResponse::getDataMap);
}
Expand All @@ -738,9 +733,8 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest

DaprProtos.GetBulkSecretRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
context ->
this.<DaprProtos.GetBulkSecretResponse>createMono(
return Mono.deferContextual(context ->
this.<DaprProtos.GetBulkSecretResponse>createMono(
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
)
).map(it -> {
Expand Down Expand Up @@ -791,8 +785,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ

DaprProtos.QueryStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.QueryStateResponse>createMono(
return Mono.deferContextual(context -> this.<DaprProtos.QueryStateResponse>createMono(
it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it)
)
).map(
Expand Down Expand Up @@ -855,7 +848,7 @@ public void close() throws Exception {
*/
@Override
public Mono<Void> shutdown() {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
).then();
Expand Down Expand Up @@ -889,8 +882,7 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
}

private Mono<Map<String, ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
return Mono.subscriberContext().flatMap(
context ->
return Mono.deferContextual(context ->
this.<DaprProtos.GetConfigurationResponse>createMono(
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
)
Expand Down Expand Up @@ -1034,7 +1026,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
}

Expand Down
31 changes: 16 additions & 15 deletions sdk/src/main/java/io/dapr/client/DaprClientHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -177,7 +178,7 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };

Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
)
Expand Down Expand Up @@ -237,7 +238,7 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
} else {
headers.put(Metadata.CONTENT_TYPE, objectSerializer.getContentType());
}
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
Mono<DaprHttp.Response> response = Mono.deferContextual(
context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]),
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
);
Expand Down Expand Up @@ -309,7 +310,7 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)

String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "bindings", name };

Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
Mono<DaprHttp.Response> response = Mono.deferContextual(
context -> this.client.invokeApi(
httpMethod, pathSegments, null, payload, null, context)
);
Expand Down Expand Up @@ -349,7 +350,7 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk" };

Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context)
).flatMap(s -> {
Expand Down Expand Up @@ -394,7 +395,7 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {

String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key };

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, null, context)
).flatMap(s -> {
Expand Down Expand Up @@ -452,7 +453,7 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request

String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "transaction" };

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedOperationBody, null, context
)
Expand Down Expand Up @@ -500,7 +501,7 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {

String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName };

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedStateBody, null, context)
).then();
Expand Down Expand Up @@ -543,7 +544,7 @@ public Mono<Void> deleteState(DeleteStateRequest request) {

String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key };

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.DELETE.name(), pathSegments, queryParams, headers, context)
).then();
Expand Down Expand Up @@ -631,7 +632,7 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key };

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context)
).flatMap(response -> {
Expand Down Expand Up @@ -667,7 +668,7 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, "bulk" };

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context)
).flatMap(response -> {
Expand Down Expand Up @@ -709,7 +710,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
} else {
throw new IllegalArgumentException("Both query and queryString fields are not set.");
}
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
queryArgs, serializedRequest, null, context)
Expand Down Expand Up @@ -739,7 +740,7 @@ public void close() {
@Override
public Mono<Void> shutdown() {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
null, null, context))
.then();
Expand Down Expand Up @@ -810,7 +811,7 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
queryParams.putAll(queryArgs);

String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName };
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client
.invokeApi(
DaprHttp.HttpMethods.GET.name(),
Expand Down Expand Up @@ -871,7 +872,7 @@ public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConf

String[] pathSegments =
new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" };
SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap(
SubscribeConfigurationResponse res = Mono.deferContextual(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.GET.name(),
pathSegments, queryParams,
Expand Down Expand Up @@ -913,7 +914,7 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri
String[] pathSegments = new String[]
{ DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" };

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.client
.invokeApi(
DaprHttp.HttpMethods.GET.name(),
Expand Down
Loading