Skip to content

Commit bd87284

Browse files
artembilangaryrussell
authored andcommitted
GH-2300: Add Flux support in WebFluxRequestExecMH
Fixes: #2300 To allow to consume a streaming HTTP response downstream expose `replyToFlux` option on the `WebFluxRequestExecutingMessageHandler`. This way the body of the HTTP response can be converted now to the `Flux` for subsequent output message. The option is `false` by default; can be changed to `true` in the `5.1`
1 parent 21bf69b commit bd87284

File tree

7 files changed

+145
-29
lines changed

7 files changed

+145
-29
lines changed

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.beans.factory.config.RuntimeBeanReference;
2222
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
2323
import org.springframework.beans.factory.xml.ParserContext;
24+
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
2425
import org.springframework.integration.http.config.HttpOutboundGatewayParser;
2526
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
2627
import org.springframework.util.StringUtils;
@@ -46,6 +47,7 @@ protected BeanDefinitionBuilder getBuilder(Element element, ParserContext parser
4647
.addIndexedArgumentValue(1, new RuntimeBeanReference(webClientRef));
4748
}
4849

50+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-to-flux");
4951
return builder;
5052
}
5153

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,22 @@
2929
import org.springframework.http.HttpEntity;
3030
import org.springframework.http.HttpMethod;
3131
import org.springframework.http.HttpStatus;
32+
import org.springframework.http.ReactiveHttpInputMessage;
3233
import org.springframework.http.ResponseEntity;
3334
import org.springframework.integration.expression.ValueExpression;
3435
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler;
3536
import org.springframework.messaging.Message;
3637
import org.springframework.messaging.MessageHandler;
3738
import org.springframework.util.Assert;
3839
import org.springframework.util.MimeType;
40+
import org.springframework.web.reactive.function.BodyExtractor;
3941
import org.springframework.web.reactive.function.BodyExtractors;
4042
import org.springframework.web.reactive.function.BodyInserters;
4143
import org.springframework.web.reactive.function.client.ClientResponse;
4244
import org.springframework.web.reactive.function.client.WebClient;
4345
import org.springframework.web.reactive.function.client.WebClientResponseException;
4446

47+
import reactor.core.publisher.Flux;
4548
import reactor.core.publisher.Mono;
4649

