Skip to content

Commit d766d23

Browse files
committed
Fix ReactiveStreamsConsumerTests and Checkstyle
Also increase timeouts in the `RedisAvailableRule`
1 parent a834ce9 commit d766d23

File tree

3 files changed

+20
-17
lines changed

3 files changed

+20
-17
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public FluxMessageChannel() {
6060

6161
@Override
6262
protected boolean doSend(Message<?> message, long timeout) {
63-
Assert.state(subscribers.size() > 0, () -> "The [" + this + "] doesn't have subscribers to accept messages");
63+
Assert.state(this.subscribers.size() > 0,
64+
() -> "The [" + this + "] doesn't have subscribers to accept messages");
6465
this.sink.next(message);
6566
return true;
6667
}

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,14 @@ public void testReactiveStreamsConsumerFluxMessageChannel() throws InterruptedEx
8888

8989
reactiveConsumer.stop();
9090

91-
testChannel.send(testMessage);
91+
try {
92+
testChannel.send(testMessage);
93+
}
94+
catch (Exception e) {
95+
assertThat(e, instanceOf(MessageDeliveryException.class));
96+
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
97+
assertThat(e.getMessage(), containsString("doesn't have subscribers to accept messages"));
98+
}
9299

93100
reactiveConsumer.start();
94101

@@ -246,7 +253,14 @@ public void testReactiveStreamsConsumerViaConsumerEndpointFactoryBean() throws E
246253

247254
endpointFactoryBean.stop();
248255

249-
testChannel.send(testMessage);
256+
try {
257+
testChannel.send(testMessage);
258+
}
259+
catch (Exception e) {
260+
assertThat(e, instanceOf(MessageDeliveryException.class));
261+
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
262+
assertThat(e.getMessage(), containsString("doesn't have subscribers to accept messages"));
263+
}
250264

251265
endpointFactoryBean.start();
252266

@@ -260,16 +274,4 @@ public void testReactiveStreamsConsumerViaConsumerEndpointFactoryBean() throws E
260274
assertThat(result, Matchers.<Message<?>>contains(testMessage, testMessage2, testMessage2));
261275
}
262276

263-
@Test
264-
public void testFluxMessageChannelSendWithoutSubscription() {
265-
try {
266-
new FluxMessageChannel().send(new GenericMessage<>("foo"));
267-
}
268-
catch (Exception e) {
269-
assertThat(e, instanceOf(MessageDeliveryException.class));
270-
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
271-
assertThat(e.getMessage(), containsString("doesn't have subscribers to accept messages"));
272-
}
273-
}
274-
275277
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public Statement apply(final Statement base, final FrameworkMethod method, Objec
4747
redisStandaloneConfiguration.setPort(REDIS_PORT);
4848

4949
JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder()
50-
.connectTimeout(Duration.ofSeconds(10))
51-
.readTimeout(Duration.ofSeconds(10))
50+
.connectTimeout(Duration.ofSeconds(20))
51+
.readTimeout(Duration.ofSeconds(20))
5252
.build();
5353

5454
connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, clientConfiguration);

0 commit comments

Comments
 (0)