Skip to content

KAFKA-18026: KIP-1112 migrate KTableSuppressProcessorSupplier #18150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
Expand Down Expand Up @@ -565,12 +564,6 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
final String storeName =
suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);

final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
suppressedInternal,
storeName,
this
);

final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> storeBuilder;

if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
Expand All @@ -588,10 +581,16 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
.withLoggingDisabled();
}

final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
suppressedInternal,
storeBuilder,
this
);

final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)
new String[]{storeName}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's definitely not being used in this PR, but we do still need it, so it should (and will) be used (see https://github.com/apache/kafka/pull/18150/files#r1885482579)

);
node.setOutputVersioned(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.streams.processor.internals.StoreFactory;

public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> {
public TableSuppressNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory materializedKTableStoreBuilder) {
super(nodeName, processorParameters, materializedKTableStoreBuilder);
final String[] storeNames) {
super(nodeName, processorParameters, storeNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,39 @@
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Maybe;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

import java.util.Set;

import static java.util.Objects.requireNonNull;

public class KTableSuppressProcessorSupplier<K, V> implements
    KTableProcessorSupplier<K, V, K, V> {
    private final SuppressedInternal<K> suppress;
    // Builder for the suppression buffer; its name() identifies the state store at runtime.
    // Holding the builder (rather than just a name) lets stores() expose it for automatic
    // topology connection and processor wrapping (KIP-1112).
    private final StoreBuilder<?> storeBuilder;
    private final KTableImpl<K, ?, V> parentKTable;

    /**
     * @param suppress     the suppression policy to enforce
     * @param storeBuilder builder for the in-memory time-ordered buffer backing this suppression
     * @param parentKTable the upstream table; old values are enabled on it so the buffer can
     *                     maintain the prior-value view
     */
    public KTableSuppressProcessorSupplier(final SuppressedInternal<K> suppress,
                                           final StoreBuilder<?> storeBuilder,
                                           final KTableImpl<K, ?, V> parentKTable) {
        this.suppress = suppress;
        this.storeBuilder = storeBuilder;
        this.parentKTable = parentKTable;
        // The suppress buffer requires seeing the old values, to support the prior value view.
        parentKTable.enableSendingOldValues(true);
    }

    @Override
    public Processor<K, Change<V>, K, Change<V>> get() {
        return new KTableSuppressProcessor<>(suppress, storeBuilder.name());
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        // Expose the buffer's store builder so the topology connects it to this processor
        // without callers having to wrap it manually.
        return Set.of(storeBuilder);
    }

@Override
Expand All @@ -75,7 +83,7 @@ public KTableValueGetter<K, V> get() {
public void init(final ProcessorContext<?, ?> context) {
parentGetter.init(context);
// the main processor is responsible for the buffer's lifecycle
buffer = requireNonNull(context.getStateStore(storeName));
buffer = requireNonNull(context.getStateStore(storeBuilder.name()));
}

@Override
Expand Down Expand Up @@ -107,7 +115,7 @@ public String[] storeNames() {
final String[] parentStores = parentValueGetterSupplier.storeNames();
final String[] stores = new String[1 + parentStores.length];
System.arraycopy(parentStores, 0, stores, 1, parentStores.length);
stores[0] = storeName;
stores[0] = storeBuilder.name();
return stores;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -1517,6 +1518,30 @@ public void shouldWrapProcessorsForStreamAggregate() {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}

@Test
public void shouldWrapProcessorsForSuppress() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);

final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);

final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

builder.stream("input", Consumed.as("source"))
.groupByKey()
.count(Named.as("count"))// wrapped 1
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), Suppressed.BufferConfig.unbounded()).withName("suppressed")) // wrapped 2
.toStream(Named.as("toStream"))// wrapped 3
.to("output", Produced.as("sink"));

builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream", "suppressed"));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}

@Test
public void shouldWrapProcessorsForTimeWindowStreamAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
Expand All @@ -38,6 +39,7 @@
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand Down Expand Up @@ -144,7 +146,7 @@ public void shouldRecordMetricsWithBuiltInMetricsVersionLatest() {
final Processor<String, Change<Long>, String, Change<Long>> processor =
new KTableSuppressProcessorSupplier<>(
(SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
storeName,
mockBuilderWithName(storeName),
mock
).get();

Expand Down Expand Up @@ -206,4 +208,10 @@ private static <T> void verifyMetric(final Map<MetricName, ? extends Metric> met
assertThat(metrics.get(metricName).metricName().description(), is(metricName.description()));
assertThat((T) metrics.get(metricName).metricValue(), matcher);
}

// Creates a mock StoreBuilder that only answers name(); the supplier under test
// needs nothing else from the builder.
private StoreBuilder<?> mockBuilderWithName(final String name) {
    final StoreBuilder<?> mockStoreBuilder = Mockito.mock(StoreBuilder.class);
    Mockito.when(mockStoreBuilder.name()).thenReturn(name);
    return mockStoreBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
import org.apache.kafka.test.MockInternalNewProcessorContext;

Expand All @@ -43,6 +44,7 @@
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand Down Expand Up @@ -92,7 +94,7 @@ private static class Harness<K, V> {
@SuppressWarnings("unchecked")
final KTableImpl<K, ?, V> parent = mock(KTableImpl.class);
final Processor<K, Change<V>, K, Change<V>> processor =
new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, storeName, parent).get();
new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, mockBuilderWithName(storeName), parent).get();

final MockInternalNewProcessorContext<K, Change<V>> context = new MockInternalNewProcessorContext<>();
context.setCurrentNode(new ProcessorNode("testNode"));
Expand Down Expand Up @@ -487,4 +489,10 @@ private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawTy
new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize)
);
}

// Creates a mock StoreBuilder that only answers name(); the supplier under test
// needs nothing else from the builder.
private static StoreBuilder<?> mockBuilderWithName(final String name) {
    final StoreBuilder<?> mockStoreBuilder = Mockito.mock(StoreBuilder.class);
    Mockito.when(mockStoreBuilder.name()).thenReturn(name);
    return mockStoreBuilder;
}
}
Loading