Skip to content

KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API #18154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 20, 2024

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Dec 12, 2024

Must be cherry-picked to 4.0 branch.

@mjsax mjsax added the streams label Dec 12, 2024
@@ -57,7 +57,7 @@
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.MockInternalProcessorContext;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow up clean from #18103

Rename this class, what result in lot of updates in many files...

@@ -46,7 +46,7 @@
import static java.util.Arrays.asList;


public class KStreamNewProcessorApiTest {
public class KStreamProcessorApiTest {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a similar update -- we don't have the old Processor API any longer, so dropping the New from this name, too.

props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup to get rid of some IntelliJ warning. (also cleaner code.

Similar stuff below.

assertEquals("value2", store.get("key2"));
assertEquals("value3", store.get("key3"));
assertNull(store.get("key4"));
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diff is just weird.. this was not removed


@Deprecated // testing old PAPI
@Test
public void testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this (and some other tests) -- verified that we have equivalent tests for new Processor API for all of them

assertEquals("key3", results.get(2).key);
assertEquals("value3", results.get(2).value);

}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird diff..

@@ -1568,21 +1105,19 @@ private Topology createSimpleTopologyWithDroppingPartitioner() {
.addSink("sink", OUTPUT_TOPIC_1, new DroppingPartitioner(), "processor");
}

@Deprecated // testing old PAPI
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This annotation was wrong -- should have been @SuppressWarning("deprecation")

Updating this code to use new Processor API now.

.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()), "processor")
.addSink("counts", OUTPUT_TOPIC_1, "processor");
}

@Deprecated // testing old PAPI
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

* A processor that stores each key-value pair in an in-memory key-value store registered with the context.
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
protected static class OldAPIStatefulProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a StatefulProcessor for the new PAPI -- nothing to be migrated -- deleting.

@@ -1775,30 +1286,9 @@ public void process(final Record<String, String> record) {
}
}

@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private <K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K, V> define(final org.apache.kafka.streams.processor.Processor<K, V> processor) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just dropped this one -- not really useful to have..

}

@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private <K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K, V> defineWithStoresOldAPI(final Supplier<org.apache.kafka.streams.processor.Processor<K, V>> supplier,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already defineWithStores() for new PAPI -- nothing to be migrated -- deleting

private <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> defineWithStores(final Supplier<Processor<KIn, VIn, KOut, VOut>> supplier,
final Set<StoreBuilder<?>> stores) {
return new ProcessorSupplier<KIn, VIn, KOut, VOut>() {
return new ProcessorSupplier<>() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup

private Topology createStatefulTopology(final String storeName) {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new OldAPIStatefulProcessor(storeName)), "source")
.addProcessor("processor", () -> new StatefulProcessor(storeName), "source")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, it looks like OldAPIStatefulProcessor has been corrected to StatefulProcessor for migrating to current API. Got it! Thanks for teaching!

@chia7712
Copy link
Member

@mjsax could you please fix the conflicts ?

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax thanks for this patch. LGTM and one small question is left.

import static org.mockito.Mockito.when;

@SuppressWarnings("deprecation") // this is a test of a deprecated API
public class MockProcessorContextTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deprecated org.apache.kafka.streams.processor.MockProcessorContext still exists in the codebase. Should we retain the unit tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I suppose the trouble is that the unit test uses the removed classes, but the unit under test will not be removed yet. Can the tests be rewritten with transformers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @mjsax - LGTM

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, LGTM modulo the question raised by Chia

@mjsax
Copy link
Member Author

mjsax commented Dec 19, 2024

Java 23:

Found 1 test failures:
FAILED ❌ PlaintextAdminIntegrationTest > testConsumerGroupsDeprecatedConsumerGroupState(String, String).quorum=kraft.groupProtocol=consumer
Found 1 flaky test failures:
FLAKY ⚠️  SslTransportLayerTest > "testSelectorPollReadSize(Args).args=tlsProtocol=TLSv1.2, useInlinePem=false"

Java 17:

Found 5 test failures:
FAILED ❌ AuthorizerIntegrationTest > testConsumeUsingAssignWithNoAccess(String, String).quorum=kraft.groupProtocol=consumer
FAILED ❌ PlaintextProducerSendTest > testCloseWithZeroTimeoutFromCallerThread(String, String).quorum=kraft.groupProtocol=classic
FAILED ❌ RemoteLogManagerTest > testRLMOpsWhenMetadataIsNotReady()
FAILED ❌ PlaintextAdminIntegrationTest > testConsumerGroupsDeprecatedConsumerGroupState(String, String).quorum=kraft.groupProtocol=consumer
FAILED ❌ PlaintextAdminIntegrationTest > testConsumerGroups(String, String).quorum=kraft.groupProtocol=consumer
Found 2 flaky test failures:
FLAKY ⚠️  TransactionsWithTieredStoreTest > "testBumpTransactionalEpochWithTV2Enabled(String, String, boolean).quorum=kraft, groupProtocol=consumer, isTV2Enabled=true"
FLAKY ⚠️  SslTransportLayerTest > "testSelectorPollReadSize(Args).args=tlsProtocol=TLSv1.2, useInlinePem=false"

@mjsax mjsax merged commit 9aa900f into apache:trunk Dec 20, 2024
9 checks passed
@mjsax mjsax deleted the kafka-12829-remove-topology-process branch December 20, 2024 00:11
mjsax added a commit that referenced this pull request Dec 20, 2024
@mjsax
Copy link
Member Author

mjsax commented Dec 20, 2024

Merged to trunk and cherry-picked to 4.0 branch.

ijuma added a commit to ijuma/kafka that referenced this pull request Dec 20, 2024
…e-old-protocol-versions

* apache-github/trunk:
  KAFKA-18312: Added entityType: topicName to SubscribedTopicNames in ShareGroupHeartbeatRequest.json (apache#18285)
  HOTFIX: fix incompatible types: Optional<TimestampAndOffset> cannot be converted to Option<TimestampAndOffset> (apache#18284)
  MINOR Fix some test-catalog issues (apache#18272)
  KAFKA-18180: Move OffsetResultHolder to storage module (apache#18100)
  KAFKA-18301; Make coordinator records first class citizen (apache#18261)
  KAFKA-18262 Remove DefaultPartitioner and UniformStickyPartitioner (apache#18204)
  KAFKA-18296 Remove deprecated KafkaBasedLog constructor (apache#18257)
  KAFKA-12829: Remove old Processor and ProcessorSupplier interfaces (apache#18238)
  KAFKA-18292 Remove deprecated methods of UpdateFeaturesOptions (apache#18245)
  KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API (apache#18154)
  KAFKA-18035, KAFKA-18306, KAFKA-18092: Address TransactionsTest flaky tests (apache#18264)
  MINOR: change the default linger time in the new coordinator (apache#18274)
  KAFKA-18305: validate controller.listener.names is not in inter.broker.listener.name for kcontrollers (apache#18222)
  KAFKA-18207: Serde for handling transaction records (apache#18136)
  KAFKA-13722: Refactor Kafka Streams store interfaces (apache#18243)
  KAFKA-17131: Refactor TimeDefinitions (apache#18241)
  MINOR: Fix MessageFormatters (apache#18266)
  Mark flaky tests for Dec 18, 2024 (apache#18263)
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants