Skip to content

Commit e63d9a3

Browse files
committed
KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API
1 parent 8c55dcc commit e63d9a3

19 files changed

+147
-1235
lines changed

streams/src/main/java/org/apache/kafka/streams/Topology.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.kafka.streams.processor.api.Processor;
3131
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
3232
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
33-
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
3433
import org.apache.kafka.streams.processor.internals.ProcessorNode;
3534
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
3635
import org.apache.kafka.streams.processor.internals.SinkNode;
@@ -875,48 +874,6 @@ public synchronized <K, V> Topology addSink(final String name,
875874
return this;
876875
}
877876

878-
/**
879-
* Add a new processor node that receives and processes records output by one or more parent source or processor
880-
* node.
881-
* Any new record output by this processor will be forwarded to its child processor or sink nodes.
882-
* The supplier should always generate a new instance each time
883-
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single
884-
* {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in
885-
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern
886-
* and leads to runtime exceptions.
887-
* If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
888-
* will be added to the topology and connected to this processor automatically.
889-
*
890-
* @param name the unique name of the processor node
891-
* @param supplier the supplier used to obtain this node's {@link org.apache.kafka.streams.processor.Processor} instance
892-
* @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
893-
* and process
894-
* @return itself
895-
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
896-
* @deprecated Since 2.7.0 Use {@link #addProcessor(String, ProcessorSupplier, String...)} instead.
897-
*/
898-
@SuppressWarnings("rawtypes")
899-
@Deprecated
900-
public synchronized Topology addProcessor(final String name,
901-
final org.apache.kafka.streams.processor.ProcessorSupplier supplier,
902-
final String... parentNames) {
903-
return addProcessor(
904-
name,
905-
new ProcessorSupplier<Object, Object, Object, Object>() {
906-
@Override
907-
public Set<StoreBuilder<?>> stores() {
908-
return supplier.stores();
909-
}
910-
911-
@Override
912-
public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object, Object> get() {
913-
return ProcessorAdapter.adaptRaw(supplier.get());
914-
}
915-
},
916-
parentNames
917-
);
918-
}
919-
920877
/**
921878
* Add a new processor node that receives and processes records output by one or more parent source or processor
922879
* node.

streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java

Lines changed: 0 additions & 81 deletions
This file was deleted.

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
5858
import org.apache.kafka.test.MockApiProcessor;
5959
import org.apache.kafka.test.MockApiProcessorSupplier;
60-
import org.apache.kafka.test.MockInternalNewProcessorContext;
60+
import org.apache.kafka.test.MockInternalProcessorContext;
6161
import org.apache.kafka.test.MockValueJoiner;
6262
import org.apache.kafka.test.StreamsTestUtils;
6363

@@ -480,7 +480,7 @@ public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {
480480
Optional.of(StoreBuilderWrapper.wrapStoreBuilder(outerStoreBuilder)));
481481

482482
final Processor<String, String, String, String> joinProcessor = join.get();
483-
final MockInternalNewProcessorContext<String, String> procCtx = new MockInternalNewProcessorContext<>();
483+
final MockInternalProcessorContext<String, String> procCtx = new MockInternalProcessorContext<>();
484484
final WindowStore<String, String> otherStore = otherStoreBuilder.build();
485485

486486
final KeyValueStore<TimestampedKeyAndJoinSide<String>, LeftOrRightValue<String, String>> outerStore =

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java renamed to streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamProcessorApiTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import static java.util.Arrays.asList;
4747

4848

49-
public class KStreamNewProcessorApiTest {
49+
public class KStreamProcessorApiTest {
5050

5151
@Test
5252
void shouldGetStateStoreWithConnectedStoreProvider() {

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.apache.kafka.test.MockApiProcessor;
5858
import org.apache.kafka.test.MockApiProcessorSupplier;
5959
import org.apache.kafka.test.MockInitializer;
60-
import org.apache.kafka.test.MockInternalNewProcessorContext;
60+
import org.apache.kafka.test.MockInternalProcessorContext;
6161
import org.apache.kafka.test.StreamsTestUtils;
6262
import org.apache.kafka.test.TestUtils;
6363

@@ -646,7 +646,7 @@ public void shouldNotEmitFinalIfNotProgressEnough(final StrategyType inputType,
646646
try {
647647
// Always process
648648
props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0);
649-
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
649+
final MockInternalProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
650650
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
651651
windows,
652652
storeFactory,
@@ -736,7 +736,7 @@ public void shouldEmitWithInterval0(final StrategyType inputType, final boolean
736736
try {
737737
// Always process
738738
props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0);
739-
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
739+
final MockInternalProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
740740
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
741741
windows,
742742
storeFactory,
@@ -805,7 +805,7 @@ public void shouldEmitWithLargeInterval(final StrategyType inputType, final bool
805805
try {
806806
// Emit final every second
807807
props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 1000L);
808-
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
808+
final MockInternalProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
809809
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
810810
windows,
811811
storeFactory,
@@ -906,7 +906,7 @@ public void shouldEmitFromLastEmitTime(final StrategyType inputType, final boole
906906
try {
907907
// Always process
908908
props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0);
909-
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
909+
final MockInternalProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
910910
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
911911
windows,
912912
storeFactory,
@@ -1028,8 +1028,8 @@ private TimestampedWindowStore<String, String> getWindowStore(final long windowS
10281028
.build();
10291029
}
10301030

1031-
private MockInternalNewProcessorContext<Windowed<String>, Change<String>> makeContext(final File stateDir, final long windowSize) {
1032-
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = new MockInternalNewProcessorContext<>(
1031+
private MockInternalProcessorContext<Windowed<String>, Change<String>> makeContext(final File stateDir, final long windowSize) {
1032+
final MockInternalProcessorContext<Windowed<String>, Change<String>> context = new MockInternalProcessorContext<>(
10331033
props,
10341034
new TaskId(0, 0),
10351035
stateDir

streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.kafka.streams.state.Stores;
3030
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
3131
import org.apache.kafka.streams.state.ValueAndTimestamp;
32-
import org.apache.kafka.test.MockInternalNewProcessorContext;
32+
import org.apache.kafka.test.MockInternalProcessorContext;
3333
import org.apache.kafka.test.StreamsTestUtils;
3434
import org.apache.kafka.test.TestUtils;
3535

@@ -59,7 +59,7 @@ public class ForeignTableJoinProcessorSupplierTests {
5959
Serdes.String()
6060
);
6161

62-
private MockInternalNewProcessorContext<String, SubscriptionResponseWrapper<String>> context = null;
62+
private MockInternalProcessorContext<String, SubscriptionResponseWrapper<String>> context = null;
6363
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = null;
6464
private Processor<String, Change<String>, String, SubscriptionResponseWrapper<String>> processor = null;
6565
private File stateDir;
@@ -68,7 +68,7 @@ public class ForeignTableJoinProcessorSupplierTests {
6868
public void setUp() {
6969
stateDir = TestUtils.tempDirectory();
7070
final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
71-
context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir);
71+
context = new MockInternalProcessorContext<>(props, new TaskId(0, 0), stateDir);
7272

7373
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
7474
processor = new ForeignTableJoinProcessorSupplier<String, String, String>(

streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
3030
import org.apache.kafka.streams.state.ValueAndTimestamp;
3131
import org.apache.kafka.streams.state.internals.Murmur3;
32-
import org.apache.kafka.test.MockInternalNewProcessorContext;
32+
import org.apache.kafka.test.MockInternalProcessorContext;
3333

3434
import org.junit.jupiter.api.Test;
3535

@@ -96,7 +96,7 @@ public void shouldNotForwardWhenHashDoesNotMatch() {
9696
leftJoin
9797
);
9898
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
99-
final MockInternalNewProcessorContext<String, String> context = new MockInternalNewProcessorContext<>();
99+
final MockInternalProcessorContext<String, String> context = new MockInternalProcessorContext<>();
100100
processor.init(context);
101101
context.setRecordMetadata("topic", 0, 0);
102102

@@ -125,7 +125,7 @@ public void shouldIgnoreUpdateWhenLeftHasBecomeNull() {
125125
leftJoin
126126
);
127127
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
128-
final MockInternalNewProcessorContext<String, String> context = new MockInternalNewProcessorContext<>();
128+
final MockInternalProcessorContext<String, String> context = new MockInternalProcessorContext<>();
129129
processor.init(context);
130130
context.setRecordMetadata("topic", 0, 0);
131131

@@ -154,7 +154,7 @@ public void shouldForwardWhenHashMatches() {
154154
leftJoin
155155
);
156156
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
157-
final MockInternalNewProcessorContext<String, String> context = new MockInternalNewProcessorContext<>();
157+
final MockInternalProcessorContext<String, String> context = new MockInternalProcessorContext<>();
158158
processor.init(context);
159159
context.setRecordMetadata("topic", 0, 0);
160160

@@ -180,7 +180,7 @@ public void shouldEmitTombstoneForInnerJoinWhenRightIsNull() {
180180
leftJoin
181181
);
182182
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
183-
final MockInternalNewProcessorContext<String, String> context = new MockInternalNewProcessorContext<>();
183+
final MockInternalProcessorContext<String, String> context = new MockInternalProcessorContext<>();
184184
processor.init(context);
185185
context.setRecordMetadata("topic", 0, 0);
186186

@@ -206,7 +206,7 @@ public void shouldEmitResultForLeftJoinWhenRightIsNull() {
206206
leftJoin
207207
);
208208
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
209-
final MockInternalNewProcessorContext<String, String> context = new MockInternalNewProcessorContext<>();
209+
final MockInternalProcessorContext<String, String> context = new MockInternalProcessorContext<>();
210210
processor.init(context);
211211
context.setRecordMetadata("topic", 0, 0);
212212

@@ -232,7 +232,7 @@ public void shouldEmitTombstoneForLeftJoinWhenRightIsNullAndLeftIsNull() {
232232
leftJoin
233233
);
234234
final Processor<String, SubscriptionResponseWrapper<String>, String, String> processor = processorSupplier.get();
235-
final MockInternalNewProcessorContext<String, String> context = new MockInternalNewProcessorContext<>();
235+
final MockInternalProcessorContext<String, String> context = new MockInternalProcessorContext<>();
236236
processor.init(context);
237237
context.setRecordMetadata("topic", 0, 0);
238238

0 commit comments

Comments
 (0)