Skip to content

Commit d21743e

Browse files
Add docs
1 parent 2641c82 commit d21743e

File tree

4 files changed

+34
-12
lines changed

4 files changed

+34
-12
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Locale;
2626
import java.util.Map;
2727
import java.util.Optional;
28+
import java.util.Properties;
2829
import java.util.Set;
2930
import java.util.function.Function;
3031
import java.util.stream.Collectors;
@@ -211,7 +212,7 @@ default boolean shouldExpire() {
211212
}
212213

213214
/**
214-
* TODO Add docs
215+
* Listener method to trigger some action if config was changed.
215216
*/
216-
default void onGroupConfigChanged() {}
217+
default void onGroupConfigChanged(Properties updatedProperties) {}
217218
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
5454
throw new InvalidRequestException("Group name can't be empty.");
5555
}
5656

57-
Optional.ofNullable(listener).ifPresent(listener -> listener.onGroupConfigUpdate(groupId));
57+
if (listener != null) {
58+
listener.onGroupConfigUpdate(groupId, newGroupConfig);
59+
}
5860

5961
final GroupConfig newConfig = GroupConfig.fromProps(
6062
defaultConfig.originals(),

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@
194194
import java.util.Objects;
195195
import java.util.Optional;
196196
import java.util.OptionalInt;
197+
import java.util.Properties;
197198
import java.util.Set;
198199
import java.util.SortedMap;
199200
import java.util.concurrent.CompletableFuture;
@@ -1913,7 +1914,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
19131914
// to persist the change, and bump the group epoch later.
19141915
boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
19151916

1916-
// todo comment
1917+
// Check if StreamsGroup has flag rebalanceRequired equals to true
19171918
if (group.rebalanceRequired()) {
19181919
bumpGroupEpoch = true;
19191920
}
@@ -8372,10 +8373,12 @@ public List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseT
83728373
}
83738374

83748375
/**
8375-
* TODO docs
8376+
* Trigger {@link org.apache.kafka.coordinator.group.Group#onGroupConfigChanged} for selected group id
8377+
*
8378+
* @see org.apache.kafka.coordinator.group.Group#onGroupConfigChanged(Properties)
83768379
*/
8377-
public void onGroupConfigUpdate(String groupId) {
8378-
Optional.ofNullable(groups.get(groupId)).ifPresent(Group::onGroupConfigChanged);
8380+
public void onGroupConfigUpdate(String groupId, Properties updatedConfig) {
8381+
Optional.ofNullable(groups.get(groupId)).ifPresent(group -> group.onGroupConfigChanged(updatedConfig));
83798382
}
83808383

83818384
/**

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,12 @@
5151
import java.util.Map;
5252
import java.util.Objects;
5353
import java.util.Optional;
54+
import java.util.Properties;
5455
import java.util.Set;
5556
import java.util.TreeMap;
5657
import java.util.concurrent.atomic.AtomicBoolean;
5758

59+
import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
5860
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
5961
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
6062
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
@@ -72,6 +74,10 @@ public class StreamsGroup implements Group {
7274
*/
7375
private static final String PROTOCOL_TYPE = "streams";
7476

77+
private static final Set<String> PROPERTIES_TO_TIGGER_REBALANCE = Set.of(
78+
STREAMS_NUM_STANDBY_REPLICAS_CONFIG
79+
);
80+
7581
public enum StreamsGroupState {
7682
EMPTY("Empty"),
7783
NOT_READY("NotReady"),
@@ -197,7 +203,7 @@ public static class DeadlineAndEpoch {
197203
private final TimelineObject<Optional<ConfiguredTopology>> configuredTopology;
198204

199205
/**
200-
* TODO docs
206+
* The flag that group should be rebalanced
201207
*/
202208
private final AtomicBoolean rebalanceRequired;
203209

@@ -834,14 +840,24 @@ public boolean isInStates(Set<String> statesFilter, long committedOffset) {
834840
return statesFilter.contains(state.get(committedOffset).toLowerCaseString());
835841
}
836842

843+
/**
844+
* Check if rebalance is required, after config was changed
845+
*
846+
* @param updatedProperties Properties that was updated
847+
*/
837848
@Override
838-
public void onGroupConfigChanged() {
839-
rebalanceRequired.set(true); // FIXME check config
849+
public void onGroupConfigChanged(Properties updatedProperties) {
850+
PROPERTIES_TO_TIGGER_REBALANCE.forEach(property -> {
851+
if (updatedProperties.containsKey(property)) {
852+
rebalanceRequired.set(true);
853+
}
854+
});
840855
}
841856

842857
/**
843-
* TODO docs
844-
* @return
858+
* Check if rebalance is required. Reset the flag, set it false.
859+
*
860+
* @return true - if rebalance is required
845861
*/
846862
public boolean rebalanceRequired() {
847863
return rebalanceRequired.getAndSet(false);

0 commit comments

Comments
 (0)