Skip to content

Commit 11fcda4

Browse files
artembilangaryrussell
authored andcommitted
INT-4537: Fix RSConsumer for MockIntegrationCtx
JIRA: https://jira.spring.io/browse/INT-4537 Fixes #2582 * Rename `ReactiveStreamsConsumer.messageHandler` property to the `handler` for consistency with other `IntegrationConsumer` s * Do not wrap `Subscriber` into the `MessageHandler` if that one is already a `MessageHandler` * Fix `MockIntegrationContext` for the logic around `ReactiveStreamsConsumer` where it is not enough just replace a `handler`, but we also need to do that with the `subscriber`. Luckily the `MockMessageHandler` is also a Reactive `Subscriber` * Clean up `MockIntegrationContext.beans` in the end of `resetBeans()` * Improve `testing.adoc` **Cherry-pick to 5.0.x**
1 parent 3644c05 commit 11fcda4

File tree

4 files changed

+67
-19
lines changed

4 files changed

+67
-19
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class ReactiveStreamsConsumer extends AbstractEndpoint implements Integra
4747

4848
private final MessageChannel inputChannel;
4949

50-
private final MessageHandler messageHandler;
50+
private final MessageHandler handler;
5151

5252
private final Publisher<Message<Object>> publisher;
5353

@@ -76,10 +76,13 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, final Subscriber<Mes
7676
this.subscriber = subscriber;
7777
this.lifecycleDelegate = subscriber instanceof Lifecycle ? (Lifecycle) subscriber : null;
7878
if (subscriber instanceof MessageHandlerSubscriber) {
79-
this.messageHandler = ((MessageHandlerSubscriber) subscriber).messageHandler;
79+
this.handler = ((MessageHandlerSubscriber) subscriber).messageHandler;
80+
}
81+
else if (subscriber instanceof MessageHandler) {
82+
this.handler = (MessageHandler) subscriber;
8083
}
8184
else {
82-
this.messageHandler = this.subscriber::onNext;
85+
this.handler = this.subscriber::onNext;
8386
}
8487
}
8588

