Skip to content

Commit 2df71fb

Browse files
artembilangaryrussell
authored andcommitted
INT-4537: Fix RSConsumer for MockIntegrationCtx
JIRA: https://jira.spring.io/browse/INT-4537 Fixes #2582 * Rename `ReactiveStreamsConsumer.messageHandler` property to the `handler` for consistency with other `IntegrationConsumer` s * Do not wrap `Subscriber` into the `MessageHandler` if that one is already a `MessageHandler` * Fix `MockIntegrationContext` for the logic around `ReactiveStreamsConsumer` where it is not enough just replace a `handler`, but we also need to do that with the `subscriber`. Luckily the `MockMessageHandler` is also a Reactive `Subscriber` * Clean up `MockIntegrationContext.beans` in the end of `resetBeans()` * Improve `testing.adoc` **Cherry-pick to 5.0.x**
1 parent 9e9fa2c commit 2df71fb

File tree

4 files changed

+64
-16
lines changed

4 files changed

+64
-16
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class ReactiveStreamsConsumer extends AbstractEndpoint implements Integra
4949

5050
private final MessageChannel inputChannel;
5151

52-
private final MessageHandler messageHandler;
52+
private final MessageHandler handler;
5353

5454
private final Publisher<Message<Object>> publisher;
5555

@@ -83,10 +83,13 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, final Subscriber<Mes
8383
this.subscriber = subscriber;
8484
this.lifecycleDelegate = subscriber instanceof Lifecycle ? (Lifecycle) subscriber : null;
8585
if (subscriber instanceof MessageHandlerSubscriber) {
86-
this.messageHandler = ((MessageHandlerSubscriber) subscriber).messageHandler;
86+
this.handler = ((MessageHandlerSubscriber) subscriber).messageHandler;
87+
}
88+
else if (subscriber instanceof MessageHandler) {
89+
this.handler = (MessageHandler) subscriber;
8790
}
8891
else {
89-
this.messageHandler = this.subscriber::onNext;
92+
this.handler = this.subscriber::onNext;
9093
}
9194
}
9295

