Skip to content

Commit 878c643

Browse files
committed
Refactor GraphQlMessage and add GraphQlMessageType enum
See gh-270
1 parent ec00547 commit 878c643

14 files changed

+614
-416
lines changed

spring-graphql/src/main/java/org/springframework/graphql/client/CodecDelegate.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.springframework.core.codec.Encoder;
2121
import org.springframework.core.io.buffer.DataBuffer;
2222
import org.springframework.core.io.buffer.DataBufferUtils;
23-
import org.springframework.graphql.web.webflux.GraphQlWebSocketMessage;
23+
import org.springframework.graphql.web.support.GraphQlMessage;
2424
import org.springframework.http.MediaType;
2525
import org.springframework.http.codec.ClientCodecConfigurer;
2626
import org.springframework.http.codec.CodecConfigurer;
@@ -40,7 +40,7 @@
4040
*/
4141
final class CodecDelegate {
4242

43-
private static final ResolvableType MESSAGE_TYPE = ResolvableType.forClass(GraphQlWebSocketMessage.class);
43+
private static final ResolvableType MESSAGE_TYPE = ResolvableType.forClass(GraphQlMessage.class);
4444

4545

4646
private final CodecConfigurer codecConfigurer;
@@ -84,7 +84,7 @@ public CodecConfigurer getCodecConfigurer() {
8484

8585

8686
@SuppressWarnings("unchecked")
87-
public <T> WebSocketMessage encode(WebSocketSession session, GraphQlWebSocketMessage message) {
87+
public <T> WebSocketMessage encode(WebSocketSession session, GraphQlMessage message) {
8888

8989
DataBuffer buffer = ((Encoder<T>) this.encoder).encodeValue(
9090
(T) message, session.bufferFactory(), MESSAGE_TYPE, MimeTypeUtils.APPLICATION_JSON, null);
@@ -93,9 +93,9 @@ public <T> WebSocketMessage encode(WebSocketSession session, GraphQlWebSocketMes
9393
}
9494

9595
@SuppressWarnings("ConstantConditions")
96-
public GraphQlWebSocketMessage decode(WebSocketMessage webSocketMessage) {
96+
public GraphQlMessage decode(WebSocketMessage webSocketMessage) {
9797
DataBuffer buffer = DataBufferUtils.retain(webSocketMessage.getPayload());
98-
return (GraphQlWebSocketMessage) this.decoder.decode(buffer, MESSAGE_TYPE, null, null);
98+
return (GraphQlMessage) this.decoder.decode(buffer, MESSAGE_TYPE, null, null);
9999
}
100100

101101
}

spring-graphql/src/main/java/org/springframework/graphql/client/WebSocketGraphQlTransport.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
import org.springframework.graphql.GraphQlRequest;
3737
import org.springframework.graphql.support.MapExecutionResult;
3838
import org.springframework.graphql.support.MapGraphQlError;
39-
import org.springframework.graphql.web.webflux.GraphQlWebSocketMessage;
39+
import org.springframework.graphql.web.support.GraphQlMessage;
40+
import org.springframework.graphql.web.support.GraphQlMessageType;
4041
import org.springframework.http.HttpHeaders;
4142
import org.springframework.http.codec.CodecConfigurer;
4243
import org.springframework.lang.Nullable;
@@ -168,7 +169,7 @@ private static class GraphQlSessionHandler implements WebSocketHandler {
168169

169170
private final CodecDelegate codecDelegate;
170171

171-
private final GraphQlWebSocketMessage connectionInitMessage;
172+
private final GraphQlMessage connectionInitMessage;
172173

173174
private final Consumer<Map<String, Object>> connectionAckHandler;
174175

@@ -181,7 +182,7 @@ private static class GraphQlSessionHandler implements WebSocketHandler {
181182
@Nullable Object connectionInitPayload, Consumer<Map<String, Object>> connectionAckHandler) {
182183

183184
this.codecDelegate = new CodecDelegate(codecConfigurer);
184-
this.connectionInitMessage = GraphQlWebSocketMessage.connectionInit(connectionInitPayload);
185+
this.connectionInitMessage = GraphQlMessage.connectionInit(connectionInitPayload);
185186
this.connectionAckHandler = connectionAckHandler;
186187
this.graphQlSessionSink = Sinks.unsafe().one();
187188
}
@@ -240,8 +241,8 @@ public Mono<Void> handle(WebSocketSession session) {
240241
.flatMap(webSocketMessage -> {
241242
if (sessionNotInitialized()) {
242243
try {
243-
GraphQlWebSocketMessage message = this.codecDelegate.decode(webSocketMessage);
244-
Assert.state(message.getType().equals("connection_ack"),
244+
GraphQlMessage message = this.codecDelegate.decode(webSocketMessage);
245+
Assert.state(message.resolvedType() == GraphQlMessageType.CONNECTION_ACK,
245246
() -> "Unexpected message before connection_ack: " + message);
246247
this.connectionAckHandler.accept(message.getPayload());
247248
if (logger.isDebugEnabled()) {
@@ -259,15 +260,15 @@ public Mono<Void> handle(WebSocketSession session) {
259260
}
260261
}
261262
else {
262-
GraphQlWebSocketMessage message = this.codecDelegate.decode(webSocketMessage);
263-
switch (message.getType()) {
264-
case "next":
263+
GraphQlMessage message = this.codecDelegate.decode(webSocketMessage);
264+
switch (message.resolvedType()) {
265+
case NEXT:
265266
graphQlSession.handleNext(message);
266267
break;
267-
case "error":
268+
case ERROR:
268269
graphQlSession.handleError(message);
269270
break;
270-
case "complete":
271+
case COMPLETE:
271272
graphQlSession.handleComplete(message);
272273
break;
273274
default:
@@ -366,7 +367,7 @@ private static class GraphQlSession {
366367

367368
private final AtomicLong requestIndex = new AtomicLong();
368369

369-
private final Sinks.Many<GraphQlWebSocketMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
370+
private final Sinks.Many<GraphQlMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
370371

371372
private final Map<String, Sinks.One<ExecutionResult>> resultSinks = new ConcurrentHashMap<>();
372373

@@ -381,14 +382,14 @@ private static class GraphQlSession {
381382
/**
382383
* Return the {@code Flux} of GraphQL requests to send as WebSocket messages.
383384
*/
384-
public Flux<GraphQlWebSocketMessage> getRequestFlux() {
385+
public Flux<GraphQlMessage> getRequestFlux() {
385386
return this.requestSink.asFlux();
386387
}
387388

388389
public Mono<ExecutionResult> execute(GraphQlRequest request) {
389390
String id = String.valueOf(this.requestIndex.incrementAndGet());
390391
try {
391-
GraphQlWebSocketMessage message = GraphQlWebSocketMessage.subscribe(id, request);
392+
GraphQlMessage message = GraphQlMessage.subscribe(id, request);
392393
Sinks.One<ExecutionResult> sink = Sinks.one();
393394
this.resultSinks.put(id, sink);
394395
trySend(message);
@@ -403,7 +404,7 @@ public Mono<ExecutionResult> execute(GraphQlRequest request) {
403404
public Flux<ExecutionResult> executeSubscription(GraphQlRequest request) {
404405
String id = String.valueOf(this.requestIndex.incrementAndGet());
405406
try {
406-
GraphQlWebSocketMessage message = GraphQlWebSocketMessage.subscribe(id, request);
407+
GraphQlMessage message = GraphQlMessage.subscribe(id, request);
407408
Sinks.Many<ExecutionResult> sink = Sinks.many().unicast().onBackpressureBuffer();
408409
this.streamingSinks.put(id, sink);
409410
trySend(message);
@@ -417,7 +418,7 @@ public Flux<ExecutionResult> executeSubscription(GraphQlRequest request) {
417418

418419
// TODO: queue to serialize sending?
419420

420-
private void trySend(GraphQlWebSocketMessage message) {
421+
private void trySend(GraphQlMessage message) {
421422
Sinks.EmitResult emitResult = null;
422423
for (int i = 0; i < 100; i++) {
423424
emitResult = this.requestSink.tryEmitNext(message);
@@ -432,7 +433,7 @@ private void cancelStream(String id) {
432433
Sinks.Many<ExecutionResult> streamSink = this.streamingSinks.remove(id);
433434
if (streamSink != null) {
434435
try {
435-
trySend(GraphQlWebSocketMessage.complete(id));
436+
trySend(GraphQlMessage.complete(id));
436437
}
437438
catch (Exception ex) {
438439
if (logger.isErrorEnabled()) {
@@ -447,7 +448,7 @@ private void cancelStream(String id) {
447448
/**
448449
* Handle a "next" message and route to its recipient.
449450
*/
450-
public void handleNext(GraphQlWebSocketMessage message) {
451+
public void handleNext(GraphQlMessage message) {
451452
String id = message.getId();
452453
Sinks.One<ExecutionResult> sink = this.resultSinks.remove(id);
453454
Sinks.Many<ExecutionResult> streamingSink = this.streamingSinks.get(id);
@@ -459,7 +460,7 @@ public void handleNext(GraphQlWebSocketMessage message) {
459460
return;
460461
}
461462

462-
Map<String, Object> resultMap = message.getPayloadOrDefault(Collections.emptyMap());
463+
Map<String, Object> resultMap = message.getPayload();
463464
ExecutionResult result = MapExecutionResult.from(resultMap);
464465

465466
Sinks.EmitResult emitResult = (sink != null ? sink.tryEmitValue(result) : streamingSink.tryEmitNext(result));
@@ -475,7 +476,7 @@ public void handleNext(GraphQlWebSocketMessage message) {
475476
* Handle an "error" message, turning it into an {@link ExecutionResult}
476477
* for a single result response, or signaling an error to streams.
477478
*/
478-
public void handleError(GraphQlWebSocketMessage message) {
479+
public void handleError(GraphQlMessage message) {
479480
String id = message.getId();
480481
Sinks.One<ExecutionResult> sink = this.resultSinks.remove(id);
481482
Sinks.Many<ExecutionResult> streamingSink = this.streamingSinks.remove(id);
@@ -487,7 +488,7 @@ public void handleError(GraphQlWebSocketMessage message) {
487488
return;
488489
}
489490

490-
List<Map<String, Object>> payload = message.getPayloadOrDefault(Collections.emptyList());
491+
List<Map<String, Object>> payload = message.getPayload();
491492

492493
Sinks.EmitResult emitResult;
493494
if (sink != null) {
@@ -508,7 +509,7 @@ public void handleError(GraphQlWebSocketMessage message) {
508509
/**
509510
* Handle a "complete" message.
510511
*/
511-
public void handleComplete(GraphQlWebSocketMessage message) {
512+
public void handleComplete(GraphQlMessage message) {
512513
Sinks.One<ExecutionResult> resultSink = this.resultSinks.remove(message.getId());
513514
Sinks.Many<ExecutionResult> streamingResultSink = this.streamingSinks.remove(message.getId());
514515

0 commit comments

Comments
 (0)