-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-19301: Move and rewrite partition state classes to Java in org.apache.kafka.storage.internals.log #20083
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joshua2519 thanks for this patch.
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingShrinkIsr.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joshua2519: Thanks for the patch.
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingExpandIsr.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingShrinkIsr.java
Outdated
Show resolved
Hide resolved
197889a
to
238ed5f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joshua2519 Thanks for this patch. The migration introduces some extra overhead in deep copying. Please try to eliminate the copying as much as possible.
storage/src/main/java/org/apache/kafka/storage/internals/log/CommittedPartitionState.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingExpandIsr.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionState.java
Outdated
Show resolved
Hide resolved
@@ -1949,9 +1799,9 @@ class Partition(val topicPartition: TopicPartition, | |||
// 2) leaderAndIsr.partitionEpoch == partitionEpoch: No update was performed since proposed and actual state are the same. | |||
// In both cases, we want to move from Pending to Committed state to ensure new updates are processed. | |||
|
|||
partitionState = CommittedPartitionState(leaderAndIsr.isr.asScala.map(_.toInt).toSet, leaderAndIsr.leaderRecoveryState) | |||
partitionState = new CommittedPartitionState(util.Set.copyOf(leaderAndIsr.isr), leaderAndIsr.leaderRecoveryState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you change leaderAndIsr.isr
to return immutable set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you change
leaderAndIsr.isr
to return immutable set?
Thanks for that.
I changed it to immutable set and patched related test.
@@ -1298,7 +1148,7 @@ class Partition(val topicPartition: TopicPartition, | |||
def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = { | |||
val current = partitionState | |||
if (!current.isInflight) { | |||
val candidateReplicaIds = current.isr - localBrokerId | |||
val candidateReplicaIds = current.isr.asScala.map(_.toInt).toSet - localBrokerId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@chia7712
To reduce collection copies, we could use an iterator like this:
current.isr.asScala.iterator.map(_.toInt).filter(_ != localBrokerId)).to(Set)
However, this approach is less concise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
current.isr.stream().filter(isr => isr != localBrokerId).collect(Collectors.toUnmodifiableSet).asScala
@@ -1098,7 +948,7 @@ class Partition(val topicPartition: TopicPartition, | |||
case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset" | |||
} | |||
|
|||
val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica) | |||
val curInSyncReplicaObjects = (curMaximalIsr.asScala.map(_.toInt) - localBrokerId).flatMap(getReplica) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as #20083 (comment)
curMaximalIsr.asScala.iterator.filter(_.intValue() != localBrokerId).map(_.toInt).flatMap(getReplica).to(Set)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joshua2519 thanks for updates. some comments are left. PTAL
storage/src/main/java/org/apache/kafka/storage/internals/log/CommittedPartitionState.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java
Outdated
Show resolved
Hide resolved
* This set may include un-committed ISR members following an expansion. This "effective" ISR is used for advancing | ||
* the high watermark as well as determining which replicas are required for acks=all produce requests. | ||
* | ||
* Only applicable as of IBP 2.7-IV2, for older versions this will return the committed ISR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment is out-of-date, shouldn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the minimum versions is 3.3-IV3 now.
I removed this part.
…tion-scala-to-storage-internals-log
…tion-scala-to-storage-internals-log
…tion-scala-to-storage-internals-log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joshua2519 thanks for your patch. it is almost there. two small comments left.
storage/src/main/java/org/apache/kafka/storage/internals/log/AssignmentState.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/SimpleAssignmentState.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joshua2519 thanks for your patch. overall LGTM
storage/src/main/java/org/apache/kafka/storage/internals/log/OngoingReassignmentState.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/CommittedPartitionState.java
Outdated
Show resolved
Hide resolved
…tion-scala-to-storage-internals-log
move following interface/class to
org.apache.kafka.storage.internals.log
and rewrtie to javaPartitionListener
AlterPartitionListener
AssignmentState
OngoingReassignmentState
SimpleAssignmentState
PartitionState
PendingPartitionChange
PendingExpandIsr
PendingShrinkIsr
CommittedPartitionState