Skip to content

Commit 220a5aa

Browse files
garyrussellartembilan
authored andcommitted
GH-1244: ReplyingKTemplate reply timeout per send
Resolves #1244 Overloaded `sendAndReceive` with a timeout parameter
1 parent 8f4ca4a commit 220a5aa

File tree

5 files changed

+97
-23
lines changed

5 files changed

+97
-23
lines changed

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaOperations.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616

1717
package org.springframework.kafka.requestreply;
1818

19+
import java.time.Duration;
20+
1921
import org.apache.kafka.clients.producer.ProducerRecord;
2022

23+
import org.springframework.lang.Nullable;
24+
2125
/**
2226
* Request/reply operations.
2327
*
@@ -32,10 +36,19 @@
3236
public interface ReplyingKafkaOperations<K, V, R> {
3337

3438
/**
35-
* Send a request and receive a reply.
39+
* Send a request and receive a reply with the default timeout.
3640
* @param record the record to send.
3741
* @return a RequestReplyFuture.
3842
*/
3943
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
4044

45+
/**
46+
* Send a request and receive a reply.
47+
* @param record the record to send.
48+
* @param replyTimeout the reply timeout; if null, the default will be used.
49+
* @return a RequestReplyFuture.
50+
* @since 2.3
51+
*/
52+
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout);
53+
4154
}

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.nio.ByteBuffer;
2020
import java.nio.charset.StandardCharsets;
21+
import java.time.Duration;
2122
import java.time.Instant;
2223
import java.util.Collection;
2324
import java.util.List;
@@ -44,6 +45,7 @@
4445
import org.springframework.kafka.listener.GenericMessageListenerContainer;
4546
import org.springframework.kafka.support.KafkaHeaders;
4647
import org.springframework.kafka.support.TopicPartitionOffset;
48+
import org.springframework.lang.Nullable;
4749
import org.springframework.scheduling.TaskScheduler;
4850
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
4951
import org.springframework.util.Assert;
@@ -66,7 +68,9 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
6668

6769
private static final String WITH_CORRELATION_ID = " with correlationId: ";
6870

69-
private static final long DEFAULT_REPLY_TIMEOUT = 5000L;
71+
private static final int FIVE = 5;
72+
73+
private static final Duration DEFAULT_REPLY_TIMEOUT = Duration.ofSeconds(FIVE);
7074

7175
private final GenericMessageListenerContainer<K, R> replyContainer;
7276

@@ -82,7 +86,7 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
8286

8387
private boolean autoStartup = true;
8488

85-
private long replyTimeout = DEFAULT_REPLY_TIMEOUT;
89+
private Duration defaultReplyTimeout = DEFAULT_REPLY_TIMEOUT;
8690

8791
private boolean schedulerSet;
8892

@@ -149,13 +153,49 @@ public void setTaskScheduler(TaskScheduler scheduler) {
149153
this.schedulerSet = true;
150154
}
151155

156+
/**
157+
* Return the reply timeout used if no replyTimeout is provided in the
158+
* {@link #sendAndReceive(ProducerRecord, Duration)} call.
159+
* @return the timeout.
160+
* @deprecated in favor of {@link #getDefaultReplyTimeout()}.
161+
*/
162+
@Deprecated
152163
protected long getReplyTimeout() {
153-
return this.replyTimeout;
164+
return this.defaultReplyTimeout.toMillis();
154165
}
155166

167+
/**
168+
* Set the reply timeout used if no replyTimeout is provided in the
169+
* {@link #sendAndReceive(ProducerRecord, Duration)} call.
170+
* @param replyTimeout the timeout.
171+
* @deprecated in favor of {@link #setDefaultReplyTimeout(Duration)}.
172+
*/
173+
@Deprecated
156174
public void setReplyTimeout(long replyTimeout) {
157175
Assert.isTrue(replyTimeout >= 0, "'replyTimeout' must be >= 0");
158-
this.replyTimeout = replyTimeout;
176+
this.defaultReplyTimeout = Duration.ofMillis(replyTimeout);
177+
}
178+
179+
/**
180+
* Return the reply timeout used if no replyTimeout is provided in the
181+
* {@link #sendAndReceive(ProducerRecord, Duration)} call.
182+
* @return the timeout.
183+
* @since 2.3
184+
*/
185+
protected Duration getDefaultReplyTimeout() {
186+
return this.defaultReplyTimeout;
187+
}
188+
189+
/**
190+
* Set the reply timeout used if no replyTimeout is provided in the
191+
* {@link #sendAndReceive(ProducerRecord, Duration)} call.
192+
* @param defaultReplyTimeout the timeout.
193+
* @since 2.3
194+
*/
195+
public void setDefaultReplyTimeout(Duration defaultReplyTimeout) {
196+
Assert.notNull(defaultReplyTimeout, "'defaultReplyTimeout' cannot be null");
197+
Assert.isTrue(defaultReplyTimeout.toMillis() >= 0, "'replyTimeout' must be >= 0");
198+
this.defaultReplyTimeout = defaultReplyTimeout;
159199
}
160200

