Skip to content

GH-3989 Consder the custom name of the reply topic name in sendAndRec… #3994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

mipo256
Copy link
Contributor

@mipo256 mipo256 commented Jul 14, 2025

Closes #3989

WIP, writing tests

@mipo256 mipo256 force-pushed the GH-3989 branch 8 times, most recently from 99a759d to 95c9166 Compare July 14, 2025 15:00
@sobychacko
Copy link
Contributor

@mipo256 PR build is failing, just letting you know.

@mipo256
Copy link
Contributor Author

mipo256 commented Jul 14, 2025

Hey @sobychacko ! I know, thanks! I'll fix the checkstyle violations shortly and then ping you :)

@mipo256 mipo256 marked this pull request as ready for review July 15, 2025 11:16
@mipo256
Copy link
Contributor Author

mipo256 commented Jul 15, 2025

Hey guys @sobychacko, @artembilan.

I've added some test cases for this scenario and introduced minor polishing, the PR is ready for review

@@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Iterables;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we can avoid using this. I searched the repository, and we are not using this anywhere else. Maybe we can simply look for the header in the code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular issue is about the duplication of headers in the Kafka message. We cannot just check the existence of the header in the message - we need to count the occurrences of it.

The alternative would be to write our own analog of Iterables.size, but I do not think it makes much sense.

@@ -174,6 +180,18 @@ public class ReplyingKafkaTemplateTests {

public static final String M_REQUEST = "mRequest";

// GH-3989
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid commenting this way using the GH issue number. Either leave it out, or describe what you want to comment there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just was used to it in the Spring Data. In Spring Data when bug is getting fixed for instance, the tests that verify the actual fix typically contain GH comment as a reference to the issue.

I think this approach makes sense, since it just adds the context about what exactly use case is targeted by particular test method.

Either way, of course I do not insist on this. The final decision is on core maintainers of the spring-kafka.

// GH-3989
public static final String CUSTOM_REPLY_HEADER_REPLY = "CUSTOM_REPLY_HEADER_REPLY";

// GH-3989
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

// GH-3989
public static final String CUSTOM_REPLY_HEADER_REQUEST = "CUSTOM_REPLY_HEADER_REQUEST";

// GH-3989
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// GH-3989
public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY";

// GH-3989
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -1046,6 +1112,47 @@ public List<Message<String>> handleM(String in) throws InterruptedException {
return Collections.singletonList(message);
}

// GH-3989
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

describe the comment or remove it.

@@ -35,8 +35,16 @@ public final class CorrelationKey {

private final byte[] correlationId;

/**
* Cached hex representation of the {@link #correlationId}.
* TODO: Migrate to stable values JEP 502
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate what you mean by migrate to JEP 502 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that in JEP 502 we would have stable values in Java, which are lazily initilized final fields. The lazily computed hashcode and hex representation of correlation id is exactly that use case.

@mipo256
Copy link
Contributor Author

mipo256 commented Jul 16, 2025

Hey @sobychacko, can you please re-review? I've answered you questions and fixed the comments that I think we both agree upon.

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ReplyingKafkaTemplate checks for default reply header name instead of custom one
2 participants