Skip to content

Commit 684dd5a

Browse files
matthjesartembilan
authored andcommitted
INT-4341: RedisQueueIn: support receiveTimeout=0
JIRA: https://jira.spring.io/browse/INT-4341 Fixes #2228 Add to the `RedisQueueInboundGateway` and `RedisQueueMessageDrivenEndpoint`support for the `receiveTimeout` of 0 and block indefinitely Added some simple tests to check if the receive timeout can be set to 0 **Cherry-pick to 4.3.x** # Conflicts: # spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java # spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java # spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundChannelAdapterParserTests-context.xml # spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundChannelAdapterParserTests.java
1 parent d3928b0 commit 684dd5a

File tree

4 files changed

+38
-9
lines changed

4 files changed

+38
-9
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2015 the original author or authors
2+
* Copyright 2014-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,6 +45,8 @@
4545
* @author David Liu
4646
* @author Artem Bilan
4747
* @author Gary Russell
48+
* @author Matthias Jeschke
49+
*
4850
* @since 4.1
4951
*/
5052
@ManagedResource
@@ -125,7 +127,7 @@ public void setSerializer(RedisSerializer<?> serializer) {
125127
* @param receiveTimeout Must be non-negative. Specified in milliseconds.
126128
*/
127129
public void setReceiveTimeout(long receiveTimeout) {
128-
Assert.isTrue(receiveTimeout > 0, "'receiveTimeout' must be > 0.");
130+
Assert.isTrue(receiveTimeout >= 0, "'receiveTimeout' must be >= 0.");
129131
this.receiveTimeout = receiveTimeout;
130132
}
131133

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2016 the original author or authors
2+
* Copyright 2013-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,8 @@
4848
* @author Artem Bilan
4949
* @author Gary Russell
5050
* @author Rainer Frey
51+
* @author Matthias Jeschke
52+
*
5153
* @since 3.0
5254
*/
5355
@ManagedResource
@@ -131,7 +133,7 @@ public void setExpectMessage(boolean expectMessage) {
131133
* @param receiveTimeout Must be non-negative. Specified in milliseconds.
132134
*/
133135
public void setReceiveTimeout(long receiveTimeout) {
134-
Assert.isTrue(receiveTimeout > 0, "'receiveTimeout' must be > 0.");
136+
Assert.isTrue(receiveTimeout >= 0, "'receiveTimeout' must be >= 0.");
135137
this.receiveTimeout = receiveTimeout;
136138
}
137139

spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundChannelAdapterParserTests-context.xml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212
<bean id="redisConnectionFactory"
1313
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
14-
<property name="port" value="#{T(org.springframework.integration.redis.rules.RedisAvailableRule).REDIS_PORT}"/>
14+
<property name="port"
15+
value="#{T(org.springframework.integration.redis.rules.RedisAvailableRule).REDIS_PORT}"/>
1516
</bean>
1617

1718
<bean id="customRedisConnectionFactory" parent="redisConnectionFactory"/>
@@ -33,9 +34,16 @@
3334
auto-startup="false"
3435
phase="100"/>
3536

37+
<int-redis:queue-inbound-channel-adapter id="zeroReceiveTimeoutAdapter"
38+
queue="si.test.Int3017.Inbound2"
39+
channel="sendChannel"
40+
connection-factory="customRedisConnectionFactory"
41+
receive-timeout="0"/>
42+
3643
<bean id="executor" class="org.springframework.integration.util.ErrorHandlingTaskExecutor">
3744
<constructor-arg ref="threadPoolTaskExecutor"/>
38-
<constructor-arg value="#{T(org.springframework.scheduling.support.TaskUtils).LOG_AND_SUPPRESS_ERROR_HANDLER}"/>
45+
<constructor-arg
46+
value="#{T(org.springframework.scheduling.support.TaskUtils).LOG_AND_SUPPRESS_ERROR_HANDLER}"/>
3947
</bean>
4048

4149
<task:executor id="threadPoolTaskExecutor" pool-size="5"/>

spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundChannelAdapterParserTests.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2015 the original author or authors.
2+
* Copyright 2013-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,9 +41,12 @@
4141
import org.springframework.test.context.ContextConfiguration;
4242
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
4343

44+
4445
/**
4546
* @author Artem Bilan
4647
* @author Gary Russell
48+
* @author Matthias Jeschke
49+
*
4750
* @since 3.0
4851
*/
4952
@ContextConfiguration
@@ -71,6 +74,10 @@ public class RedisQueueInboundChannelAdapterParserTests {
7174
@Qualifier("customAdapter")
7275
private RedisQueueMessageDrivenEndpoint customAdapter;
7376

77+
@Autowired
78+
@Qualifier("zeroReceiveTimeoutAdapter")
79+
private RedisQueueMessageDrivenEndpoint zeroReceiveTimeoutAdapter;
80+
7481
@Autowired
7582
@Qualifier("errorChannel")
7683
private MessageChannel errorChannel;
@@ -87,11 +94,13 @@ public class RedisQueueInboundChannelAdapterParserTests {
8794
@Autowired
8895
private RedisSerializer<?> serializer;
8996

97+
9098
@Test
9199
public void testInt3017DefaultConfig() {
92100
assertSame(this.connectionFactory,
93101
TestUtils.getPropertyValue(this.defaultAdapter, "boundListOperations.ops.template.connectionFactory"));
94-
assertEquals("si.test.Int3017.Inbound1", TestUtils.getPropertyValue(this.defaultAdapter, "boundListOperations.key"));
102+
assertEquals("si.test.Int3017.Inbound1",
103+
TestUtils.getPropertyValue(this.defaultAdapter, "boundListOperations.key"));
95104
assertFalse(TestUtils.getPropertyValue(this.defaultAdapter, "expectMessage", Boolean.class));
96105
assertEquals(1000L, TestUtils.getPropertyValue(this.defaultAdapter, "receiveTimeout"));
97106
assertEquals(5000L, TestUtils.getPropertyValue(this.defaultAdapter, "recoveryInterval"));
@@ -105,11 +114,13 @@ public void testInt3017DefaultConfig() {
105114
assertSame(this.defaultAdapterChannel, TestUtils.getPropertyValue(this.defaultAdapter, "outputChannel"));
106115
}
107116

117+
108118
@Test
109119
public void testInt3017CustomConfig() {
110120
assertSame(this.customRedisConnectionFactory,
111121
TestUtils.getPropertyValue(this.customAdapter, "boundListOperations.ops.template.connectionFactory"));
112-
assertEquals("si.test.Int3017.Inbound2", TestUtils.getPropertyValue(this.customAdapter, "boundListOperations.key"));
122+
assertEquals("si.test.Int3017.Inbound2",
123+
TestUtils.getPropertyValue(this.customAdapter, "boundListOperations.key"));
113124
assertTrue(TestUtils.getPropertyValue(this.customAdapter, "expectMessage", Boolean.class));
114125
assertEquals(2000L, TestUtils.getPropertyValue(this.customAdapter, "receiveTimeout"));
115126
assertEquals(3000L, TestUtils.getPropertyValue(this.customAdapter, "recoveryInterval"));
@@ -121,4 +132,10 @@ public void testInt3017CustomConfig() {
121132
assertSame(this.sendChannel, TestUtils.getPropertyValue(this.customAdapter, "outputChannel"));
122133
}
123134

135+
136+
@Test
137+
public void testInt4341ZeroReceiveTimeoutConfig() {
138+
assertEquals(0L, TestUtils.getPropertyValue(this.zeroReceiveTimeoutAdapter, "receiveTimeout"));
139+
}
140+
124141
}

0 commit comments

Comments
 (0)