Skip to content

KAFKA-18262: Remove DefaultPartitioner and UniformStickyPartitioner #18204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -326,23 +326,6 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
this(Utils.propsToMap(properties), keySerializer, valueSerializer);
}

/**
* Check if partitioner is deprecated and log a warning if it is.
*/
@SuppressWarnings("deprecation")
private void warnIfPartitionerDeprecated() {
// Using DefaultPartitioner and UniformStickyPartitioner is deprecated, see KIP-794.
if (partitioner instanceof org.apache.kafka.clients.producer.internals.DefaultPartitioner) {
log.warn("DefaultPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
+ " configuration setting to get the default partitioning behavior");
}
if (partitioner instanceof org.apache.kafka.clients.producer.UniformStickyPartitioner) {
log.warn("UniformStickyPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
+ " configuration setting and set " + ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG
+ " to 'true' to get the uniform sticky partitioning behavior");
}
}

// visible for testing
@SuppressWarnings({"unchecked", "this-escape"})
KafkaProducer(ProducerConfig config,
Expand Down Expand Up @@ -385,7 +368,6 @@ private void warnIfPartitionerDeprecated() {
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
warnIfPartitionerDeprecated();
this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,31 +119,6 @@ public MockProducer(final Cluster cluster,
this.mockMetrics = new HashMap<>();
}

/**
* Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers.
*
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
*/
@SuppressWarnings("deprecation")
public MockProducer(final boolean autoComplete,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
this(Cluster.empty(), autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
}

/**
* Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers.
*
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(cluster, autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
*/
@SuppressWarnings("deprecation")
public MockProducer(final Cluster cluster,
final boolean autoComplete,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
this(cluster, autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MockProducer has a null partitioner and it may cause NPE. Could you please fix that too? We can pick up a partition based on cluster.partitionsForTopic(topic); if the partitioner is null

}

/**
* Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers.
*
Expand Down Expand Up @@ -563,6 +538,9 @@ private int partition(ProducerRecord<K, V> record, Cluster cluster) {
}
byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
byte[] valueBytes = valueSerializer.serialize(topic, record.headers(), record.value());
if (partitioner == null) {
return this.cluster.partitionsForTopic(record.topic()).get(0).partition();
}
return this.partitioner.partition(topic, record.key(), keyBytes, record.value(), valueBytes, cluster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface Partitioner extends Configurable, Closeable {
void close();

/**
* Note this method is only implemented in DefaultPartitioner and {@link UniformStickyPartitioner} which
* Note this method is only implemented in DefaultPartitioner and UniformStickyPartitioner which
* are now deprecated. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner">KIP-794</a> for more info.
* <p>
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class MockProducerTest {
private final String groupId = "group";

private void buildMockProducer(boolean autoComplete) {
this.producer = new MockProducer<>(autoComplete, new MockSerializer(), new MockSerializer());
this.producer = new MockProducer<>(Cluster.empty(), autoComplete, null, new MockSerializer(), new MockSerializer());
}

@AfterEach
Expand Down Expand Up @@ -87,10 +87,16 @@ public void testPartitioner() throws Exception {
PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
Cluster cluster = new Cluster(null, new ArrayList<>(0), asList(partitionInfo0, partitionInfo1),
Collections.emptySet(), Collections.emptySet());
MockProducer<String, String> producer = new MockProducer<>(cluster, true, new StringSerializer(), new StringSerializer());
MockProducer<String, String> producer = new MockProducer<>(
cluster,
true,
new org.apache.kafka.clients.producer.RoundRobinPartitioner(),
new StringSerializer(),
new StringSerializer()
);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
Future<RecordMetadata> metadata = producer.send(record);
assertEquals(1, metadata.get().partition(), "Partition should be correct");
assertEquals(0, metadata.get().partition(), "Partition should be correct");
producer.clear();
assertEquals(0, producer.history().size(), "Clear should erase our history");
producer.close();
Expand Down Expand Up @@ -680,7 +686,7 @@ public void shouldNotThrowOnFlushProducerIfProducerIsFenced() {
@Test
@SuppressWarnings("unchecked")
public void shouldThrowClassCastException() {
try (MockProducer<Integer, String> customProducer = new MockProducer<>(true, new IntegerSerializer(), new StringSerializer())) {
try (MockProducer<Integer, String> customProducer = new MockProducer<>(Cluster.empty(), true, null, new IntegerSerializer(), new StringSerializer())) {
assertThrows(ClassCastException.class, () -> customProducer.send(new ProducerRecord(topic, "key1", "value1")));
}
}
Expand Down
Loading
Loading