KAFKA-18583 Fix getPartitionReplicaEndpoints for KRaft #18635
Conversation
Although MetadataCache's getPartitionReplicaEndpoints takes a single topic-partition, the KRaftMetadataCache implementation iterates over all partitions of the matching topic. This is not necessary and can cause significant performance degradation when the topic has a relatively high number of partitions. Note that this is not a recent regression - it has been a part of KRaftMetadataCache since its creation.
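To make the cost difference concrete, here is a small standalone sketch (illustrative only, not the Kafka code; the plain `java.util.HashMap` below merely stands in for a topic's partition map) contrasting a traversal of every partition of a topic with the single keyed lookup that the fix switches to:

```scala
import java.util.{HashMap => JHashMap}

object PartitionLookupSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for a topic's partition map: partitionId -> replica broker ids.
    val partitions = new JHashMap[Int, Array[Int]]()
    (0 until 10000).foreach(p => partitions.put(p, Array(0, 1, 2)))

    val wanted = 4321

    // Slow shape: every partition of the topic is visited, even though only
    // one of them is relevant to the requested topic-partition.
    var viaIteration: Option[Array[Int]] = None
    partitions.entrySet().forEach { e =>
      if (e.getKey == wanted) viaIteration = Some(e.getValue)
    }

    // Fast shape: a single hash lookup of the requested partition id.
    val viaGet = Option(partitions.get(wanted))

    assert(viaIteration.map(_.toSeq) == viaGet.map(_.toSeq))
  }
}
```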
Thanks for the patch. I left one comment for consideration.
// Verify that for each partition we have exactly $replicationFactor endpoints
assertEquals(replicationFactor,
  replicaSet.size,
  s"Unexpected replica set $replicaSet for partition $partitionId")
Would it be possible to also verify that the method returns the replicas that we expect (replica ids + nodes)?
Fixed, let me know if there's anything you'd like changed in the chosen approach.
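For readers following along, a rough sketch of what such a check could look like, assuming `replicaSet` is the `Map[Int, Node]` returned by `getPartitionReplicaEndpoints`; `expectedReplicas` (replica id to expected `Node`) is a hypothetical name used here for illustration, not necessarily what the committed test uses:

```scala
// Check the replica ids first, then the Node resolved for each id.
assertEquals(expectedReplicas.keySet, replicaSet.keySet,
  s"Unexpected replica ids for partition $partitionId")
expectedReplicas.foreach { case (brokerId, expectedNode) =>
  assertEquals(expectedNode, replicaSet(brokerId),
    s"Unexpected node for replica $brokerId of partition $partitionId")
}
```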
@@ -433,7 +433,7 @@ class KRaftMetadataCache(
     val image = _currentImage
     val result = new mutable.HashMap[Int, Node]()
     Option(image.topics().getTopic(tp.topic())).foreach { topic =>
-      topic.partitions().values().forEach { partition =>
+      Option(topic.partitions().get(tp.partition())).foreach { partition =>
Ouch - this is a painful oversight.
@@ -511,6 +512,101 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }

+  @ParameterizedTest
+  @MethodSource(Array("cacheProvider"))
+  def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = {
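For context, a rough skeleton of how such a parameterized test could be structured. This is a sketch, not the committed test: `populateCache` is a hypothetical helper standing in for whatever mechanism the test class uses to load broker and topic metadata into the cache, and the topic name, partition count, and replication factor are illustrative values.

```scala
@ParameterizedTest
@MethodSource(Array("cacheProvider"))
def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = {
  val topic = "test-topic"
  val numPartitions = 10
  val replicationFactor = 3
  val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)

  // Hypothetical helper: registers brokers and the topic's partition
  // assignments in the cache before querying it.
  populateCache(cache, topic, numPartitions, replicationFactor)

  (0 until numPartitions).foreach { partitionId =>
    val replicaSet = cache.getPartitionReplicaEndpoints(
      new TopicPartition(topic, partitionId), listenerName)
    // Each partition should resolve to exactly replicationFactor endpoints.
    assertEquals(replicationFactor, replicaSet.size,
      s"Unexpected replica set $replicaSet for partition $partitionId")
  }
}
```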
Does this mean that we didn't have a test for this method?
Mostly - there is an FFF integration test verifying that the method doesn't return a shutdown node (eventually).
LGTM, thanks for the patch.
Although `MetadataCache`'s `getPartitionReplicaEndpoints` takes a single topic-partition, the `KRaftMetadataCache` implementation iterates over all partitions of the matching topic. This is not necessary and can cause significant performance degradation when the topic has a relatively high number of partitions. Note that this is not a recent regression - it has been a part of `KRaftMetadataCache` since its creation. Reviewers: Ismael Juma <[email protected]>, David Jacot <[email protected]>
Merged it to trunk and to 4.0. @dimitarndimitrov I was not able to cherry-pick it to 3.9 and to 3.8 due to conflicts. Could you please open PRs for those branches?
The cherry-pick required reimplementing the accompanying test to work with UpdateMetadataRequest (removed in 4.0 and trunk) in order to also apply to `ZkMetadataCache`. If the removal of UpdateMetadataRequest is backported here as well, the test can be changed to match the trunk version. Conflicts: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
Hey @dajac check out #18657 when you have a moment. The sole commit in that PR also applies cleanly to 3.8 (tested locally).