Skip to content

Commit 0179193

Browse files
authored
KAFKA-19529: State updater sensor names should be unique (#20262) (#20274)
All state updater threads use the same metrics instance, but do not use unique names for their sensors. This can have the following symptoms: 1) Data inserted into one sensor by one thread can affect the metrics of all state updater threads. 2) If one state updater thread is shut down, the metrics associated with all state updater threads are removed. 3) If one state updater thread is started while another one is removed, it can happen that a metric is registered with the `Metrics` instance, but not associated with any `Sensor` (because it is concurrently removed), which means that the metric will not be removed upon shutdown. If a thread with the same name later tries to register the same metric, we may run into a `java.lang.IllegalArgumentException: A metric named ... already exists`, as described in the ticket. This change fixes the bug by giving unique names to the sensors. A test is added to verify that removing the sensors and metrics of one state updater thread during shutdown does not interfere with those of another. Reviewers: Matthias J. Sax <[email protected]>
1 parent 9c83c6d commit 0179193

File tree

3 files changed

+82
-28
lines changed

3 files changed

+82
-28
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.kafka.streams.processor.TaskId;
3737
import org.apache.kafka.streams.processor.internals.Task.State;
3838
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
39+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3940

4041
import org.slf4j.Logger;
4142

@@ -89,7 +90,7 @@ private class StateUpdaterThread extends Thread {
8990
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
9091

9192
public StateUpdaterThread(final String name,
92-
final Metrics metrics,
93+
final StreamsMetricsImpl metrics,
9394
final ChangelogReader changelogReader) {
9495
super(name);
9596
this.changelogReader = changelogReader;
@@ -745,7 +746,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t
745746
private final Time time;
746747
private final Logger log;
747748
private final String name;
748-
private final Metrics metrics;
749+
private final StreamsMetricsImpl metrics;
749750
private final Consumer<byte[], byte[]> restoreConsumer;
750751
private final ChangelogReader changelogReader;
751752
private final TopologyMetadata topologyMetadata;
@@ -766,7 +767,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t
766767
private StateUpdaterThread stateUpdaterThread = null;
767768

768769
public DefaultStateUpdater(final String name,
769-
final Metrics metrics,
770+
final StreamsMetricsImpl metrics,
770771
final StreamsConfig config,
771772
final Consumer<byte[], byte[]> restoreConsumer,
772773
final ChangelogReader changelogReader,
@@ -1062,70 +1063,71 @@ private class StateUpdaterMetrics {
10621063
private final Sensor standbyRestoreRatioSensor;
10631064
private final Sensor checkpointRatioSensor;
10641065

1065-
private final Deque<String> allSensorNames = new LinkedList<>();
1066+
private final Deque<Sensor> allSensors = new LinkedList<>();
10661067
private final Deque<MetricName> allMetricNames = new LinkedList<>();
10671068

1068-
private StateUpdaterMetrics(final Metrics metrics, final String threadId) {
1069+
private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) {
10691070
final Map<String, String> threadLevelTags = new LinkedHashMap<>();
10701071
threadLevelTags.put(THREAD_ID_TAG, threadId);
1072+
final Metrics metricsRegistry = metrics.metricsRegistry();
10711073

1072-
MetricName metricName = metrics.metricName("active-restoring-tasks",
1074+
MetricName metricName = metricsRegistry.metricName("active-restoring-tasks",
10731075
STATE_LEVEL_GROUP,
10741076
"The number of active tasks currently undergoing restoration",
10751077
threadLevelTags);
1076-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1078+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10771079
stateUpdaterThread.numRestoringActiveTasks() : 0);
10781080
allMetricNames.push(metricName);
10791081

1080-
metricName = metrics.metricName("standby-updating-tasks",
1082+
metricName = metricsRegistry.metricName("standby-updating-tasks",
10811083
STATE_LEVEL_GROUP,
10821084
"The number of standby tasks currently undergoing state update",
10831085
threadLevelTags);
1084-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1086+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10851087
stateUpdaterThread.numUpdatingStandbyTasks() : 0);
10861088
allMetricNames.push(metricName);
10871089

1088-
metricName = metrics.metricName("active-paused-tasks",
1090+
metricName = metricsRegistry.metricName("active-paused-tasks",
10891091
STATE_LEVEL_GROUP,
10901092
"The number of active tasks paused restoring",
10911093
threadLevelTags);
1092-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1094+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10931095
stateUpdaterThread.numPausedActiveTasks() : 0);
10941096
allMetricNames.push(metricName);
10951097

1096-
metricName = metrics.metricName("standby-paused-tasks",
1098+
metricName = metricsRegistry.metricName("standby-paused-tasks",
10971099
STATE_LEVEL_GROUP,
10981100
"The number of standby tasks paused state update",
10991101
threadLevelTags);
1100-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1102+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
11011103
stateUpdaterThread.numPausedStandbyTasks() : 0);
11021104
allMetricNames.push(metricName);
11031105

1104-
this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO);
1106+
this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO);
11051107
this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1106-
allSensorNames.add("idle-ratio");
1108+
allSensors.add(this.idleRatioSensor);
11071109

1108-
this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO);
1110+
this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO);
11091111
this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1110-
allSensorNames.add("active-restore-ratio");
1112+
allSensors.add(this.activeRestoreRatioSensor);
11111113

1112-
this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO);
1114+
this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO);
11131115
this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1114-
allSensorNames.add("standby-update-ratio");
1116+
allSensors.add(this.standbyRestoreRatioSensor);
11151117

