-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-15370: Support Participation in 2PC (KIP-939) (1/N) #17687
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This is just a mechanical change to make prepareTransitionTo method use named parameters instead of positional parameters.
updateTimestamp: Long, | ||
clientTransactionVersion: TransactionVersion): TxnTransitMetadata = { | ||
private def prepareTransitionTo(state: TransactionState, | ||
producerId: Long = this.producerId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a previous effort to remove default arguments in order to not accidentally set something incorrect. I think the defaults make sense (keeping the same) so I think it makes sense, but wanted to mention this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The defaults just mean that there is no change, so if the caller doesn't use an argument, then it remains unchanged for the transition. I think it makes code easier to read than to require every parameter to be specified: the changed stuff is easy to see from just glancing at the function call, no need to mentally parse which arguments are updated and which stay the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with these points. Was just sharing some historical context on this (and other methods). I think it is valid to use here.
topicPartitions: immutable.Set[TopicPartition] = this.topicPartitions.toSet, | ||
txnStartTimestamp: Long = this.txnStartTimestamp, | ||
txnLastUpdateTimestamp: Long = this.txnLastUpdateTimestamp, | ||
clientTransactionVersion: TransactionVersion = this.clientTransactionVersion): TxnTransitMetadata = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the idea to keep clientTransactionVersion the same until each PrepareCommit/PrepareAbortCall? Or will KIP-939 introduce changes to set this in another transactional state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. I don't think KIP-939 would rely on persisted value of client version, we should be able to just look at the values of the fields (nextProducerId, nextProducerEpoch) and figure out what logic needs to be done.
But it's a good point that this change actually makes the clientVersion "sticky" in the sense that once it's set it'll keep the value during the transitions. Maybe to keep this change completely mechanical, transition to ongoing or empty should reset the version to TV_0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, Things may also change with the change to add partitions implicitly (as we will have the client version there)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a summary of offline conversations for visibility:
- we'll keep the existing client version in the state unless it needs to be changed
- we'll update the
completeTransictionTo
to always copy all fields to eliminate a second decision point for which fields are toing to be update; having a second decision point is confusing and can lead to bugs
Thanks for the PR, makes things much more clear and readable! |
Update completeTransitionTo to fully take the state from transitMetadata, this eliminates a redundant decision point.
Fix unit tests to reflect changed logic (always copy transition data)
@@ -204,7 +204,7 @@ class TransactionMetadataTest { | |||
assertEquals(producerId, txnMetadata.producerId) | |||
assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) | |||
assertEquals(producerEpoch, txnMetadata.lastProducerEpoch) | |||
assertEquals(1L, txnMetadata.txnStartTimestamp) | |||
assertEquals(-1L, txnMetadata.txnStartTimestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is changed (now we unconditionally copy data from the transitMetadata when we complete transition, so original value of 1 is not preserved), so the unit test is changed to codify the new logic. Looking at the history, it doesn't seem like there was a significance in the logic, it's just the UT codified some behavior, and now it codifies a different behavior.
@@ -231,7 +231,7 @@ class TransactionMetadataTest { | |||
assertEquals(producerId + 1, txnMetadata.producerId) | |||
assertEquals(producerEpoch, txnMetadata.lastProducerEpoch) | |||
assertEquals(0, txnMetadata.producerEpoch) | |||
assertEquals(1L, txnMetadata.txnStartTimestamp) | |||
assertEquals(-1L, txnMetadata.txnStartTimestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unconditionally copy data from the transitMetadata when we complete transition.
state = Ongoing, | ||
topicPartitions = (topicPartitions ++ addedTopicPartitions), | ||
txnStartTimestamp = newTxnStartTimestamp, | ||
txnLastUpdateTimestamp = updateTimestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we include clientTransactionVersion here since it is now passed to this method too?
Address review feedback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
…e-old-protocol-versions * apache-github/trunk: KAFKA-18334: Produce v4-v6 should be undeprecated (apache#18288) KAFKA-13722: code cleanup after deprecated StateStore.init() was removed (apache#18249) KAFKA-15370: Support Participation in 2PC (KIP-939) (1/N) (apache#17687)
This is just a mechanical change to make prepareTransitionTo method use named parameters instead of positional parameters. Reviewers: Justine Olshan <[email protected]>, Ritika Reddy <[email protected]>
This is just a mechanical change to make prepareTransitionTo method use named parameters instead of positional parameters.