161201
@Override
@@ -282,6 +322,11 @@ public void stop(Runnable callback) {
282322

283323
@Override
284324
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
325+
return sendAndReceive(record, null);
326+
}
327+
328+
@Override
329+
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
285330
Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
286331
CorrelationKey correlationId = this.correlationStrategy.apply(record);
287332
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
@@ -304,11 +349,11 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
304349
this.futures.remove(correlationId);
305350
throw new KafkaException("Send failed", e);
306351
}
307-
scheduleTimeout(record, correlationId);
352+
scheduleTimeout(record, correlationId, replyTimeout == null ? this.defaultReplyTimeout : replyTimeout);
308353
return future;
309354
}
310355

311-
private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correlationId) {
356+
private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correlationId, Duration replyTimeout) {
312357
this.scheduler.schedule(() -> {
313358
RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
314359
if (removed != null) {
@@ -317,7 +362,7 @@ private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correla
317362
removed.setException(new KafkaReplyTimeoutException("Reply timed out"));
318363
}
319364
}
320-
}, Instant.now().plusMillis(this.replyTimeout));
365+
}, Instant.now().plus(replyTimeout));
321366
}
322367

323368
/**

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.requestreply;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.assertj.core.api.Assertions.fail;
2122
import static org.mockito.ArgumentMatchers.any;
2223
import static org.mockito.ArgumentMatchers.isNull;
@@ -151,7 +152,7 @@ public void captureTestName(TestInfo info) {
151152
public void testGood() throws Exception {
152153
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
153154
try {
154-
template.setReplyTimeout(30_000);
155+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
155156
Headers headers = new RecordHeaders();
156157
headers.add("baz", "buz".getBytes());
157158
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, null, null, null, "foo", headers);
@@ -165,6 +166,11 @@ public void testGood() throws Exception {
165166
assertThat(receivedHeaders).hasSize(2);
166167
assertThat(this.registry.getListenerContainer(A_REQUEST).getContainerProperties().isMissingTopicsFatal())
167168
.isFalse();
169+
ProducerRecord<Integer, String> record2 =
170+
new ProducerRecord<>(A_REQUEST, null, null, null, "slow", headers);
171+
assertThatExceptionOfType(ExecutionException.class)
172+
.isThrownBy(() -> template.sendAndReceive(record2, Duration.ZERO).get(10, TimeUnit.SECONDS))
173+
.withCauseExactlyInstanceOf(KafkaReplyTimeoutException.class);
168174
}
169175
finally {
170176
template.stop();
@@ -176,7 +182,7 @@ public void testGood() throws Exception {
176182
public void testMultiListenerMessageReturn() throws Exception {
177183
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(C_REPLY);
178184
try {
179-
template.setReplyTimeout(30_000);
185+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
180186
ProducerRecord<Integer, String> record = new ProducerRecord<>(C_REQUEST, "foo");
181187
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, C_REPLY.getBytes()));
182188
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
@@ -195,7 +201,7 @@ public void testGoodDefaultReplyHeaders() throws Exception {
195201
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(
196202
new TopicPartitionOffset(A_REPLY, 3));
197203
try {
198-
template.setReplyTimeout(30_000);
204+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
199205
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, "bar");
200206
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
201207
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
@@ -213,7 +219,7 @@ public void testGoodDefaultReplyHeaders() throws Exception {
213219
public void testGoodSamePartition() throws Exception {
214220
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
215221
try {
216-
template.setReplyTimeout(30_000);
222+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
217223
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, 2, null, "baz");
218224
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, A_REPLY.getBytes()));
219225
record.headers()
@@ -235,7 +241,7 @@ public void testGoodSamePartition() throws Exception {
235241
public void testTimeout() throws Exception {
236242
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
237243
try {
238-
template.setReplyTimeout(1);
244+
template.setDefaultReplyTimeout(Duration.ofMillis(1));
239245
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, "fiz");
240246
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, A_REPLY.getBytes()));
241247
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
@@ -265,7 +271,7 @@ public void testTimeout() throws Exception {
265271
public void testGoodWithSimpleMapper() throws Exception {
266272
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(B_REPLY);
267273
try {
268-
template.setReplyTimeout(30_000);
274+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
269275
Headers headers = new RecordHeaders();
270276
headers.add("baz", "buz".getBytes());
271277
ProducerRecord<Integer, String> record = new ProducerRecord<>(B_REQUEST, null, null, null, "qux", headers);
@@ -291,7 +297,7 @@ public void testAggregateNormal() throws Exception {
291297
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
292298
new TopicPartitionOffset(D_REPLY, 0), 2);
293299
try {
294-
template.setReplyTimeout(30_000);
300+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
295301
ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, "foo");
296302
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
297303
template.sendAndReceive(record);
@@ -320,7 +326,7 @@ public void testAggregateTimeout() throws Exception {
320326
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
321327
new TopicPartitionOffset(E_REPLY, 0), 3);
322328
try {
323-
template.setReplyTimeout(5_000);
329+
template.setDefaultReplyTimeout(Duration.ofSeconds(5));
324330
ProducerRecord<Integer, String> record = new ProducerRecord<>(E_REQUEST, null, null, null, "foo");
325331
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
326332
template.sendAndReceive(record);
@@ -355,7 +361,7 @@ public void testAggregateTimeoutPartial() throws Exception {
355361
new TopicPartitionOffset(F_REPLY, 0), 3);
356362
template.setReturnPartialOnTimeout(true);
357363
try {
358-
template.setReplyTimeout(5_000);
364+
template.setDefaultReplyTimeout(Duration.ofSeconds(5));
359365
ProducerRecord<Integer, String> record = new ProducerRecord<>(F_REQUEST, null, null, null, "foo");
360366
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
361367
template.sendAndReceive(record);
@@ -395,7 +401,7 @@ public void testAggregateOrphansNotStored() throws Exception {
395401
return null;
396402
}).given(producer).send(any(), any());
397403
AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container, coll -> true);
398-
template.setReplyTimeout(30_000);
404+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
399405
template.start();
400406
List<ConsumerRecord> records = new ArrayList<>();
401407
ConsumerRecord record = new ConsumerRecord("two", 0, 0L, null, "test1");
@@ -497,7 +503,7 @@ public void withCustomHeaders() throws Exception {
497503
template.setReplyTopicHeaderName("custom.reply.to");
498504
template.setReplyPartitionHeaderName("custom.reply.partition");
499505
try {
500-
template.setReplyTimeout(30_000);
506+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
501507
Headers headers = new RecordHeaders();
502508
ProducerRecord<Integer, String> record = new ProducerRecord<>(G_REQUEST, null, null, null, "foo", headers);
503509
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
@@ -575,7 +581,10 @@ public Map<String, Object> additionalHeaders() {
575581

576582
@KafkaListener(id = A_REQUEST, topics = A_REQUEST)
577583
@SendTo // default REPLY_TOPIC header
578-
public String handleA(String in) {
584+
public String handleA(String in) throws InterruptedException {
585+
if (in.equals("slow")) {
586+
Thread.sleep(50);
587+
}
579588
return in.toUpperCase();
580589
}
581590

src/reference/asciidoc/kafka.adoc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,19 +329,24 @@ public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
329329

330330
Version 2.1.3 introduced a subclass of `KafkaTemplate` to provide request/reply semantics.
331331
The class is named `ReplyingKafkaTemplate` and has one method (in addition to those in the superclass).
332-
The following listing shows the method's signature:
332+
The following listing shows the method signatures:
333333

334334
====
335335
[source, java]
336336
----
337337
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
338+
339+
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
340+
Duration replyTimeout);
338341
----
339342
====
340343

341344
The result is a `ListenableFuture` that is asynchronously populated with the result (or an exception, for a timeout).
342345
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
343346
You can use this future to determine the result of the send operation.
344347

348+
If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).
349+
345350
The following Spring Boot application shows an example of how to use the feature:
346351

347352
====
@@ -359,9 +364,9 @@ public class KRequestingApplication {
359364
return args -> {
360365
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
361366
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
362-
SendResult<String, String> sendResult = replyFuture.getSendFuture().get();
367+
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
363368
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
364-
ConsumerRecord<String, String> consumerRecord = replyFuture.get();
369+
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
365370
System.out.println("Return value: " + consumerRecord.value());
366371
};
367372
}

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ See the javadocs, <<serdes>>, and <<serde>> for more informaion.
112112

113113
When a reply times out, the future is completed exceptionally with a `KafkaReplyTimeoutException` instead of a `KafkaException`.
114114

115+
Also, an overloaded `sendAndReceive` method is now provided that allows specifying the reply timeout on a per message basis.
116+
115117
==== AggregatingReplyingKafkaTemplate
116118

117119
Extends the `ReplyingKafkaTemplate` by aggregating replies from multiple receivers.

0 commit comments

Comments
 (0)