1116-
this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO);
1118+
this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO);
11171119
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1118-
allSensorNames.add("checkpoint-ratio");
1120+
allSensors.add(this.checkpointRatioSensor);
11191121

1120-
this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO);
1122+
this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO);
11211123
this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate());
11221124
this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount()));
1123-
allSensorNames.add("restore-records");
1125+
allSensors.add(this.restoreSensor);
11241126
}
11251127

11261128
void clear() {
1127-
while (!allSensorNames.isEmpty()) {
1128-
metrics.removeSensor(allSensorNames.pop());
1129+
while (!allSensors.isEmpty()) {
1130+
metrics.removeSensor(allSensors.pop());
11291131
}
11301132

11311133
while (!allMetricNames.isEmpty()) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateU
646646
final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
647647
final StateUpdater stateUpdater = new DefaultStateUpdater(
648648
name,
649-
streamsMetrics.metricsRegistry(),
649+
streamsMetrics,
650650
streamsConfig,
651651
restoreConsumer,
652652
changelogReader,

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.streams.processor.TaskId;
2929
import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
3030
import org.apache.kafka.streams.processor.internals.Task.State;
31+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3132

3233
import org.hamcrest.Matcher;
3334
import org.junit.jupiter.api.AfterEach;
@@ -105,7 +106,7 @@ class DefaultStateUpdaterTest {
105106

106107
// need an auto-tick timer to work for draining with timeout
107108
private final Time time = new MockTime(1L);
108-
private final Metrics metrics = new Metrics(time);
109+
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
109110
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
110111
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
111112
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
@@ -1680,8 +1681,59 @@ public void shouldRecordMetrics() throws Exception {
16801681
assertThat(metrics.metrics().size(), is(1));
16811682
}
16821683

1684+
@Test
1685+
public void shouldRemoveMetricsWithoutInterference() {
1686+
final DefaultStateUpdater stateUpdater2 =
1687+
new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time);
1688+
final List<MetricName> threadMetrics = getMetricNames("test-state-updater");
1689+
final List<MetricName> threadMetrics2 = getMetricNames("test-state-updater2");
1690+
1691+
stateUpdater.start();
1692+
stateUpdater2.start();
1693+
1694+
for (final MetricName metricName : threadMetrics) {
1695+
assertTrue(metrics.metrics().containsKey(metricName));
1696+
}
1697+
for (final MetricName metricName : threadMetrics2) {
1698+
assertTrue(metrics.metrics().containsKey(metricName));
1699+
}
1700+
1701+
stateUpdater2.shutdown(Duration.ofMinutes(1));
1702+
1703+
for (final MetricName metricName : threadMetrics) {
1704+
assertTrue(metrics.metrics().containsKey(metricName));
1705+
}
1706+
for (final MetricName metricName : threadMetrics2) {
1707+
assertFalse(metrics.metrics().containsKey(metricName));
1708+
}
1709+
1710+
stateUpdater.shutdown(Duration.ofMinutes(1));
1711+
1712+
for (final MetricName metricName : threadMetrics) {
1713+
assertFalse(metrics.metrics().containsKey(metricName));
1714+
}
1715+
for (final MetricName metricName : threadMetrics2) {
1716+
assertFalse(metrics.metrics().containsKey(metricName));
1717+
}
1718+
}
1719+
1720+
private static List<MetricName> getMetricNames(final String threadId) {
1721+
final Map<String, String> tagMap = Map.of("thread-id", threadId);
1722+
return List.of(
1723+
new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap),
1724+
new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap),
1725+
new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap),
1726+
new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap),
1727+
new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap),
1728+
new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap),
1729+
new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap),
1730+
new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap),
1731+
new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap)
1732+
);
1733+
}
1734+
16831735
@SuppressWarnings("unchecked")
1684-
private static <T> void verifyMetric(final Metrics metrics,
1736+
private static <T> void verifyMetric(final StreamsMetricsImpl metrics,
16851737
final MetricName metricName,
16861738
final Matcher<T> matcher) {
16871739
assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));

0 commit comments

Comments
 (0)