Skip to content

Commit 95c9166

Browse files
committed
GH-3989 Consder the custom name of the reply topic name in sendAndReceive
Signed-off-by: mipo256 <[email protected]>
1 parent 8126e4b commit 95c9166

File tree

7 files changed

+138
-27
lines changed

7 files changed

+138
-27
lines changed

build.gradle

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
2+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompilationTask
3+
14
buildscript {
25
ext.kotlinVersion = '2.1.21'
36
ext.isCI = System.getenv('GITHUB_ACTION')
@@ -147,9 +150,9 @@ configure(javaProjects) { subproject ->
147150
options.encoding = 'UTF-8'
148151
}
149152

150-
tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompilationTask).configureEach {
153+
tasks.withType(KotlinCompilationTask).configureEach {
151154
compilerOptions {
152-
jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_17
155+
jvmTarget = JvmTarget.JVM_17
153156
javaParameters = true
154157
allWarningsAsErrors = true
155158
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAware.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
*
2424
* @author Gary Russell
2525
* @since 2.5
26-
*
2726
*/
2827
@FunctionalInterface
2928
public interface DeliveryAttemptAware {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,16 @@ public final class CorrelationKey {
3535

3636
private final byte[] correlationId;
3737

38+
/**
39+
* Cached hex representation of the {@link #correlationId}.
40+
* TODO: Migrate to stable values JEP 502
41+
*/
3842
private @Nullable String asString;
3943

44+
/**
45+
* Cached hash code.
46+
* TODO: Migrate to stable values JEP 502
47+
*/
4048
private volatile @Nullable Integer hashCode;
4149

4250
public CorrelationKey(byte[] correlationId) { // NOSONAR array reference

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Instant;
2323
import java.util.Collection;
2424
import java.util.List;
25+
import java.util.Optional;
2526
import java.util.UUID;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentMap;
@@ -73,6 +74,7 @@
7374
* @author Artem Bilan
7475
* @author Borahm Lee
7576
* @author Francois Rosiere
77+
* @author Mikhail Polivakha
7678
*
7779
* @since 2.1.3
7880
*
@@ -415,19 +417,17 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
415417
@Override
416418
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
417419
Assert.state(this.running, "Template has not been started"); // NOSONAR (sync)
418-
Duration timeout = replyTimeout;
419-
if (timeout == null) {
420-
timeout = this.defaultReplyTimeout;
421-
}
420+
Duration timeout = Optional.ofNullable(replyTimeout).orElse(this.defaultReplyTimeout);
422421
CorrelationKey correlationId = this.correlationStrategy.apply(record);
423422
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
424423
Headers headers = record.headers();
425-
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
426-
if (!hasReplyTopic && this.replyTopic != null) {
424+
boolean replyTopicIsUnset = headers.lastHeader(this.replyTopicHeaderName) == null;
425+
if (replyTopicIsUnset && this.replyTopic != null) {
427426
headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
428-
if (this.replyPartition != null) {
429-
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
430-
}
427+
}
428+
boolean replyPartitionIsUnset = headers.lastHeader(this.replyPartitionHeaderName) != null;
429+
if (replyPartitionIsUnset && this.replyPartition != null) {
430+
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
431431
}
432432
Object correlation = this.binaryCorrelation ? correlationId : correlationId.toString();
433433
byte[] correlationValue = this.binaryCorrelation

spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyFuture.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
/**
2727
* A {@link CompletableFuture} for requests/replies.
2828
*
29-
* @param <K> the key type.
29+
* @param <K> the key type of inbound kafka message.
3030
* @param <V> the outbound data type.
3131
* @param <R> the reply data type.
3232
*

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Locale;
2828
import java.util.Map;
29+
import java.util.Optional;
2930
import java.util.Set;
3031
import java.util.stream.Collectors;
3132

@@ -47,9 +48,9 @@
4748
* @author Artem Bilan
4849
* @author Sanghyeok An
4950
* @author Soby Chacko
51+
* @author Mikhail Polivakha
5052
*
5153
* @since 2.1.3
52-
*
5354
*/
5455
public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
5556

@@ -269,11 +270,7 @@ private boolean doesMatchInternal(String header) {
269270
* @since 2.2.5
270271
*/
271272
protected Object headerValueToAddOut(String key, Object value) {
272-
Object valueToAdd = mapRawOut(key, value);
273-
if (valueToAdd == null) {
274-
valueToAdd = value;
275-
}
276-
return valueToAdd;
273+
return Optional.ofNullable((Object) mapRawOut(key, value)).orElse(value);
277274
}
278275

279276
/**
@@ -291,12 +288,8 @@ protected boolean doesMatchMultiValueHeader(String headerName) {
291288
}
292289

293290
private boolean doesMatchMultiValueHeaderInternal(String headerName) {
294-
for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) {
295-
if (headerMatcher.matchHeader(headerName)) {
296-
return true;
297-
}
298-
}
299-
return false;
291+
return this.multiValueHeaderMatchers.stream()
292+
.anyMatch(headerMatcher -> headerMatcher.matchHeader(headerName));
300293
}
301294

302295
/**

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.atomic.AtomicInteger;
3434
import java.util.concurrent.atomic.AtomicReference;
3535

36+
import com.google.common.collect.Iterables;
3637
import org.apache.kafka.clients.consumer.Consumer;
3738
import org.apache.kafka.clients.consumer.ConsumerConfig;
3839
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -102,12 +103,14 @@
102103
* @author Gary Russell
103104
* @author Nathan Xu
104105
* @author Soby Chacko
106+
* @author Mikhail Polivakha
105107
* @since 2.1.3
106108
*
107109
*/
108110
@SpringJUnitConfig
109111
@DirtiesContext
110-
@EmbeddedKafka(partitions = 5, topics = { ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
112+
@EmbeddedKafka(partitions = 5, topics = {
113+
ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
111114
ReplyingKafkaTemplateTests.B_REPLY, ReplyingKafkaTemplateTests.B_REQUEST,
112115
ReplyingKafkaTemplateTests.C_REPLY, ReplyingKafkaTemplateTests.C_REQUEST,
113116
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
@@ -119,7 +122,10 @@
119122
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
120123
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
121124
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST,
122-
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST })
125+
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST,
126+
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REQUEST,
127+
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST,
128+
})
123129
public class ReplyingKafkaTemplateTests {
124130

125131
public static final String A_REPLY = "aReply";
@@ -174,6 +180,18 @@ public class ReplyingKafkaTemplateTests {
174180

175181
public static final String M_REQUEST = "mRequest";
176182

183+
// GH-3989
184+
public static final String CUSTOM_REPLY_HEADER_REPLY = "CUSTOM_REPLY_HEADER_REPLY";
185+
186+
// GH-3989
187+
public static final String CUSTOM_REPLY_HEADER_REQUEST = "CUSTOM_REPLY_HEADER_REQUEST";
188+
189+
// GH-3989
190+
public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY";
191+
192+
// GH-3989
193+
public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST";
194+
177195
@Autowired
178196
private EmbeddedKafkaBroker embeddedKafka;
179197

@@ -365,6 +383,54 @@ public void testHandlerReturn() throws Exception {
365383
}
366384
}
367385

386+
@Test
387+
public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception {
388+
String customReplyHeaderName = "X-Custom-Reply-Header";
389+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_REPLY);
390+
template.setReplyTopicHeaderName(customReplyHeaderName);
391+
try {
392+
Message<String> message = MessageBuilder.withPayload("foo")
393+
.setHeader(customReplyHeaderName, CUSTOM_REPLY_HEADER_REPLY)
394+
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_REQUEST)
395+
.build();
396+
397+
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
398+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
399+
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
400+
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
401+
}
402+
finally {
403+
template.stop();
404+
template.destroy();
405+
}
406+
}
407+
408+
@Test
409+
public void testCustomReplyHeadersAreNotDuplicated() throws Exception {
410+
String customReplyTopicHeaderName = "X-Custom-Reply-Header";
411+
String customReplyPartitionHeaderName = "X-Custom-Reply-Partition";
412+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY);
413+
template.setReplyTopicHeaderName(customReplyTopicHeaderName);
414+
template.setReplyPartitionHeaderName(customReplyPartitionHeaderName);
415+
416+
try {
417+
Message<String> message = MessageBuilder.withPayload("foo")
418+
.setHeader(customReplyTopicHeaderName, CUSTOM_REPLY_HEADER_REPLY)
419+
.setHeader(customReplyPartitionHeaderName, "test-partition")
420+
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
421+
.build();
422+
423+
RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
424+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
425+
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
426+
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
427+
}
428+
finally {
429+
template.stop();
430+
template.destroy();
431+
}
432+
}
433+
368434
@Test
369435
public void testMessageReturnNoHeadersProvidedByListener() throws Exception {
370436
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(H_REPLY);
@@ -958,6 +1024,48 @@ public String handleA(String in) throws InterruptedException {
9581024
return in.toUpperCase();
9591025
}
9601026

1027+
// GH-3989
1028+
@KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST)
1029+
@SendTo(value = CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back
1030+
public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord<?, String> inputMessage) {
1031+
Headers headers = inputMessage.headers();
1032+
1033+
if (Iterables.size(headers.headers("X-Custom-Reply-Header")) != 1) {
1034+
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
1035+
}
1036+
1037+
if (Iterables.size(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) {
1038+
return "It is expected that the user does NOT specify the reply partition in this test case";
1039+
}
1040+
1041+
if (!"foo".equals(inputMessage.value())) {
1042+
return "Expected message is 'foo', but got %s".formatted(inputMessage.value());
1043+
}
1044+
1045+
return "OK";
1046+
}
1047+
1048+
// GH-3989
1049+
@KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
1050+
@SendTo(value = CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back
1051+
public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord<?, String> inputMessage) {
1052+
Headers headers = inputMessage.headers();
1053+
1054+
if (Iterables.size(headers.headers("X-Custom-Reply-Header")) != 1) {
1055+
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
1056+
}
1057+
1058+
if (Iterables.size(headers.headers("X-Custom-Reply-Partition")) != 1) {
1059+
return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION);
1060+
}
1061+
1062+
if (!"foo".equals(inputMessage.value())) {
1063+
return "Expected message is 'foo', but got %s".formatted(inputMessage.value());
1064+
}
1065+
1066+
return "OK";
1067+
}
1068+
9611069
@KafkaListener(topics = B_REQUEST, containerFactory = "simpleMapperFactory")
9621070
@SendTo // default REPLY_TOPIC header
9631071
public String handleB(String in) {

0 commit comments

Comments
 (0)