Skip to content

Commit 56ffed0

Browse files
Abhijit Sarkarartembilan
authored andcommitted
GH-2340: Fix WebFluxMessageHandlerSpec
Fixes #2340 * Make `WebFluxMessageHandlerSpec` to support method chain * Add `WebFluxDslTests.testWebFluxFlowWithReplyPayloadToFlux()`
1 parent 01b0e1c commit 56ffed0

File tree

2 files changed

+77
-4
lines changed

2 files changed

+77
-4
lines changed

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/dsl/WebFluxMessageHandlerSpec.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
*
3636
* @author Shiliang Li
3737
* @author Artem Bilan
38+
* @author Abhijit Sarkar
3839
*
3940
* @since 5.0
4041
*
@@ -64,22 +65,26 @@ public class WebFluxMessageHandlerSpec
6465
* Defaults to {@code false} - simple value is pushed downstream.
6566
* Makes sense when {@code expectedResponseType} is configured.
6667
* @param replyPayloadToFlux represent reply payload as a {@link Flux} or as a value from the {@link Mono}.
68+
* @return the spec
6769
* @since 5.0.1
6870
* @see WebFluxRequestExecutingMessageHandler#setReplyPayloadToFlux(boolean)
6971
*/
70-
public void replyPayloadToFlux(boolean replyPayloadToFlux) {
72+
public WebFluxMessageHandlerSpec replyPayloadToFlux(boolean replyPayloadToFlux) {
7173
this.target.setReplyPayloadToFlux(replyPayloadToFlux);
74+
return this;
7275
}
7376

7477
/**
7578
* Specify a {@link BodyExtractor} as an alternative to the {@code expectedResponseType}
7679
* to allow to get low-level access to the received {@link ClientHttpResponse}.
7780
* @param bodyExtractor the {@link BodyExtractor} to use.
81+
* @return the spec
7882
* @since 5.0.1
7983
* @see WebFluxRequestExecutingMessageHandler#setBodyExtractor(BodyExtractor)
8084
*/
81-
public void bodyExtractor(BodyExtractor<?, ClientHttpResponse> bodyExtractor) {
85+
public WebFluxMessageHandlerSpec bodyExtractor(BodyExtractor<?, ClientHttpResponse> bodyExtractor) {
8286
this.target.setBodyExtractor(bodyExtractor);
87+
return this;
8388
}
8489

8590
@Override

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,33 @@
2626

2727
import java.util.Collections;
2828

29+
import javax.annotation.Resource;
30+
2931
import org.junit.Before;
3032
import org.junit.Test;
3133
import org.junit.runner.RunWith;
3234
import org.reactivestreams.Publisher;
3335

3436
import org.springframework.beans.DirectFieldAccessor;
3537
import org.springframework.beans.factory.annotation.Autowired;
38+
import org.springframework.beans.factory.annotation.Qualifier;
3639
import org.springframework.context.annotation.Bean;
3740
import org.springframework.context.annotation.Configuration;
3841
import org.springframework.core.ResolvableType;
42+
import org.springframework.core.io.buffer.DataBufferFactory;
3943
import org.springframework.http.HttpMethod;
4044
import org.springframework.http.HttpStatus;
4145
import org.springframework.http.MediaType;
4246
import org.springframework.http.client.reactive.ClientHttpConnector;
47+
import org.springframework.integration.channel.QueueChannel;
4348
import org.springframework.integration.config.EnableIntegration;
4449
import org.springframework.integration.dsl.IntegrationFlow;
4550
import org.springframework.integration.dsl.IntegrationFlows;
4651
import org.springframework.integration.http.dsl.Http;
52+
import org.springframework.integration.support.MessageBuilder;
4753
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
4854
import org.springframework.messaging.Message;
55+
import org.springframework.messaging.MessageChannel;
4956
import org.springframework.messaging.PollableChannel;
5057
import org.springframework.security.access.AccessDecisionManager;
5158
import org.springframework.security.access.vote.AffirmativeBased;
@@ -76,6 +83,7 @@
7683
/**
7784
* @author Artem Bilan
7885
* @author Shiliang Li
86+
* @author Abhijit Sarkar
7987
*
8088
* @since 5.0
8189
*/
@@ -88,7 +96,15 @@ public class WebFluxDslTests {
8896
private WebApplicationContext wac;
8997

9098
@Autowired
91-
private WebFluxRequestExecutingMessageHandler serviceInternalReactiveGatewayHandler;
99+
@Qualifier("webFluxWithReplyPayloadToFlux.handler")
100+
private WebFluxRequestExecutingMessageHandler webFluxWithReplyPayloadToFlux;
101+
102+
@Resource(name = "org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler#1")
103+
private WebFluxRequestExecutingMessageHandler httpReactiveProxyFlow;
104+
105+
@Autowired
106+
@Qualifier("webFluxFlowWithReplyPayloadToFlux.input")
107+
private MessageChannel webFluxFlowWithReplyPayloadToFluxInput;
92108

93109
private MockMvc mockMvc;
94110

@@ -106,6 +122,48 @@ public void setup() {
106122
.build();
107123
}
108124

125+
@Test
126+
public void testWebFluxFlowWithReplyPayloadToFlux() {
127+
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
128+
response.setStatusCode(HttpStatus.OK);
129+
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
130+
131+
DataBufferFactory bufferFactory = response.bufferFactory();
132+
return response.writeWith(
133+
Flux.just(bufferFactory.wrap("FOO".getBytes()),
134+
bufferFactory.wrap("BAR".getBytes())))
135+
.then(Mono.defer(response::setComplete));
136+
});
137+
138+
WebClient webClient = WebClient.builder()
139+
.clientConnector(httpConnector)
140+
.build();
141+
142+
new DirectFieldAccessor(this.webFluxWithReplyPayloadToFlux)
143+
.setPropertyValue("webClient", webClient);
144+
145+
QueueChannel replyChannel = new QueueChannel();
146+
147+
Message<String> testMessage =
148+
MessageBuilder.withPayload("test")
149+
.setReplyChannel(replyChannel)
150+
.build();
151+
152+
this.webFluxFlowWithReplyPayloadToFluxInput.send(testMessage);
153+
154+
Message<?> receive = replyChannel.receive(10_000);
155+
156+
assertNotNull(receive);
157+
assertThat(receive.getPayload(), instanceOf(Flux.class));
158+
159+
@SuppressWarnings("unchecked")
160+
Flux<String> response = (Flux<String>) receive.getPayload();
161+
162+
StepVerifier.create(response)
163+
.expectNext("FOO", "BAR")
164+
.verifyComplete();
165+
}
166+
109167
@Test
110168
public void testHttpReactiveProxyFlow() throws Exception {
111169
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
@@ -120,7 +178,7 @@ public void testHttpReactiveProxyFlow() throws Exception {
120178
.clientConnector(httpConnector)
121179
.build();
122180

123-
new DirectFieldAccessor(this.serviceInternalReactiveGatewayHandler)
181+
new DirectFieldAccessor(this.httpReactiveProxyFlow)
124182
.setPropertyValue("webClient", webClient);
125183

126184
this.mockMvc.perform(
@@ -200,6 +258,16 @@ protected void configure(HttpSecurity http) throws Exception {
200258
.anonymous().disable();
201259
}
202260

261+
@Bean
262+
public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() {
263+
return f -> f
264+
.handle(WebFlux.outboundGateway("http://www.springsource.org/spring-integration")
265+
.httpMethod(HttpMethod.GET)
266+
.replyPayloadToFlux(true)
267+
.expectedResponseType(String.class),
268+
e -> e.id("webFluxWithReplyPayloadToFlux"));
269+
}
270+
203271
@Bean
204272
public IntegrationFlow httpReactiveProxyFlow() {
205273
return IntegrationFlows

0 commit comments

Comments
 (0)