Description
I would like KafkaItemWriter to be thread safe, given that KafkaTemplate is also thread safe. FlatFileItemWriter/AbstractFileItemWriter behaves similarly: it does nothing to ensure thread safety itself (beyond not sharing variables across thread boundaries) and instead relies on the underlying BufferedWriter to handle synchronization. The same should be possible for KafkaItemWriter, since KafkaTemplate is thread safe.
KafkaItemWriter currently shares a List<ListenableFuture<SendResult<K, T>>> across threads, which is written to during write() and later clear()ed again. I could wrap the whole thing in a SynchronizedItemStreamWriter, but it would be a shame to lose multithreading capabilities, especially since KafkaTemplate can handle that anyway. Instead, I propose simply making listenableFutures a thread-local field, ThreadLocal<...> listenableFutures = ThreadLocal.withInitial(ArrayList::new), which should take care of the problem.
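To illustrate why a thread-local list avoids the sharing problem, here is a minimal, self-contained sketch using only the JDK. It is not the Spring Batch code: PerThreadBufferDemo, writeChunk, and runConcurrently are hypothetical names, and the buffered strings merely stand in for the pending send futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it only mimics the buffer-then-flush pattern of the writer.
class PerThreadBufferDemo {

    // Each thread lazily receives its own ArrayList, so no list is ever shared.
    private static final ThreadLocal<List<String>> PENDING =
            ThreadLocal.withInitial(ArrayList::new);

    // Stand-in for write(): buffer the items, "flush" by iterating, then clear.
    static int writeChunk(int items) {
        List<String> pending = PENDING.get();
        for (int i = 0; i < items; i++) {
            pending.add(Thread.currentThread().getName() + "-" + i);
        }
        int flushed = 0;
        for (String ignored : pending) {
            flushed++;
        }
        pending.clear();
        return flushed;
    }

    // Runs the given number of chunks on a fixed pool and returns the flushed count per chunk.
    static List<Integer> runConcurrently(int threads, int chunks, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int c = 0; c < chunks; c++) {
                Callable<Integer> task = () -> writeChunk(chunkSize);
                results.add(pool.submit(task));
            }
            List<Integer> counts = new ArrayList<>();
            for (Future<Integer> result : results) {
                counts.add(result.get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Every chunk should report exactly chunkSize flushed items.
        System.out.println(runConcurrently(4, 20, 1000));
    }
}
```

Because each worker thread only ever touches its own list, a chunk always flushes exactly the items it buffered. This relies on buffering and flushing happening on the same thread, which is the case when both occur inside a single write() call.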
Context:
We noticed ConcurrentModificationExceptions in KafkaItemWriter::flush in our integration tests, which set KafkaTemplate to auto-flush, unlike the live application, which does not. Auto-flush makes each write() call take longer, which allows two threads to interfere. The live application has 10 threads and a chunk size of 1000, processing about 1k items/s in total. This works out to about 100 items/s per thread, or each thread executing write() every 10 s or so. Since there is no auto-flush outside of tests, write() only dumps the items onto the KafkaProducer's RecordAccumulator and exits immediately. It seems that these calls mostly miss each other, but the problem should still be there. We would rather not synchronize everything, so for now we are using the following modified ThreadSafeKafkaItemWriter:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.springframework.batch.item.KeyValueItemWriter;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;

public class ThreadSafeKafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {

    protected KafkaTemplate<K, T> kafkaTemplate;

    // One list of pending send results per thread instead of a single shared list.
    private final ThreadLocal<List<ListenableFuture<SendResult<K, T>>>> listenableFutures =
            ThreadLocal.withInitial(ArrayList::new);

    private long timeout = -1;

    @Override
    protected void writeKeyValue(K key, T value) {
        List<ListenableFuture<SendResult<K, T>>> futures = this.listenableFutures.get();
        if (this.delete) {
            // A null value is sent in delete mode.
            futures.add(this.kafkaTemplate.sendDefault(key, null));
        } else {
            futures.add(this.kafkaTemplate.sendDefault(key, value));
        }
    }

    @Override
    protected void flush() throws Exception {
        this.kafkaTemplate.flush();
        // Only wait for the futures created by the calling thread.
        List<ListenableFuture<SendResult<K, T>>> futures = this.listenableFutures.get();
        for (ListenableFuture<SendResult<K, T>> future : futures) {
            if (this.timeout >= 0) {
                future.get(this.timeout, TimeUnit.MILLISECONDS);
            } else {
                future.get();
            }
        }
        futures.clear();
    }

    @Override
    protected void init() {
        Assert.notNull(this.kafkaTemplate, "KafkaTemplate must not be null.");
        Assert.notNull(this.kafkaTemplate.getDefaultTopic(), "KafkaTemplate must have the default topic set.");
    }

    /**
     * Set the {@link KafkaTemplate} to use.
     *
     * @param kafkaTemplate to use
     */
    public void setKafkaTemplate(KafkaTemplate<K, T> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * The time limit to wait when flushing items to Kafka.
     *
     * @param timeout milliseconds to wait, defaults to -1 (no timeout).
     * @since 4.3.2
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }
}
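For reference, the failure mode described in the context can be reproduced deterministically with plain JDK collections. The sketch below is an illustration under assumptions, not the actual KafkaItemWriter code: SharedListRaceDemo and reproduce are hypothetical names, and latches force the unlucky interleaving in which one thread adds to a shared ArrayList while another is still iterating over it.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; the shared list stands in for the writer's shared futures list.
class SharedListRaceDemo {

    // Returns true if the iterating thread observed a ConcurrentModificationException.
    static boolean reproduce() {
        List<String> shared = new ArrayList<>();
        shared.add("a");
        shared.add("b");

        CountDownLatch iterationStarted = new CountDownLatch(1);
        CountDownLatch listModified = new CountDownLatch(1);
        boolean[] failed = {false};

        // Plays the role of a thread that is in the middle of flush().
        Thread flusher = new Thread(() -> {
            Iterator<String> it = shared.iterator();
            it.next();
            iterationStarted.countDown();
            try {
                listModified.await();
                it.next(); // the fail-fast iterator detects the structural change here
            } catch (ConcurrentModificationException e) {
                failed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Plays the role of a second thread that is adding a new pending result.
        Thread writer = new Thread(() -> {
            try {
                iterationStarted.await();
                shared.add("c");
                listModified.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        flusher.start();
        writer.start();
        try {
            flusher.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failed[0];
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentModificationException observed: " + reproduce());
    }
}
```

In the real writer the interleaving depends on timing, which would explain why slower write() calls under auto-flush make the exception show up in tests while the live application mostly avoids it.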