@@ -94,11 +97,11 @@ public MessageChannel getInputChannel() {
9497

9598
@Override
9699
public MessageChannel getOutputChannel() {
97-
if (this.messageHandler instanceof MessageProducer) {
98-
return ((MessageProducer) this.messageHandler).getOutputChannel();
100+
if (this.handler instanceof MessageProducer) {
101+
return ((MessageProducer) this.handler).getOutputChannel();
99102
}
100-
else if (this.messageHandler instanceof MessageRouter) {
101-
return ((MessageRouter) this.messageHandler).getDefaultOutputChannel();
103+
else if (this.handler instanceof MessageRouter) {
104+
return ((MessageRouter) this.handler).getDefaultOutputChannel();
102105
}
103106
else {
104107
return null;
@@ -107,7 +110,7 @@ else if (this.messageHandler instanceof MessageRouter) {
107110

108111
@Override
109112
public MessageHandler getHandler() {
110-
return this.messageHandler;
113+
return this.handler;
111114
}
112115

113116
@Override

spring-integration-test/src/main/java/org/springframework/integration/test/context/MockIntegrationContext.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.integration.core.MessageProducer;
3131
import org.springframework.integration.core.MessageSource;
3232
import org.springframework.integration.endpoint.IntegrationConsumer;
33+
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
3334
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
3435
import org.springframework.integration.test.mock.MockMessageHandler;
3536
import org.springframework.integration.test.util.TestUtils;
@@ -38,6 +39,9 @@
3839
import org.springframework.util.Assert;
3940
import org.springframework.util.ObjectUtils;
4041

42+
import reactor.util.function.Tuple2;
43+
import reactor.util.function.Tuples;
44+
4145
/**
4246
* A {@link BeanFactoryAware} component with an API to customize real beans
4347
* in the application context from test code.
@@ -91,10 +95,17 @@ public void resetBeans(String... beanNames) {
9195
if (endpoint instanceof SourcePollingChannelAdapter) {
9296
directFieldAccessor.setPropertyValue("source", e.getValue());
9397
}
98+
else if (endpoint instanceof ReactiveStreamsConsumer) {
99+
Tuple2<?, ?> value = (Tuple2<?, ?>) e.getValue();
100+
directFieldAccessor.setPropertyValue("handler", value.getT1());
101+
directFieldAccessor.setPropertyValue("subscriber", value.getT2());
102+
}
94103
else if (endpoint instanceof IntegrationConsumer) {
95104
directFieldAccessor.setPropertyValue("handler", e.getValue());
96105
}
97106
});
107+
108+
this.beans.clear();
98109
}
99110

100111
/**
@@ -137,7 +148,13 @@ public void substituteMessageHandlerFor(String consumerEndpointId, MessageHandle
137148
}
138149
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(endpoint);
139150
Object targetMessageHandler = directFieldAccessor.getPropertyValue("handler");
140-
this.beans.put(consumerEndpointId, targetMessageHandler);
151+
if (endpoint instanceof ReactiveStreamsConsumer) {
152+
Object targetSubscriber = directFieldAccessor.getPropertyValue("subscriber");
153+
this.beans.put(consumerEndpointId, Tuples.of(targetMessageHandler, targetSubscriber));
154+
}
155+
else {
156+
this.beans.put(consumerEndpointId, targetMessageHandler);
157+
}
141158

142159
if (mockMessageHandler instanceof MessageProducer) {
143160
if (targetMessageHandler instanceof MessageProducer) {
@@ -160,6 +177,10 @@ public void substituteMessageHandlerFor(String consumerEndpointId, MessageHandle
160177

161178
directFieldAccessor.setPropertyValue("handler", mockMessageHandler);
162179

180+
if (endpoint instanceof ReactiveStreamsConsumer) {
181+
directFieldAccessor.setPropertyValue("subscriber", mockMessageHandler);
182+
}
183+
163184
if (autoStartup && endpoint instanceof Lifecycle) {
164185
((Lifecycle) endpoint).start();
165186
}

spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageHandlerTests.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.hamcrest.Matchers.instanceOf;
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertNotNull;
23+
import static org.junit.Assert.assertNotSame;
2324
import static org.junit.Assert.assertSame;
2425
import static org.junit.Assert.assertThat;
2526
import static org.junit.Assert.fail;
@@ -36,6 +37,7 @@
3637
import org.junit.Test;
3738
import org.junit.runner.RunWith;
3839
import org.mockito.ArgumentCaptor;
40+
import org.reactivestreams.Subscriber;
3941

4042
import org.springframework.beans.factory.annotation.Autowired;
4143
import org.springframework.context.ApplicationContext;
@@ -46,7 +48,7 @@
4648
import org.springframework.integration.channel.DirectChannel;
4749
import org.springframework.integration.channel.QueueChannel;
4850
import org.springframework.integration.config.EnableIntegration;
49-
import org.springframework.integration.endpoint.EventDrivenConsumer;
51+
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
5052
import org.springframework.integration.expression.ValueExpression;
5153
import org.springframework.integration.handler.ExpressionEvaluatingMessageHandler;
5254
import org.springframework.integration.support.MessageBuilder;
@@ -183,7 +185,8 @@ public void testMockRawHandler() {
183185
ArgumentCaptor<Message<?>> messageArgumentCaptor = MockIntegration.messageArgumentCaptor();
184186
MessageHandler mockMessageHandler =
185187
spy(mockMessageHandler(messageArgumentCaptor))
186-
.handleNext(m -> { });
188+
.handleNext(m -> {
189+
});
187190

188191
String endpointId = "rawHandlerConsumer";
189192
this.mockIntegrationContext.substituteMessageHandlerFor(endpointId, mockMessageHandler);
@@ -214,6 +217,11 @@ public void testMockRawHandler() {
214217
assertThat(e, instanceOf(IllegalStateException.class));
215218
assertThat(e.getMessage(), containsString("with replies can't replace simple MessageHandler"));
216219
}
220+
221+
this.mockIntegrationContext.resetBeans();
222+
223+
assertNotSame(mockMessageHandler, TestUtils.getPropertyValue(endpoint, "handler", MessageHandler.class));
224+
assertNotSame(mockMessageHandler, TestUtils.getPropertyValue(endpoint, "subscriber", Subscriber.class));
217225
}
218226

219227
/**
@@ -272,9 +280,9 @@ public SubscribableChannel rawChannel() {
272280
}
273281

274282
@Bean
275-
public EventDrivenConsumer rawHandlerConsumer() {
276-
return new EventDrivenConsumer(rawChannel(),
277-
new ExpressionEvaluatingMessageHandler(new ValueExpression<>("test")));
283+
public ReactiveStreamsConsumer rawHandlerConsumer() {
284+
return new ReactiveStreamsConsumer(rawChannel(),
285+
(MessageHandler) new ExpressionEvaluatingMessageHandler(new ValueExpression<>("test")));
278286
}
279287

280288
@ServiceActivator(inputChannel = "startChannel", outputChannel = "nextChannel")
@@ -291,7 +299,8 @@ public ArgumentCaptor<Message<?>> argumentCaptorForOutputTest() {
291299
@ServiceActivator(inputChannel = "nextChannel")
292300
public MessageHandler handleNextInput() {
293301
return mockMessageHandler(argumentCaptorForOutputTest())
294-
.handleNext(m -> { });
302+
.handleNext(m -> {
303+
});
295304
}
296305

297306
}

src/reference/asciidoc/testing.adoc

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,14 +229,29 @@ public void testMockMessageSource() {
229229
}
230230
----
231231

232-
See their JavaDocs for more information.
232+
NOTE: The `mySourceEndpoint` refers here to the bean name of the `SourcePollingChannelAdapter` for which we replace the real `MessageSource` with our mock.
233+
Similarly the `MockIntegrationContext.substituteMessageHandlerFor()` expects a bean name for the `IntegrationConsumer`, which wraps a `MessageHandler` as an endpoint.
234+
235+
After test is performed you can restore the state of endpoint beans to the real configuration using `MockIntegrationContext.resetBeans()`:
236+
237+
====
238+
[source,java]
239+
----
240+
@After
241+
public void tearDown() {
242+
this.mockIntegrationContext.resetBeans();
243+
}
244+
----
245+
====
246+
247+
See the https://docs.spring.io/spring-integration/api/org/springframework/integration/test/context/MockIntegrationContext.html[Javadoc] for more information.
233248

234249
[[testing-mocks]]
235250
=== Integration Mocks
236251

237-
The `org.springframework.integration.test.mock` package offers tools and utilities for mocking, stubbing and verification of activity on Spring Integration components.
238-
The mocking functionality is fully based and compatible with the well known Mockito Framework.
239-
(The current Mockito transitive dependency is of _version 2.5.x_.)
252+
The `org.springframework.integration.test.mock` package offers tools and utilities for mocking, stubbing, and verification of activity on Spring Integration components.
253+
The mocking functionality is fully based on and compatible with the well known Mockito Framework.
254+
(The current Mockito transitive dependency is on version 2.5.x or higher.)
240255

241256
==== MockIntegration
242257

0 commit comments

Comments
 (0)