4750
/**
@@ -59,6 +62,8 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx
5962

6063
private final WebClient webClient;
6164

65+
private boolean replyToFlux;
66+
6267
/**
6368
* Create a handler that will send requests to the provided URI.
6469
* @param uri The URI.
@@ -110,6 +115,20 @@ public WebFluxRequestExecutingMessageHandler(Expression uriExpression, WebClient
110115
this.setAsync(true);
111116
}
112117

118+
/**
119+
* The boolean flag to identify if the reply payload should be as a {@link Flux} from the response body
120+
* or as resolved value from the {@link Mono} of the response body.
121+
* Defaults to {@code false} - simple value is pushed downstream.
122+
* Makes sense when {@code expectedResponseType} is configured.
123+
* @param replyToFlux represent reply payload as a {@link Flux} or as a value from the {@link Mono}.
124+
* @since 5.0.1
125+
* @see #setExpectedResponseType(Class)
126+
* @see #setExpectedResponseTypeExpression(Expression)
127+
*/
128+
public void setReplyToFlux(boolean replyToFlux) {
129+
this.replyToFlux = replyToFlux;
130+
}
131+
113132
@Override
114133
public String getComponentType() {
115134
return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter");
@@ -169,13 +188,35 @@ protected Object exchange(Supplier<URI> uriSupplier, HttpMethod httpMethod, Http
169188
ResponseEntity.status(response.statusCode())
170189
.headers(response.headers().asHttpHeaders());
171190

172-
Mono<?> bodyMono = Mono.empty();
173-
174-
if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
175-
bodyMono = response.body(BodyExtractors.toMono((ParameterizedTypeReference<?>) expectedResponseType));
191+
Mono<?> bodyMono;
192+
193+
if (expectedResponseType != null) {
194+
if (this.replyToFlux) {
195+
BodyExtractor<? extends Flux<?>, ReactiveHttpInputMessage> extractor;
196+
if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
197+
extractor = BodyExtractors.toFlux(
198+
(ParameterizedTypeReference<?>) expectedResponseType);
199+
}
200+
else {
201+
extractor = BodyExtractors.toFlux((Class<?>) expectedResponseType);
202+
}
203+
Flux<?> flux = response.body(extractor);
204+
bodyMono = Mono.just(flux);
205+
}
206+
else {
207+
BodyExtractor<? extends Mono<?>, ReactiveHttpInputMessage> extractor;
208+
if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
209+
extractor = BodyExtractors.toMono(
210+
(ParameterizedTypeReference<?>) expectedResponseType);
211+
}
212+
else {
213+
extractor = BodyExtractors.toMono((Class<?>) expectedResponseType);
214+
}
215+
bodyMono = response.body(extractor);
216+
}
176217
}
177-
else if (expectedResponseType != null) {
178-
bodyMono = response.body(BodyExtractors.toMono((Class<?>) expectedResponseType));
218+
else {
219+
bodyMono = Mono.empty();
179220
}
180221

181222
return bodyMono

spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux-5.0.xsd

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<xsd:import namespace="http://www.springframework.org/schema/beans"/>
1111
<xsd:import namespace="http://www.springframework.org/schema/tool"/>
1212
<xsd:import namespace="http://www.springframework.org/schema/integration"
13-
schemaLocation="http://www.springframework.org/schema/integration/spring-integration-5.0.xsd" />
13+
schemaLocation="http://www.springframework.org/schema/integration/spring-integration-5.0.xsd"/>
1414
<xsd:import namespace="http://www.springframework.org/schema/integration/http"
1515
schemaLocation="http://www.springframework.org/schema/integration/http/spring-integration-http-5.0.xsd"/>
1616

@@ -82,7 +82,8 @@
8282
<xsd:complexContent>
8383
<xsd:extension base="int-http:gatewayType">
8484
<xsd:choice minOccurs="0" maxOccurs="3">
85-
<xsd:element name="uri-variable" type="int-http:uriVariableType" minOccurs="0" maxOccurs="unbounded">
85+
<xsd:element name="uri-variable" type="int-http:uriVariableType" minOccurs="0"
86+
maxOccurs="unbounded">
8687
<xsd:annotation>
8788
<xsd:documentation>
8889
Specify an expression for URI variable placeholder within 'url'.
@@ -126,14 +127,17 @@
126127
</xsd:documentation>
127128
</xsd:annotation>
128129
</xsd:attribute>
129-
<xsd:attribute name="transfer-cookies" type="xsd:string" default="false">
130+
<xsd:attribute name="transfer-cookies" default="false">
130131
<xsd:annotation>
131132
<xsd:documentation><![CDATA[
132133
When set to "true", if a response contains a 'Set-Cookie' header, it will be mapped to a 'Cookie' header.
133134
This enables simple cookie handling where subsequent HTTP interactions in the same message flow can use a cookie
134135
supplied by the server. Default is "false".
135136
]]></xsd:documentation>
136137
</xsd:annotation>
138+
<xsd:simpleType>
139+
<xsd:union memberTypes="xsd:boolean xsd:string"/>
140+
</xsd:simpleType>
137141
</xsd:attribute>
138142
<xsd:attribute name="reply-timeout" type="xsd:string">
139143
<xsd:annotation>
@@ -173,6 +177,19 @@
173177
</xsd:documentation>
174178
</xsd:annotation>
175179
</xsd:attribute>
180+
<xsd:attribute name="reply-to-flux" default="false">
181+
<xsd:annotation>
182+
<xsd:documentation>
183+
When set to "true", the response is converted to the Flux which is sent as a reply
184+
message payload.
185+
The downstream flow might have a splitter to walk through that Flux.
186+
Default is "false".
187+
</xsd:documentation>
188+
</xsd:annotation>
189+
<xsd:simpleType>
190+
<xsd:union memberTypes="xsd:boolean xsd:string"/>
191+
</xsd:simpleType>
192+
</xsd:attribute>
176193
</xsd:extension>
177194
</xsd:complexContent>
178195
</xsd:complexType>

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests-context.xml

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,23 @@
2020
</si:channel>
2121

2222
<outbound-gateway id="reactiveMinimalConfig" url="http://localhost/test1" request-channel="requests"
23-
web-client="webClient"/>
23+
web-client="webClient"/>
2424

2525
<outbound-gateway id="reactiveFullConfig"
26-
url="http://localhost/test2"
27-
http-method="PUT"
28-
request-channel="requests"
29-
reply-timeout="1234"
30-
extract-request-payload="false"
31-
expected-response-type="java.lang.String"
32-
mapped-request-headers="requestHeader1, requestHeader2"
33-
mapped-response-headers="responseHeader"
34-
reply-channel="replies"
35-
charset="UTF-8"
36-
order="77"
37-
auto-startup="false"
38-
transfer-cookies="true">
26+
url="http://localhost/test2"
27+
http-method="PUT"
28+
request-channel="requests"
29+
reply-timeout="1234"
30+
extract-request-payload="false"
31+
expected-response-type="java.lang.String"
32+
mapped-request-headers="requestHeader1, requestHeader2"
33+
mapped-response-headers="responseHeader"
34+
reply-channel="replies"
35+
charset="UTF-8"
36+
order="77"
37+
auto-startup="false"
38+
transfer-cookies="true"
39+
reply-to-flux="true">
3940
<uri-variable name="foo" expression="headers.bar"/>
4041
</outbound-gateway>
4142

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public void reactiveMinimalConfig() {
8282
assertEquals(Charset.forName("UTF-8"), handlerAccessor.getPropertyValue("charset"));
8383
assertEquals(true, handlerAccessor.getPropertyValue("extractPayload"));
8484
assertEquals(false, handlerAccessor.getPropertyValue("transferCookies"));
85+
assertEquals(false, handlerAccessor.getPropertyValue("replyToFlux"));
8586
}
8687

8788
@Test
@@ -123,6 +124,7 @@ public void reactiveFullConfig() {
123124
assertTrue(ObjectUtils.containsElement(mappedRequestHeaders, "requestHeader2"));
124125
assertEquals("responseHeader", mappedResponseHeaders[0]);
125126
assertEquals(true, handlerAccessor.getPropertyValue("transferCookies"));
127+
assertEquals(true, handlerAccessor.getPropertyValue("replyToFlux"));
126128
}
127129

128130
}

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import org.junit.Test;
2929
import org.reactivestreams.Subscriber;
3030

