Skip to content

Commit 3420ddc

Browse files
committed
GH-1216: Move Transactions to the top level
Fixes #1216 It is hard to determine transactions support in the reference manual because it is hidden under `Sending Messages` * Move `Transactions` to the top level and after `Receiving Messages` * Upgrade to Spring Data Moore RC3; Gradle 5.6.2
1 parent ae51a9c commit 3420ddc

File tree

3 files changed

+149
-148
lines changed

3 files changed

+149
-148
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ ext {
5454
scalaVersion = '2.12'
5555
springRetryVersion = '1.2.4.RELEASE'
5656
springVersion = '5.2.0.RC2'
57-
springDataCommonsVersion = '2.2.0.BUILD-SNAPSHOT'
57+
springDataCommonsVersion = '2.2.0.RC3'
5858

5959
idPrefix = 'kafka'
6060
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.1-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

src/reference/asciidoc/kafka.adoc

Lines changed: 147 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -321,155 +321,9 @@ public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
321321
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
322322
}
323323
324-
----
325-
====
326-
[[transactions]]
327-
===== Transactions
328-
329-
This section describes how Spring for Apache Kafka supports transactions.
330-
331-
====== Overview
332-
333-
The 0.11.0.0 client library added support for transactions.
334-
Spring for Apache Kafka adds support in the following ways:
335-
336-
* `KafkaTransactionManager`: Used with normal Spring transaction support (`@Transactional`, `TransactionTemplate` etc).
337-
* Transactional `KafkaMessageListenerContainer`
338-
* Local transactions with `KafkaTemplate`
339-
340-
Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`.
341-
In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.
342-
When the user calls `close()` on a producer, it is returned to the cache for reuse instead of actually being closed.
343-
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener.
344-
In that case, the `transactional.id` is `<transactionIdPrefix>.<group.id>.<topic>.<partition>`.
345-
This is to properly support fencing zombies, https://www.confluent.io/blog/transactions-apache-kafka/[as described here].
346-
This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0.
347-
If you wish to revert to the previous behavior, you can set the `producerPerConsumerPartition` property on the `DefaultKafkaProducerFactory` to `false`.
348-
349-
NOTE: While transactions are supported with batch listeners, zombie fencing cannot be supported because a batch may contain records from multiple topics or partitions.
350-
351-
Also see <<transaction-id-prefix>>.
352-
353-
====== Using `KafkaTransactionManager`
354-
355-
The `KafkaTransactionManager` is an implementation of Spring Framework's `PlatformTransactionManager`.
356-
It is provided with a reference to the producer factory in its constructor.
357-
If you provide a custom producer factory, it must support transactions.
358-
See `ProducerFactory.transactionCapable()`.
359-
360-
You can use the `KafkaTransactionManager` with normal Spring transaction support (`@Transactional`, `TransactionTemplate`, and others).
361-
If a transaction is active, any `KafkaTemplate` operations performed within the scope of the transaction use the transaction's `Producer`.
362-
The manager commits or rolls back the transaction, depending on success or failure.
363-
You must configure the `KafkaTemplate` to use the same `ProducerFactory` as the transaction manager.
364-
365-
====== Transactional Listener Container and Exactly Once Processing
366-
367-
You can provide a listener container with a `KafkaAwareTransactionManager` instance.
368-
When so configured, the container starts a transaction before invoking the listener.
369-
Any `KafkaTemplate` operations performed by the listener participate in the transaction.
370-
If the listener successfully processes the record (or multiple records, when using a `BatchMessageListener`), the container sends the offsets to the transaction by using `producer.sendOffsetsToTransaction()`), before the transaction manager commits the transaction.
371-
If the listener throws an exception, the transaction is rolled back and the consumer is repositioned so that the rolled-back record(s) can be retrieved on the next poll.
372-
See <<after-rollback>> for more information and for handling records that repeatedly fail.
373-
374-
====== Transaction Synchronization
375-
376-
If you need to synchronize a Kafka transaction with some other transaction, configure the listener container with the appropriate transaction manager (one that supports synchronization, such as the `DataSourceTransactionManager`).
377-
Any operations performed on a transactional `KafkaTemplate` from the listener participate in a single transaction.
378-
The Kafka transaction is committed (or rolled back) immediately after the controlling transaction.
379-
Before exiting the listener, you should invoke one of the template's `sendOffsetsToTransaction` methods (unless you use a <<chained-transaction-manager,`ChainedKafkaTransactionManager`>>).
380-
For convenience, the listener container binds its consumer group ID to the thread, so, generally, you can use the first method.
381-
The following listing shows the two method signatures:
382-
383-
====
384-
[source, java]
385-
----
386-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);
387-
388-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);
389-
----
390-
====
391-
392-
The following example shows how to use the first signature of the `sendOffsetsToTransaction` method:
393-
394-
====
395-
[source, java]
396-
----
397-
@Bean
398-
KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,
399-
final KafkaTemplate template) {
400-
ContainerProperties props = new ContainerProperties("foo");
401-
props.setGroupId("group");
402-
props.setTransactionManager(new SomeOtherTransactionManager());
403-
...
404-
props.setMessageListener((MessageListener<String, String>) m -> {
405-
template.send("foo", "bar");
406-
template.send("baz", "qux");
407-
template.sendOffsetsToTransaction(
408-
Collections.singletonMap(new TopicPartition(m.topic(), m.partition()),
409-
new OffsetAndMetadata(m.offset() + 1)));
410-
});
411-
return new KafkaMessageListenerContainer<>(cf, props);
412-
}
413-
----
414-
====
415-
416-
NOTE: The offset to be committed is one greater than the offset of the records processed by the listener.
417-
418-
IMPORTANT: You should call this only when you use transaction synchronization.
419-
When a listener container is configured to use a `KafkaTransactionManager` or `ChainedKafkaTransactionManager`, it takes care of sending the offsets to the transaction.
420-
421-
See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
422-
423-
[[chained-transaction-manager]]
424-
====== Using `ChainedKafkaTransactionManager`
425-
426-
The `ChainedKafkaTransactionManager` was introduced in version 2.1.3.
427-
This is a subclass of `ChainedTransactionManager` that can have exactly one `KafkaTransactionManager`.
428-
Since it is a `KafkaAwareTransactionManager`, the container can send the offsets to the transaction in the same way as when the container is configured with a simple `KafkaTransactionManager`.
429-
This provides another mechanism for synchronizing transactions without having to send the offsets to the transaction in the listener code.
430-
You should chain your transaction managers in the desired order and provide the `ChainedTransactionManager` in the `ContainerProperties`.
431-
432-
See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
433-
434-
====== `KafkaTemplate` Local Transactions
435-
436-
You can use the `KafkaTemplate` to execute a series of operations within a local transaction.
437-
The following example shows how to do so:
438-
439-
====
440-
[source, java]
441-
----
442-
boolean result = template.executeInTransaction(t -> {
443-
t.sendDefault("thing1", "thing2");
444-
t.sendDefault("cat", "hat");
445-
return true;
446-
});
447324
----
448325
====
449326

450-
The argument in the callback is the template itself (`this`).
451-
If the callback exits normally, the transaction is committed.
452-
If an exception is thrown, the transaction is rolled back.
453-
454-
NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in process, it is not used.
455-
Instead, a new "nested" transaction is used.
456-
457-
[[transaction-id-prefix]]
458-
====== `transactionIdPrefix`
459-
460-
As mentioned in <<transactions, the overview>>, the producer factory is configured with this property to build the producer `transactional.id` property.
461-
There is rather a dichotomy when specifying this property in that, when running multiple instances of the application, it must be the same on all instances to satisfy fencing zombies (also mentioned in the overview) when producing records on a listener container thread.
462-
However, when producing records using transactions that are **not** started by a listener container, the prefix has to be different on each instance.
463-
Version 2.3, makes this simpler to configure, especially in a Spring Boot application.
464-
In previous versions, you had to create two producer factories and `KafkaTemplate` s - one for producing records on a listener container thread and one for stand-alone transactions started by `kafkaTemplate.executeInTransaction()` or by a transaction interceptor on a `@Transactional` method.
465-
466-
Now, you can override the factory's `transactionalIdPrefix` on the `KafkaTemplate` and the `KafkaTransactionManager`.
467-
468-
When using a transaction manager and template for a listener container, you would normally leave this to default to the producer factory's property.
469-
This value should be the same for all application instances.
470-
For transactions started by the template (or the transaction manager for `@Transaction`) you should set the property on the template and transaction manager respectively.
471-
This property must have a different value on each application instance.
472-
473327
[[replying-template]]
474328
===== Using `ReplyingKafkaTemplate`
475329

@@ -2298,6 +2152,153 @@ Note that `SimpleThreadScope` does not destroy beans that have a destruction int
22982152
IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
22992153
If you change the multicaster to use an async executor, thread cleanup is not effective.
23002154

2155+
[[transactions]]
2156+
==== Transactions
2157+
2158+
This section describes how Spring for Apache Kafka supports transactions.
2159+
2160+
===== Overview
2161+
2162+
The 0.11.0.0 client library added support for transactions.
2163+
Spring for Apache Kafka adds support in the following ways:
2164+
2165+
* `KafkaTransactionManager`: Used with normal Spring transaction support (`@Transactional`, `TransactionTemplate` etc).
2166+
* Transactional `KafkaMessageListenerContainer`
2167+
* Local transactions with `KafkaTemplate`
2168+
2169+
Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`.
2170+
In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.
2171+
When the user calls `close()` on a producer, it is returned to the cache for reuse instead of actually being closed.
2172+
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener.
2173+
In that case, the `transactional.id` is `<transactionIdPrefix>.<group.id>.<topic>.<partition>`.
2174+
This is to properly support fencing zombies, https://www.confluent.io/blog/transactions-apache-kafka/[as described here].
2175+
This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0.
2176+
If you wish to revert to the previous behavior, you can set the `producerPerConsumerPartition` property on the `DefaultKafkaProducerFactory` to `false`.
2177+
2178+
NOTE: While transactions are supported with batch listeners, zombie fencing cannot be supported because a batch may contain records from multiple topics or partitions.
2179+
2180+
Also see <<transaction-id-prefix>>.
2181+
2182+
===== Using `KafkaTransactionManager`
2183+
2184+
The `KafkaTransactionManager` is an implementation of Spring Framework's `PlatformTransactionManager`.
2185+
It is provided with a reference to the producer factory in its constructor.
2186+
If you provide a custom producer factory, it must support transactions.
2187+
See `ProducerFactory.transactionCapable()`.
2188+
2189+
You can use the `KafkaTransactionManager` with normal Spring transaction support (`@Transactional`, `TransactionTemplate`, and others).
2190+
If a transaction is active, any `KafkaTemplate` operations performed within the scope of the transaction use the transaction's `Producer`.
2191+
The manager commits or rolls back the transaction, depending on success or failure.
2192+
You must configure the `KafkaTemplate` to use the same `ProducerFactory` as the transaction manager.
2193+
2194+
===== Transactional Listener Container and Exactly Once Processing
2195+
2196+
You can provide a listener container with a `KafkaAwareTransactionManager` instance.
2197+
When so configured, the container starts a transaction before invoking the listener.
2198+
Any `KafkaTemplate` operations performed by the listener participate in the transaction.
2199+
If the listener successfully processes the record (or multiple records, when using a `BatchMessageListener`), the container sends the offsets to the transaction by using `producer.sendOffsetsToTransaction()`), before the transaction manager commits the transaction.
2200+
If the listener throws an exception, the transaction is rolled back and the consumer is repositioned so that the rolled-back record(s) can be retrieved on the next poll.
2201+
See <<after-rollback>> for more information and for handling records that repeatedly fail.
2202+
2203+
===== Transaction Synchronization
2204+
2205+
If you need to synchronize a Kafka transaction with some other transaction, configure the listener container with the appropriate transaction manager (one that supports synchronization, such as the `DataSourceTransactionManager`).
2206+
Any operations performed on a transactional `KafkaTemplate` from the listener participate in a single transaction.
2207+
The Kafka transaction is committed (or rolled back) immediately after the controlling transaction.
2208+
Before exiting the listener, you should invoke one of the template's `sendOffsetsToTransaction` methods (unless you use a <<chained-transaction-manager,`ChainedKafkaTransactionManager`>>).
2209+
For convenience, the listener container binds its consumer group ID to the thread, so, generally, you can use the first method.
2210+
The following listing shows the two method signatures:
2211+
2212+
====
2213+
[source, java]
2214+
----
2215+
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);
2216+
2217+
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);
2218+
----
2219+
====
2220+
2221+
The following example shows how to use the first signature of the `sendOffsetsToTransaction` method:
2222+
2223+
====
2224+
[source, java]
2225+
----
2226+
@Bean
2227+
KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,
2228+
final KafkaTemplate template) {
2229+
ContainerProperties props = new ContainerProperties("foo");
2230+
props.setGroupId("group");
2231+
props.setTransactionManager(new SomeOtherTransactionManager());
2232+
...
2233+
props.setMessageListener((MessageListener<String, String>) m -> {
2234+
template.send("foo", "bar");
2235+
template.send("baz", "qux");
2236+
template.sendOffsetsToTransaction(
2237+
Collections.singletonMap(new TopicPartition(m.topic(), m.partition()),
2238+
new OffsetAndMetadata(m.offset() + 1)));
2239+
});
2240+
return new KafkaMessageListenerContainer<>(cf, props);
2241+
}
2242+
----
2243+
====
2244+
2245+
NOTE: The offset to be committed is one greater than the offset of the records processed by the listener.
2246+
2247+
IMPORTANT: You should call this only when you use transaction synchronization.
2248+
When a listener container is configured to use a `KafkaTransactionManager` or `ChainedKafkaTransactionManager`, it takes care of sending the offsets to the transaction.
2249+
2250+
See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
2251+
2252+
[[chained-transaction-manager]]
2253+
===== Using `ChainedKafkaTransactionManager`
2254+
2255+
The `ChainedKafkaTransactionManager` was introduced in version 2.1.3.
2256+
This is a subclass of `ChainedTransactionManager` that can have exactly one `KafkaTransactionManager`.
2257+
Since it is a `KafkaAwareTransactionManager`, the container can send the offsets to the transaction in the same way as when the container is configured with a simple `KafkaTransactionManager`.
2258+
This provides another mechanism for synchronizing transactions without having to send the offsets to the transaction in the listener code.
2259+
You should chain your transaction managers in the desired order and provide the `ChainedTransactionManager` in the `ContainerProperties`.
2260+
2261+
See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
2262+
2263+
===== `KafkaTemplate` Local Transactions
2264+
2265+
You can use the `KafkaTemplate` to execute a series of operations within a local transaction.
2266+
The following example shows how to do so:
2267+
2268+
====
2269+
[source, java]
2270+
----
2271+
boolean result = template.executeInTransaction(t -> {
2272+
t.sendDefault("thing1", "thing2");
2273+
t.sendDefault("cat", "hat");
2274+
return true;
2275+
});
2276+
----
2277+
====
2278+
2279+
The argument in the callback is the template itself (`this`).
2280+
If the callback exits normally, the transaction is committed.
2281+
If an exception is thrown, the transaction is rolled back.
2282+
2283+
NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in process, it is not used.
2284+
Instead, a new "nested" transaction is used.
2285+
2286+
[[transaction-id-prefix]]
2287+
===== `transactionIdPrefix`
2288+
2289+
As mentioned in <<transactions, the overview>>, the producer factory is configured with this property to build the producer `transactional.id` property.
2290+
There is rather a dichotomy when specifying this property in that, when running multiple instances of the application, it must be the same on all instances to satisfy fencing zombies (also mentioned in the overview) when producing records on a listener container thread.
2291+
However, when producing records using transactions that are **not** started by a listener container, the prefix has to be different on each instance.
2292+
Version 2.3, makes this simpler to configure, especially in a Spring Boot application.
2293+
In previous versions, you had to create two producer factories and `KafkaTemplate` s - one for producing records on a listener container thread and one for stand-alone transactions started by `kafkaTemplate.executeInTransaction()` or by a transaction interceptor on a `@Transactional` method.
2294+
2295+
Now, you can override the factory's `transactionalIdPrefix` on the `KafkaTemplate` and the `KafkaTransactionManager`.
2296+
2297+
When using a transaction manager and template for a listener container, you would normally leave this to default to the producer factory's property.
2298+
This value should be the same for all application instances.
2299+
For transactions started by the template (or the transaction manager for `@Transaction`) you should set the property on the template and transaction manager respectively.
2300+
This property must have a different value on each application instance.
2301+
23012302
[[interceptors]]
23022303
==== Wiring Spring Beans into Producer/Consumer Interceptors
23032304

0 commit comments

Comments
 (0)