KAFKA-18301; Make coordinator records first class citizen #18261
Conversation
I like the premise of this PR. The hard-coded "version" numbers for the record types have always been a bit inelegant. This is going to be much nicer.

I'm not convinced by the OffsetCommitV0 thing. I think you're relying on and benefitting from the fact that OffsetCommitV0 v0 is exactly the same as OffsetCommit v0. Sometimes, you have to convert from the v0 class to the current one. Just seems a bit of a faff. Can the definition of the coordinator-key schemas be flexible enough to allow just OffsetCommit to exist, without the separate V0 schema?
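For context, here is a minimal before/after sketch of the dispatch this PR changes. The names and the import path are assumptions, not the PR's actual code; the 0/1-both-offset-commit quirk is discussed further below.

```java
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; // package path assumed

final class DispatchSketch {
    // Before: the record type was inferred from hard-coded key "version"
    // numbers (0 and 1 were both offset commits, 2 was group metadata).
    static String oldStyle(short keyVersion) {
        switch (keyVersion) {
            case 0:
            case 1: return "offset commit";
            case 2: return "group metadata";
            default: return "unknown";
        }
    }

    // After: the generated CoordinatorRecordType enum owns the mapping, so
    // hard-coded numbers disappear from hand-written code.
    static String newStyle(short keyVersion) {
        for (CoordinatorRecordType type : CoordinatorRecordType.values()) {
            if (type.id() == keyVersion) return type.name();
        }
        return "unknown";
    }
}
```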
```diff
@@ -14,7 +14,8 @@
 // limitations under the License.

 {
-  "type": "data",
+  "apiKey": 1,
+  "type": "coordinator-value",
```
This is clearly a slightly compromised area because of the way that OffsetCommit has already used two key version numbers. I would say that OffsetCommitValueV0 would have valid version 0, and OffsetCommitValue would have valid versions 1-4. In practice, you'll never find an instance of v0 for OffsetCommitValue because it would actually be OffsetCommitValueV0.
OffsetCommitValueV0 definitely has only version 0. For OffsetCommitValue, I think that keeping 0+ does not hurt, just in case.
```java
if (type == MessageSpecType.COORDINATOR_KEY) {
    if (this.apiKey.isEmpty()) {
        throw new RuntimeException("The ApiKey must be set for messages " + name + " with type `record-key`");
```
`record-key` should be `coordinator-key`.
throw new RuntimeException("The ApiKey must be set for messages " + name + " with type `record-key`"); | ||
} | ||
if (!this.validVersions().equals(new Versions((short) 0, ((short) 0)))) { | ||
throw new RuntimeException("The Versions must be set to `0` for messages " + name + " with type `record-key`"); |
ditto
throw new RuntimeException("The Versions must be set to `0` for messages " + name + " with type `record-key`"); | ||
} | ||
if (!this.flexibleVersions.empty()) { | ||
throw new RuntimeException("The FlexibleVersions are not supported for messages " + name + " with type `record-key`"); |
ditto
```java
if (type == MessageSpecType.COORDINATOR_VALUE) {
    if (this.apiKey.isEmpty()) {
        throw new RuntimeException("The ApiKey must be set for messages with type `record-key`");
```
`record-key` should be `coordinator-value`.
```diff
@@ -1069,7 +1070,7 @@ object GroupMetadataManager {
    * @return key for offset commit message
    */
   def offsetCommitKey(groupId: String, topicPartition: TopicPartition): Array[Byte] = {
-    MessageUtil.toVersionPrefixedBytes(OffsetCommitKey.HIGHEST_SUPPORTED_VERSION,
+    MessageUtil.toVersionPrefixedBytes(CoordinatorRecordType.OFFSET_COMMIT.id(),
```
It doesn't seem like this change alters the schema written to or read from the log, but I want to confirm that it doesn't break compatibility. Should we have a test for this? My question is: if there were a bug in one of the record format changes, would it be caught somewhere?

Also, internal coordinator topics (txn state) can use this too, but the difference would be to use a unique api key now, right?
> It doesn't seem like this change alters the schema written to or read from the log, but I want to confirm that it doesn't break compatibility. Should we have a test for this? My question is: if there were a bug in one of the record format changes, would it be caught somewhere?
Let me see if I can add some unit tests for this. Otherwise, any regressions should also be caught by the upgrade system tests.
> Also, internal coordinator topics (txn state) can use this too, but the difference would be to use a unique api key now, right?
Right.
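One cheap guard, sketched here as a JUnit test: pin the enum ids to the historical hard-coded key versions so any accidental renumbering fails immediately. This assumes the old highest supported key version for OffsetCommitKey was 1 (consistent with the two-key-versions discussion above) and assumes the generated package path.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; // package path assumed
import org.junit.jupiter.api.Test;

class CoordinatorRecordTypeCompatibilityTest {
    @Test
    void offsetCommitKeyIdMatchesHistoricalKeyVersion() {
        // The bytes on disk start with this 2-byte prefix, so the enum id
        // must stay equal to the key version previously written to the log.
        assertEquals((short) 1, CoordinatorRecordType.OFFSET_COMMIT.id());
    }
}
```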
By the way, I did not change the coordinator tests, so they still use hard-coded record types. This also ensures that we don't introduce wrong ones.
Added unit tests for the old group coordinator too. The new one already has good coverage (e.g. GroupCoordinatorRecordSerdeTest).
> Added unit tests for the old group coordinator too.

Could you point me to where it was added?
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
I am not really happy with the OffsetCommit version 0. I considered extending the schema to support a legacy api key, but it brings even more complexity. The goal of this refactoring is to introduce the enum with all the types and their versions, and to have strong semantics verified for the records. Having alternate api keys makes this harder. We could make an exception as you suggest, but then it does not fit nicely in the enum either. Hence, I went with the separate record. I would argue that the separate record is not wrong, because we actually changed the record type but kept the same name. I think that we introduced this awkwardness when we transitioned to using the auto-generated records. We could perhaps rename that record.

By the way, that legacy record was only used in Apache Kafka 0.8. It is very unlikely to ever see it in 4.0 clusters. I was also considering just dropping it, but we would need a small KIP for that. It is too bad that the KIP freeze for 4.0 has passed; I suppose that we will now need to wait for 5.0 before we can remove it.
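For readers following along, the generated enum presumably looks roughly like the hand-written approximation below. The entries and id values are an assumption based on this thread (with the legacy 0.8-era record as id 0); the real enum is produced by the message generator from the coordinator-key specs.

```java
public enum CoordinatorRecordType {
    LEGACY_OFFSET_COMMIT((short) 0),
    OFFSET_COMMIT((short) 1),
    GROUP_METADATA((short) 2);

    private final short id;

    CoordinatorRecordType(short id) {
        this.id = id;
    }

    // The api key written as the 2-byte prefix of the record key.
    public short id() {
        return id;
    }

    public static CoordinatorRecordType fromId(short id) {
        for (CoordinatorRecordType type : values()) {
            if (type.id == id) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown coordinator record id " + id);
    }
}
```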
While looking at the message formatters in #18261, I have noticed a few incorrect test cases:

* We should not log anything when the record type is unknown, because the formatters have clear goals.
* We should not parse the value when the key is null or when the key cannot be parsed. While it works in the tests, in practice this is wrong because we cannot assume the type of the value if the type of the key is not defined. The key drives the type of the entire record.

Reviewers: Chia-Ping Tsai <[email protected]>
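A minimal sketch of that rule, assuming the 2-byte api-key prefix described earlier (the helper name is hypothetical, not the formatter's actual API):

```java
import java.nio.ByteBuffer;
import java.util.Optional;

final class FormatterRuleSketch {
    // The key drives the type of the entire record: without a parseable key
    // there is no way to know the value's type, so nothing is emitted and
    // the value is never parsed.
    static Optional<Short> recordType(byte[] keyBytes) {
        if (keyBytes == null || keyBytes.length < 2) {
            return Optional.empty(); // tombstone or unparseable key: skip
        }
        return Optional.of(ByteBuffer.wrap(keyBytes).getShort());
    }
}
```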
Thanks. LGTM
Just a couple of instances of V0 remain, which I suggest should also be renamed to Legacy. Apart from that nit, looks good to me.
Merge apache-github/trunk into …e-old-protocol-versions:

* KAFKA-18312: Added entityType: topicName to SubscribedTopicNames in ShareGroupHeartbeatRequest.json (apache#18285)
* HOTFIX: fix incompatible types: Optional<TimestampAndOffset> cannot be converted to Option<TimestampAndOffset> (apache#18284)
* MINOR: Fix some test-catalog issues (apache#18272)
* KAFKA-18180: Move OffsetResultHolder to storage module (apache#18100)
* KAFKA-18301: Make coordinator records first class citizen (apache#18261)
* KAFKA-18262: Remove DefaultPartitioner and UniformStickyPartitioner (apache#18204)
* KAFKA-18296: Remove deprecated KafkaBasedLog constructor (apache#18257)
* KAFKA-12829: Remove old Processor and ProcessorSupplier interfaces (apache#18238)
* KAFKA-18292: Remove deprecated methods of UpdateFeaturesOptions (apache#18245)
* KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API (apache#18154)
* KAFKA-18035, KAFKA-18306, KAFKA-18092: Address TransactionsTest flaky tests (apache#18264)
* MINOR: change the default linger time in the new coordinator (apache#18274)
* KAFKA-18305: validate controller.listener.names is not in inter.broker.listener.name for kcontrollers (apache#18222)
* KAFKA-18207: Serde for handling transaction records (apache#18136)
* KAFKA-13722: Refactor Kafka Streams store interfaces (apache#18243)
* KAFKA-17131: Refactor TimeDefinitions (apache#18241)
* MINOR: Fix MessageFormatters (apache#18266)
* Mark flaky tests for Dec 18, 2024 (apache#18263)
This patch is the first one in a series to improve how coordinator records are managed. It focuses on making coordinator records first-class citizens in the generator:

* Introduce `coordinator-key` and `coordinator-value` in the schema.
* Introduce `apiKey` for those. This is done to avoid relying on the version to determine the type.
* It also allows the generator to enforce some rules: the key cannot use flexible versions, the key must have a single version `0`, there must be a key and a value for a given api key, etc.
* It generates an enum with all the coordinator record types. This is pretty handy in the code.

The patch also updates the group coordinators to use those.

Reviewers: Jeff Kim <[email protected]>, Andrew Schofield <[email protected]>
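As a usage sketch mirroring the GroupMetadataManager diff above: `MessageUtil.toVersionPrefixedBytes` is the existing helper, but the setter names on the generated OffsetCommitKey and the package paths are assumptions here, not confirmed by this thread.

```java
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; // package path assumed
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;      // package path assumed

public final class OffsetCommitKeySerializationSketch {
    public static byte[] offsetCommitKey(String group, String topic, int partition) {
        // The record type's api key, rather than a schema version, is the
        // 2-byte prefix written ahead of the serialized key.
        return MessageUtil.toVersionPrefixedBytes(
            CoordinatorRecordType.OFFSET_COMMIT.id(),
            new OffsetCommitKey()
                .setGroup(group)      // setter names assumed
                .setTopic(topic)
                .setPartition(partition));
    }
}
```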
Following #18261, this patch updates the Share Coordinator to use the new record format. Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield <[email protected]>