Skip to content

Commit 2fef0fc

Browse files
garyrussellartembilan
authored andcommitted
GH-1101: Overide transactionIdPrefix
Resolves #1101
1 parent 02dc029 commit 2fef0fc

15 files changed

+216
-74
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.Map.Entry;
2626
import java.util.concurrent.BlockingQueue;
27+
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.Future;
2829
import java.util.concurrent.LinkedBlockingQueue;
2930
import java.util.concurrent.TimeUnit;
@@ -99,7 +100,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
99100

100101
private final AtomicInteger transactionIdSuffix = new AtomicInteger();
101102

102-
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
103+
private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();
103104

104105
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
105106

@@ -264,16 +265,19 @@ public void destroy() {
264265
if (producerToClose != null) {
265266
producerToClose.getDelegate().close(this.physicalCloseTimeout);
266267
}
267-
producerToClose = this.cache.poll();
268-
while (producerToClose != null) {
269-
try {
270-
producerToClose.getDelegate().close(this.physicalCloseTimeout);
271-
}
272-
catch (Exception e) {
273-
LOGGER.error(e, "Exception while closing producer");
268+
this.cache.values().forEach(queue -> {
269+
CloseSafeProducer<K, V> next = queue.poll();
270+
while (next != null) {
271+
try {
272+
next.getDelegate().close(this.physicalCloseTimeout);
273+
}
274+
catch (Exception e) {
275+
LOGGER.error(e, "Exception while closing producer");
276+
}
277+
next = queue.poll();
274278
}
275-
producerToClose = this.cache.poll();
276-
}
279+
});
280+
this.cache.clear();
277281
synchronized (this.consumerProducers) {
278282
this.consumerProducers.forEach(
279283
(k, v) -> v.getDelegate().close(this.physicalCloseTimeout));
@@ -314,12 +318,18 @@ public boolean isRunning() {
314318

315319
@Override
316320
public Producer<K, V> createProducer() {
317-
if (this.transactionIdPrefix != null) {
321+
return createProducer(this.transactionIdPrefix);
322+
}
323+
324+
@Override
325+
public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
326+
String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
327+
if (txIdPrefix != null) {
318328
if (this.producerPerConsumerPartition) {
319-
return createTransactionalProducerForPartition();
329+
return createTransactionalProducerForPartition(txIdPrefix);
320330
}
321331
else {
322-
return createTransactionalProducer();
332+
return createTransactionalProducer(txIdPrefix);
323333
}
324334
}
325335
if (this.producerPerThread) {
@@ -349,15 +359,20 @@ protected Producer<K, V> createKafkaProducer() {
349359
return new KafkaProducer<>(this.configs, this.keySerializer, this.valueSerializer);
350360
}
351361

352-
Producer<K, V> createTransactionalProducerForPartition() {
362+
protected Producer<K, V> createTransactionalProducerForPartition() {
363+
return createTransactionalProducerForPartition(this.transactionIdPrefix);
364+
}
365+
366+
protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {
353367
String suffix = TransactionSupport.getTransactionIdSuffix();
354368
if (suffix == null) {
355-
return createTransactionalProducer();
369+
return createTransactionalProducer(txIdPrefix);
356370
}
357371
else {
358372
synchronized (this.consumerProducers) {
359373
if (!this.consumerProducers.containsKey(suffix)) {
360-
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
374+
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,
375+
this::removeConsumerProducer);
361376
this.consumerProducers.put(suffix, newProducer);
362377
return newProducer;
363378
}
@@ -387,29 +402,43 @@ private void removeConsumerProducer(CloseSafeProducer<K, V> producerToRemove) {
387402
* @since 1.3
388403
*/
389404
protected Producer<K, V> createTransactionalProducer() {
390-
Producer<K, V> cachedProducer = this.cache.poll();
405+
return createTransactionalProducer(this.transactionIdPrefix);
406+
}
407+
408+
protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
409+
BlockingQueue<CloseSafeProducer<K, V>> queue = getCache(txIdPrefix);
410+
Producer<K, V> cachedProducer = queue.poll();
391411
if (cachedProducer == null) {
392-
return doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement(), null);
412+
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), null);
393413
}
394414
else {
395415
return cachedProducer;
396416
}
397417
}
398418

399-
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix,
419+
private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
400420
@Nullable Consumer<CloseSafeProducer<K, V>> remover) {
401421

402422
Producer<K, V> newProducer;
403423
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
404-
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
424+
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, prefix + suffix);
405425
newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializer, this.valueSerializer);
406426
newProducer.initTransactions();
407-
return new CloseSafeProducer<>(newProducer, this.cache, remover,
427+
return new CloseSafeProducer<>(newProducer, this.cache.get(prefix), remover,
408428
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
409429
}
410430

431+
@Nullable
411432
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
412-
return this.cache;
433+
return getCache(this.transactionIdPrefix);
434+
}
435+
436+
@Nullable
437+
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
438+
if (txIdPrefix == null) {
439+
return null;
440+
}
441+
return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
413442
}
414443

415444
@Override

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
7878

7979
private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener<K, V>();
8080

81+
private String transactionIdPrefix;
82+
8183

8284
/**
8385
* Create an instance using the supplied producer factory and autoFlush false.
@@ -154,6 +156,19 @@ public boolean isTransactional() {
154156
return this.transactional;
155157
}
156158

159+
public String getTransactionIdPrefix() {
160+
return this.transactionIdPrefix;
161+
}
162+
163+
/**
164+
* Set a transaction id prefix to override the prefix in the producer factory.
165+
* @param transactionIdPrefix the prefix.
166+
* @since 2.3
167+
*/
168+
public void setTransactionIdPrefix(String transactionIdPrefix) {
169+
this.transactionIdPrefix = transactionIdPrefix;
170+
}
171+
157172
/**
158173
* Return the producer factory used by this template.
159174
* @return the factory.
@@ -277,7 +292,7 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
277292
transactionIdSuffix = null;
278293
}
279294

280-
producer = this.producerFactory.createProducer();
295+
producer = this.producerFactory.createProducer(this.transactionIdPrefix);
281296

282297
try {
283298
producer.beginTransaction();
@@ -427,11 +442,11 @@ private Producer<K, V> getTheProducer() {
427442
return producer;
428443
}
429444
KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
430-
.getTransactionalResourceHolder(this.producerFactory);
445+
.getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix);
431446
return holder.getProducer();
432447
}
433448
else {
434-
return this.producerFactory.createProducer();
449+
return this.producerFactory.createProducer(this.transactionIdPrefix);
435450
}
436451
}
437452

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ public interface ProducerFactory<K, V> {
3434
*/
3535
Producer<K, V> createProducer();
3636

37+
/**
38+
* Create a producer with an overridden transaction id prefix.
39+
* @param txIdPrefix the transaction id prefix.
40+
* @return the producer.
41+
* @since 2.3
42+
*/
43+
default Producer<K, V> createProducer(@SuppressWarnings("unused") String txIdPrefix) {
44+
throw new UnsupportedOperationException("This factory does not support this method");
45+
}
46+
3747
/**
3848
* Return true if the factory supports transactions.
3949
* @return true if transactional.

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,29 @@ private ProducerFactoryUtils() {
5050
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
5151
final ProducerFactory<K, V> producerFactory) {
5252

53+
return getTransactionalResourceHolder(producerFactory, null);
54+
}
55+
56+
/**
57+
* Obtain a Producer that is synchronized with the current transaction, if any.
58+
* @param producerFactory the ProducerFactory to obtain a Channel for
59+
* @param txIdPrefix the transaction id prefix; if null, the producer factory
60+
* prefix is used.
61+
* @param <K> the key type.
62+
* @param <V> the value type.
63+
* @return the resource holder.
64+
* @since 2.3
65+
*/
66+
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
67+
final ProducerFactory<K, V> producerFactory, @Nullable String txIdPrefix) {
68+
5369
Assert.notNull(producerFactory, "ProducerFactory must not be null");
5470

5571
@SuppressWarnings("unchecked")
5672
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
5773
.getResource(producerFactory);
5874
if (resourceHolder == null) {
59-
Producer<K, V> producer = producerFactory.createProducer();
75+
Producer<K, V> producer = producerFactory.createProducer(txIdPrefix);
6076

6177
try {
6278
producer.beginTransaction();

spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public class KafkaTransactionManager<K, V> extends AbstractPlatformTransactionMa
7070

7171
private final ProducerFactory<K, V> producerFactory;
7272

73+
private String transactionIdPrefix;
74+
7375
/**
7476
* Create a new KafkaTransactionManager, given a ProducerFactory.
7577
* Transaction synchronization is turned off by default, as this manager might be used alongside a datastore-based
@@ -84,6 +86,15 @@ public KafkaTransactionManager(ProducerFactory<K, V> producerFactory) {
8486
this.producerFactory = producerFactory;
8587
}
8688

89+
/**
90+
* Set a transaction id prefix to override the prefix in the producer factory.
91+
* @param transactionIdPrefix the prefix.
92+
* @since 2.3
93+
*/
94+
public void setTransactionIdPrefix(String transactionIdPrefix) {
95+
this.transactionIdPrefix = transactionIdPrefix;
96+
}
97+
8798
/**
8899
* Get the producer factory.
89100
* @return the producerFactory
@@ -132,7 +143,8 @@ protected void doBegin(Object transaction, TransactionDefinition definition) {
132143
KafkaTransactionObject<K, V> txObject = (KafkaTransactionObject<K, V>) transaction;
133144
KafkaResourceHolder<K, V> resourceHolder = null;
134145
try {
135-
resourceHolder = ProducerFactoryUtils.getTransactionalResourceHolder(getProducerFactory());
146+
resourceHolder = ProducerFactoryUtils.getTransactionalResourceHolder(getProducerFactory(),
147+
this.transactionIdPrefix);
136148
if (logger.isDebugEnabled()) {
137149
logger.debug("Created Kafka transaction on producer [" + resourceHolder.getProducer() + "]");
138150
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.Collections;
2222
import java.util.Map;
23-
import java.util.concurrent.BlockingQueue;
2423
import java.util.concurrent.CountDownLatch;
2524
import java.util.concurrent.TimeUnit;
2625

@@ -101,10 +100,12 @@ public void testNestedTxProducerIsCached() throws Exception {
101100
container.start();
102101
try {
103102
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache1", "foo");
104-
future.get();
105-
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
103+
future.get(10, TimeUnit.SECONDS);
104+
pf.getCache();
105+
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
106106
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
107-
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(1);
107+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
108+
assertThat(pfTx.getCache()).hasSize(1);
108109
}
109110
finally {
110111
container.stop();
@@ -140,10 +141,10 @@ public void testContainerTxProducerIsNotCached() throws Exception {
140141
container.start();
141142
try {
142143
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
143-
future.get();
144-
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
144+
future.get(10, TimeUnit.SECONDS);
145+
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
145146
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
146-
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(0);
147+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(0);
147148
}
148149
finally {
149150
container.stop();

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.time.Duration;
2727
import java.util.HashMap;
28+
import java.util.Map;
2829
import java.util.Queue;
2930
import java.util.concurrent.BlockingQueue;
3031
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,9 +56,9 @@ public void testProducerClosedAfterBadTransition() throws Exception {
5556
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
5657

5758
@Override
58-
protected Producer createTransactionalProducer() {
59+
protected Producer createTransactionalProducer(String txIdPrefix) {
5960
producer.initTransactions();
60-
BlockingQueue<Producer> cache = getCache();
61+
BlockingQueue<Producer> cache = getCache(txIdPrefix);
6162
Producer cached = cache.poll();
6263
return cached == null ? new CloseSafeProducer(producer, cache) : cached;
6364
}
@@ -80,8 +81,10 @@ protected Producer createTransactionalProducer() {
8081
kafkaTemplate.send("foo", "bar");
8182
return null;
8283
});
83-
BlockingQueue cache = KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class);
84+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
8485
assertThat(cache).hasSize(1);
86+
Queue queue = (Queue) cache.get("foo");
87+
assertThat(queue).hasSize(1);
8588
try {
8689
transactionTemplate.execute(s -> {
8790
return null;
@@ -90,7 +93,7 @@ protected Producer createTransactionalProducer() {
9093
catch (CannotCreateTransactionException e) {
9194
assertThat(e.getCause().getMessage()).contains("Invalid transition");
9295
}
93-
assertThat(cache).hasSize(0);
96+
assertThat(queue).hasSize(0);
9497

9598
InOrder inOrder = inOrder(producer);
9699
inOrder.verify(producer).initTransactions();
@@ -119,7 +122,7 @@ protected Producer createKafkaProducer() {
119122
assertThat(aProducer).isNotNull();
120123
aProducer.close();
121124
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNotNull();
122-
Queue cache = KafkaTestUtils.getPropertyValue(pf, "cache", Queue.class);
125+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
123126
assertThat(cache.size()).isEqualTo(0);
124127
pf.reset();
125128
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
@@ -134,9 +137,9 @@ public void testResetTx() throws Exception {
134137
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
135138

136139
@Override
137-
protected Producer createTransactionalProducer() {
140+
protected Producer createTransactionalProducer(String txIdPrefix) {
138141
producer.initTransactions();
139-
BlockingQueue<Producer> cache = getCache();
142+
BlockingQueue<Producer> cache = getCache(txIdPrefix);
140143
Producer cached = cache.poll();
141144
return cached == null ? new CloseSafeProducer(producer, cache) : cached;
142145
}
@@ -148,10 +151,12 @@ protected Producer createTransactionalProducer() {
148151
assertThat(aProducer).isNotNull();
149152
aProducer.close();
150153
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
151-
Queue cache = KafkaTestUtils.getPropertyValue(pf, "cache", Queue.class);
154+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
152155
assertThat(cache.size()).isEqualTo(1);
156+
Queue queue = (Queue) cache.get("foo");
157+
assertThat(queue.size()).isEqualTo(1);
153158
pf.onApplicationEvent(new ContextStoppedEvent(ctx));
154-
assertThat(cache.size()).isEqualTo(0);
159+
assertThat(queue.size()).isEqualTo(0);
155160
verify(producer).close(any(Duration.class));
156161
}
157162

0 commit comments

Comments
 (0)