31+
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DataBufferFactory;
3133
import org.springframework.http.HttpStatus;
34+
import org.springframework.http.MediaType;
3235
import org.springframework.http.client.reactive.ClientHttpConnector;
3336
import org.springframework.integration.channel.FluxMessageChannel;
3437
import org.springframework.integration.channel.QueueChannel;
@@ -42,6 +45,7 @@
4245
import org.springframework.web.reactive.function.client.WebClient;
4346
import org.springframework.web.reactive.function.client.WebClientResponseException;
4447

48+
import reactor.core.publisher.Flux;
4549
import reactor.core.publisher.Mono;
4650
import reactor.test.StepVerifier;
4751

@@ -54,7 +58,7 @@
5458
public class WebFluxRequestExecutingMessageHandlerTests {
5559

5660
@Test
57-
public void testReactiveReturn() throws Throwable {
61+
public void testReactiveReturn() {
5862
ClientHttpConnector httpConnector =
5963
new HttpHandlerConnector((request, response) -> {
6064
response.setStatusCode(HttpStatus.OK);
@@ -84,7 +88,7 @@ public void testReactiveReturn() throws Throwable {
8488
}
8589

8690
@Test
87-
public void testReactiveErrorOneWay() throws Throwable {
91+
public void testReactiveErrorOneWay() {
8892
ClientHttpConnector httpConnector =
8993
new HttpHandlerConnector((request, response) -> {
9094
response.setStatusCode(HttpStatus.UNAUTHORIZED);
@@ -114,7 +118,7 @@ public void testReactiveErrorOneWay() throws Throwable {
114118
}
115119

116120
@Test
117-
public void testReactiveConnectErrorOneWay() throws Throwable {
121+
public void testReactiveConnectErrorOneWay() {
118122
ClientHttpConnector httpConnector =
119123
new HttpHandlerConnector((request, response) -> {
120124
throw new RuntimeException("Intentional connection error");
@@ -182,4 +186,50 @@ public void testServiceUnavailableWithoutBody() {
182186
assertNull(replyMessage);
183187
}
184188

189+
@Test
190+
@SuppressWarnings("unchecked")
191+
public void testFluxReply() {
192+
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
193+
response.setStatusCode(HttpStatus.OK);
194+
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
195+
196+
DataBufferFactory bufferFactory = response.bufferFactory();
197+
198+
Flux<DataBuffer> data =
199+
Flux.just(bufferFactory.wrap("foo".getBytes()),
200+
bufferFactory.wrap("bar".getBytes()),
201+
bufferFactory.wrap("baz".getBytes()));
202+
203+
return response.writeWith(data)
204+
.then(Mono.defer(response::setComplete));
205+
});
206+
207+
WebClient webClient = WebClient.builder()
208+
.clientConnector(httpConnector)
209+
.build();
210+
211+
String destinationUri = "http://www.springsource.org/spring-integration";
212+
WebFluxRequestExecutingMessageHandler reactiveHandler =
213+
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);
214+
215+
QueueChannel replyChannel = new QueueChannel();
216+
reactiveHandler.setOutputChannel(replyChannel);
217+
reactiveHandler.setExpectedResponseType(String.class);
218+
reactiveHandler.setReplyToFlux(true);
219+
220+
reactiveHandler.handleMessage(MessageBuilder.withPayload("hello, world").build());
221+
222+
Message<?> receive = replyChannel.receive(10_000);
223+
224+
assertNotNull(receive);
225+
226+
assertThat(receive.getPayload(), instanceOf(Flux.class));
227+
228+
Flux<String> flux = (Flux<String>) receive.getPayload();
229+
230+
StepVerifier.create(flux)
231+
.expectNext("foo", "bar", "baz")
232+
.verifyComplete();
233+
}
234+
185235
}

src/reference/asciidoc/webflux.adoc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,13 @@ You can configure a `WebClient` instance to use:
9595
</bean>
9696
----
9797

98-
The `WebClient` `exchange()` operation returns a `Mono<ClientResponse>` which is mapped to the `AbstractIntegrationMessageBuilder` reactive support (using `Mono.map()`) as the output from the `WebFluxRequestExecutingMessageHandler`.
98+
The `WebClient` `exchange()` operation returns a `Mono<ClientResponse>` which is mapped (using several `Mono.map()` steps) to an `AbstractIntegrationMessageBuilder` as the output from the `WebFluxRequestExecutingMessageHandler`.
9999
Together with the `ReactiveChannel` as an `outputChannel`, the `Mono<ClientResponse>` evaluation is deferred until a downstream subscription is made.
100100
Otherwise, it is treated as an `async` mode and the `Mono` response is adapted to an `SettableListenableFuture` for an asynchronous reply from the `WebFluxRequestExecutingMessageHandler`.
101-
101+
The target payload of the output message depends on the `WebFluxRequestExecutingMessageHandler` configuration.
102+
The `setExpectedResponseType(Class<?>)` or `setExpectedResponseTypeExpression(Expression)` identifies the target type of the response body element conversion.
103+
If `replyToFlux` is set to `true`, the response body is converted to a `Flux` with the provided `expectedResponseType` for each element and this `Flux` is sent as the payload downstream.
104+
A <<splitter,splitter>> afterwards can be used to iterate over this `Flux` in a reactive manner.
102105

103106
Also see <<http-outbound>> for more possible configuration options.
104107

@@ -237,4 +240,4 @@ public IntegrationFlow outboundReactive() {
237240
=== WebFlux Header Mappings
238241

239242
Since WebFlux components are fully based on the HTTP protocol there is no difference in the HTTP headers mapping.
240-
See <<http-header-mapping>> for more possible options and components to use for mapping headers.
243+
See <<http-header-mapping>> for more possible options and components to use for mapping headers.

0 commit comments

Comments
 (0)