Skip to content

GH-3989 Consider the custom name of the reply topic name in sendAndRec… #3994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ public final class CorrelationKey {

private final byte[] correlationId;

/**
* Cached hex representation of the {@link #correlationId}.
* TODO: Migrate to stable values JEP 502
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate what you mean by migrate to JEP 502 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that in JEP 502 we would have stable values in Java, which are lazily initilized final fields. The lazily computed hashcode and hex representation of correlation id is exactly that use case.

*/
private @Nullable String asString;

/**
* Cached hash code.
* TODO: Migrate to stable values JEP 502
*/
private volatile @Nullable Integer hashCode;

public CorrelationKey(byte[] correlationId) { // NOSONAR array reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* @author Artem Bilan
* @author Borahm Lee
* @author Francois Rosiere
* @author Mikhail Polivakha
*
* @since 2.1.3
*
Expand Down Expand Up @@ -422,12 +423,13 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @
CorrelationKey correlationId = this.correlationStrategy.apply(record);
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
Headers headers = record.headers();
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
boolean hasReplyTopic = headers.lastHeader(this.replyTopicHeaderName) != null;
if (!hasReplyTopic && this.replyTopic != null) {
headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
if (this.replyPartition != null) {
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
}
}
boolean hasReplyPartition = headers.lastHeader(this.replyPartitionHeaderName) != null;
if (hasReplyPartition && this.replyPartition != null) {
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
}
Object correlation = this.binaryCorrelation ? correlationId : correlationId.toString();
byte[] correlationValue = this.binaryCorrelation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@
* @author Gary Russell
* @author Nathan Xu
* @author Soby Chacko
* @author Mikhail Polivakha
* @since 2.1.3
*
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 5, topics = { ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
@EmbeddedKafka(partitions = 5, topics = {
ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST,
ReplyingKafkaTemplateTests.B_REPLY, ReplyingKafkaTemplateTests.B_REQUEST,
ReplyingKafkaTemplateTests.C_REPLY, ReplyingKafkaTemplateTests.C_REQUEST,
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
Expand All @@ -119,7 +121,10 @@
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST,
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST })
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST,
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REQUEST,
ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST
})
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
Expand Down Expand Up @@ -174,6 +179,14 @@ public class ReplyingKafkaTemplateTests {

public static final String M_REQUEST = "mRequest";

public static final String CUSTOM_REPLY_HEADER_REPLY = "CUSTOM_REPLY_HEADER_REPLY";

public static final String CUSTOM_REPLY_HEADER_REQUEST = "CUSTOM_REPLY_HEADER_REQUEST";

public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY";

public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -365,6 +378,54 @@ public void testHandlerReturn() throws Exception {
}
}

@Test
public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception {
String customReplyHeaderName = "X-Custom-Reply-Header";
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_REPLY);
template.setReplyTopicHeaderName(customReplyHeaderName);
try {
Message<String> message = MessageBuilder.withPayload("expected_message")
.setHeader(customReplyHeaderName, CUSTOM_REPLY_HEADER_REPLY)
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_REQUEST)
.build();

RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testCustomReplyHeadersAreNotDuplicated() throws Exception {
String customReplyTopicHeaderName = "X-Custom-Reply-Header";
String customReplyPartitionHeaderName = "X-Custom-Reply-Partition";
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY);
template.setReplyTopicHeaderName(customReplyTopicHeaderName);
template.setReplyPartitionHeaderName(customReplyPartitionHeaderName);

try {
Message<String> message = MessageBuilder.withPayload("expected_message")
.setHeader(customReplyTopicHeaderName, CUSTOM_REPLY_HEADER_REPLY)
.setHeader(customReplyPartitionHeaderName, "test-partition")
.setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
.build();

RequestReplyMessageFuture<Integer, String> future = template.sendAndReceive(message, Duration.ofSeconds(30));
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
Message<?> resultingMessage = future.get(30, TimeUnit.SECONDS);
assertThat(resultingMessage.getPayload()).isEqualTo("OK");
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testMessageReturnNoHeadersProvidedByListener() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(H_REPLY);
Expand Down Expand Up @@ -1046,6 +1107,45 @@ public List<Message<String>> handleM(String in) throws InterruptedException {
return Collections.singletonList(message);
}

@KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST)
@SendTo(CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back
public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord<?, String> inputMessage) {
Headers headers = inputMessage.headers();

if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
}

if (length(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) {
return "It is expected that the user does NOT specify the reply partition in this test case";
}

if (!"expected_message".equals(inputMessage.value())) {
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
}

return "OK";
}

@KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST)
@SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back
public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord<?, String> inputMessage) {
Headers headers = inputMessage.headers();

if (length(headers.headers("X-Custom-Reply-Header")) != 1) {
return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once";
}

if (length(headers.headers("X-Custom-Reply-Partition")) != 1) {
return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION);
}

if (!"expected_message".equals(inputMessage.value())) {
return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value());
}

return "OK";
}
}

@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
Expand Down Expand Up @@ -1090,6 +1190,12 @@ public Object deserialize(String topic, Headers headers, byte[] data) {

}

private static int length(Iterable<?> iterable) {
int counter = 0;
for (; iterable.iterator().hasNext(); counter++) {}
return counter;
}

public static class Foo {

private String bar;
Expand Down
Loading