-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19570: Implement offline migration for streams groups #20288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Offline migration essentially preserves offsets and nothing else. So effectively write tombstones for classic group type when a streams heartbeat is sent to with the group ID of an empty classic group, and write tombstones for the streams group type when a classic consumer attempts to join with a group ID of an empty streams group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements offline migration for Kafka Streams groups, allowing seamless conversion between classic consumer groups and streams groups when the target group is empty. The migration preserves offsets while writing tombstones for the previous group type.
Key changes:
- Added migration logic for empty classic groups to streams groups when a streams heartbeat is received
- Added migration logic for empty streams groups to classic groups when a classic consumer attempts to join
- Enhanced group creation methods to handle cross-group-type migrations
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
GroupMetadataManager.java | Implements core migration logic with helper methods and updates group creation/join flows |
GroupMetadataManagerTest.java | Adds comprehensive test coverage for both migration scenarios and edge cases |
@@ -833,19 +833,28 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( | |||
* Gets or creates a streams group without updating the groups map. | |||
* The group will be materialized during the replay. | |||
* | |||
* If there is an empty classic consumer group of the same name, it will be deleted and a new streams | |||
* group. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is incomplete - it ends abruptly with 'a new streams' without completing the sentence. Should be 'a new streams group will be created.'
* group. | |
* group will be created. |
Copilot uses AI. Check for mistakes.
) { | ||
Group group = groups.get(groupId); | ||
|
||
if (group == null) { | ||
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); | ||
} else if (maybeDeleteEmptyClassicGroup(group, records)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method maybeDeleteEmptyClassicGroup
is called but this method is not defined in the diff. This will cause a compilation error unless the method exists elsewhere in the codebase.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
otherwise lgtm
* If there is an empty classic consumer group of the same name, it will be deleted and a new streams | ||
* group. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot's first comment looks correct, we're missing a word here.
* If there is an empty classic consumer group of the same name, it will be deleted and a new streams | |
* group. | |
* If there is an empty classic consumer group of the same name, it will be deleted and a new streams | |
* group wil be created. |
@@ -6066,7 +6075,7 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupJoin( | |||
// classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups. | |||
// The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup. | |||
return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); | |||
} else if (group.type() == CONSUMER || group.type() == CLASSIC) { | |||
} else if (group.type() == CONSUMER || group.type() == CLASSIC || group.type() == STREAMS && group.isEmpty()) { | |||
return classicGroupJoinToClassicGroup(context, request, responseFuture); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a comment to state the groups accepted by classicGroupJoinToClassicGroup here?
- existing classic group
- existing empty consumer group (will be converted to classic)
- existing empty streams group (will be converted to classic)
Offline migration essentially preserves offsets and nothing else. So
effectively write tombstones for classic group type when a streams
heartbeat is sent to with the group ID of an empty classic group, and
write tombstones for the streams group type when a classic consumer
attempts to join with a group ID of an empty streams group.