@@ -101,11 +104,11 @@ public MessageChannel getInputChannel() {
101104

102105
@Override
103106
public MessageChannel getOutputChannel() {
104-
if (this.messageHandler instanceof MessageProducer) {
105-
return ((MessageProducer) this.messageHandler).getOutputChannel();
107+
if (this.handler instanceof MessageProducer) {
108+
return ((MessageProducer) this.handler).getOutputChannel();
106109
}
107-
else if (this.messageHandler instanceof MessageRouter) {
108-
return ((MessageRouter) this.messageHandler).getDefaultOutputChannel();
110+
else if (this.handler instanceof MessageRouter) {
111+
return ((MessageRouter) this.handler).getDefaultOutputChannel();
109112
}
110113
else {
111114
return null;
@@ -114,7 +117,7 @@ else if (this.messageHandler instanceof MessageRouter) {
114117

115118
@Override
116119
public MessageHandler getHandler() {
117-
return this.messageHandler;
120+
return this.handler;
118121
}
119122

120123
@Override

spring-integration-test/src/main/java/org/springframework/integration/test/context/MockIntegrationContext.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.integration.core.MessageProducer;
3131
import org.springframework.integration.core.MessageSource;
3232
import org.springframework.integration.endpoint.IntegrationConsumer;
33+
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
3334
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
3435
import org.springframework.integration.test.mock.MockMessageHandler;
3536
import org.springframework.integration.test.util.TestUtils;
@@ -38,6 +39,9 @@
3839
import org.springframework.util.Assert;
3940
import org.springframework.util.ObjectUtils;
4041

42+
import reactor.util.function.Tuple2;
43+
import reactor.util.function.Tuples;
44+
4145
/**
4246
* A {@link BeanFactoryAware} component with an API to customize real beans
4347
* in the application context from test code.
@@ -91,10 +95,17 @@ public void resetBeans(String... beanNames) {
9195
if (endpoint instanceof SourcePollingChannelAdapter) {
9296
directFieldAccessor.setPropertyValue("source", e.getValue());
9397
}
98+
else if (endpoint instanceof ReactiveStreamsConsumer) {
99+
Tuple2<?, ?> value = (Tuple2<?, ?>) e.getValue();
100+
directFieldAccessor.setPropertyValue("handler", value.getT1());
101+
directFieldAccessor.setPropertyValue("subscriber", value.getT2());
102+
}
94103
else if (endpoint instanceof IntegrationConsumer) {
95104
directFieldAccessor.setPropertyValue("handler", e.getValue());
96105
}
97106
});
107+
108+
this.beans.clear();
98109
}
99110

100111
/**
@@ -137,7 +148,13 @@ public void substituteMessageHandlerFor(String consumerEndpointId, MessageHandle
137148
}
138149
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(endpoint);
139150
Object targetMessageHandler = directFieldAccessor.getPropertyValue("handler");
140-
this.beans.put(consumerEndpointId, targetMessageHandler);
151+
if (endpoint instanceof ReactiveStreamsConsumer) {
152+
Object targetSubscriber = directFieldAccessor.getPropertyValue("subscriber");
153+
this.beans.put(consumerEndpointId, Tuples.of(targetMessageHandler, targetSubscriber));
154+
}
155+
else {
156+
this.beans.put(consumerEndpointId, targetMessageHandler);
157+
}
141158

142159
if (mockMessageHandler instanceof MessageProducer) {
143160
if (targetMessageHandler instanceof MessageProducer) {
@@ -160,6 +177,10 @@ public void substituteMessageHandlerFor(String consumerEndpointId, MessageHandle
160177

161178
directFieldAccessor.setPropertyValue("handler", mockMessageHandler);
162179

180+
if (endpoint instanceof ReactiveStreamsConsumer) {
181+
directFieldAccessor.setPropertyValue("subscriber", mockMessageHandler);
182+
}
183+
163184
if (autoStartup && endpoint instanceof Lifecycle) {
164185
((Lifecycle) endpoint).start();
165186
}

spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageHandlerTests.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.hamcrest.Matchers.instanceOf;
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertNotNull;
23+
import static org.junit.Assert.assertNotSame;
2324
import static org.junit.Assert.assertSame;
2425
import static org.junit.Assert.assertThat;
2526
import static org.junit.Assert.fail;
@@ -36,6 +37,7 @@
3637
import org.junit.Test;
3738
import org.junit.runner.RunWith;
3839
import org.mockito.ArgumentCaptor;
40+
import org.reactivestreams.Subscriber;
3941

4042
import org.springframework.beans.factory.annotation.Autowired;
4143
import org.springframework.context.ApplicationContext;
@@ -46,7 +48,7 @@
4648
import org.springframework.integration.channel.DirectChannel;
4749
import org.springframework.integration.channel.QueueChannel;
4850
import org.springframework.integration.config.EnableIntegration;
49-
import org.springframework.integration.endpoint.EventDrivenConsumer;
51+
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
5052
import org.springframework.integration.expression.ValueExpression;
5153
import org.springframework.integration.handler.ExpressionEvaluatingMessageHandler;
5254
import org.springframework.integration.support.MessageBuilder;
@@ -183,7 +185,8 @@ public void testMockRawHandler() {
183185
ArgumentCaptor<Message<?>> messageArgumentCaptor = MockIntegration.messageArgumentCaptor();
184186
MessageHandler mockMessageHandler =
185187
spy(mockMessageHandler(messageArgumentCaptor))
186-
.handleNext(m -> { });
188+
.handleNext(m -> {
189+
});
187190

188191
String endpointId = "rawHandlerConsumer";
189192
this.mockIntegrationContext.substituteMessageHandlerFor(endpointId, mockMessageHandler);
@@ -214,6 +217,11 @@ public void testMockRawHandler() {
214217
assertThat(e, instanceOf(IllegalStateException.class));
215218
assertThat(e.getMessage(), containsString("with replies can't replace simple MessageHandler"));
216219
}
220+
221+
this.mockIntegrationContext.resetBeans();
222+
223+
assertNotSame(mockMessageHandler, TestUtils.getPropertyValue(endpoint, "handler", MessageHandler.class));
224+
assertNotSame(mockMessageHandler, TestUtils.getPropertyValue(endpoint, "subscriber", Subscriber.class));
217225
}
218226

219227
/**
@@ -272,9 +280,9 @@ public SubscribableChannel rawChannel() {
272280
}
273281

274282
@Bean
275-
public EventDrivenConsumer rawHandlerConsumer() {
276-
return new EventDrivenConsumer(rawChannel(),
277-
new ExpressionEvaluatingMessageHandler(new ValueExpression<>("test")));
283+
public ReactiveStreamsConsumer rawHandlerConsumer() {
284+
return new ReactiveStreamsConsumer(rawChannel(),
285+
(MessageHandler) new ExpressionEvaluatingMessageHandler(new ValueExpression<>("test")));
278286
}
279287

280288
@ServiceActivator(inputChannel = "startChannel", outputChannel = "nextChannel")
@@ -291,7 +299,8 @@ public ArgumentCaptor<Message<?>> argumentCaptorForOutputTest() {
291299
@ServiceActivator(inputChannel = "nextChannel")
292300
public MessageHandler handleNextInput() {
293301
return mockMessageHandler(argumentCaptorForOutputTest())
294-
.handleNext(m -> { });
302+
.handleNext(m -> {
303+
});
295304
}
296305

297306
}

src/reference/asciidoc/testing.adoc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,29 @@ public void testMockMessageSource() {
265265
----
266266
====
267267

268+
NOTE: The `mySourceEndpoint` refers here to the bean name of the `SourcePollingChannelAdapter` for which we replace the real `MessageSource` with our mock.
269+
Similarly the `MockIntegrationContext.substituteMessageHandlerFor()` expects a bean name for the `IntegrationConsumer`, which wraps a `MessageHandler` as an endpoint.
270+
271+
After test is performed you can restore the state of endpoint beans to the real configuration using `MockIntegrationContext.resetBeans()`:
272+
273+
====
274+
[source,java]
275+
----
276+
@After
277+
public void tearDown() {
278+
this.mockIntegrationContext.resetBeans();
279+
}
280+
----
281+
====
282+
268283
See the https://docs.spring.io/spring-integration/api/org/springframework/integration/test/context/MockIntegrationContext.html[Javadoc] for more information.
269284

270285
[[testing-mocks]]
271286
=== Integration Mocks
272287

273288
The `org.springframework.integration.test.mock` package offers tools and utilities for mocking, stubbing, and verification of activity on Spring Integration components.
274289
The mocking functionality is fully based on and compatible with the well known Mockito Framework.
275-
(The current Mockito transitive dependency is on version 2.5.x.)
290+
(The current Mockito transitive dependency is on version 2.5.x or higher.)
276291

277292
==== MockIntegration
278293

0 commit comments

Comments
 (0)