Skip to content

Commit 3a31b3c

Browse files
garyrussellartembilan
authored andcommitted
Switch main branch to 2.8
1 parent c34eb76 commit 3a31b3c

File tree

6 files changed

+38
-39
lines changed

6 files changed

+38
-39
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ext {
5959
jaywayJsonPathVersion = '2.4.0'
6060
junit4Version = '4.13.2'
6161
junitJupiterVersion = '5.7.2'
62-
kafkaVersion = '2.7.1'
62+
kafkaVersion = '2.8.0'
6363
log4jVersion = '2.14.1'
6464
micrometerVersion = '1.6.7'
6565
mockitoVersion = '3.9.0'

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=2.7.4-SNAPSHOT
1+
version=2.8.0-SNAPSHOT
22
org.gradle.caching=true
33
org.gradle.parallel=true
44

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import java.io.File;
2020
import java.io.IOException;
2121
import java.io.UncheckedIOException;
22-
import java.lang.reflect.InvocationTargetException;
23-
import java.lang.reflect.Method;
2422
import java.net.InetSocketAddress;
2523
import java.nio.file.Files;
2624
import java.time.Duration;
@@ -40,7 +38,6 @@
4038
import java.util.concurrent.TimeUnit;
4139
import java.util.concurrent.TimeoutException;
4240
import java.util.concurrent.atomic.AtomicBoolean;
43-
import java.util.concurrent.atomic.AtomicReference;
4441
import java.util.function.Function;
4542
import java.util.stream.Collectors;
4643

@@ -54,10 +51,10 @@
5451
import org.apache.kafka.clients.consumer.ConsumerRecords;
5552
import org.apache.kafka.common.TopicPartition;
5653
import org.apache.kafka.common.security.auth.SecurityProtocol;
57-
import org.apache.kafka.common.utils.AppInfoParser;
5854
import org.apache.kafka.common.utils.Exit;
5955
import org.apache.kafka.common.utils.Time;
6056
import org.apache.kafka.common.utils.Utils;
57+
import org.apache.kafka.metadata.BrokerState;
6158
import org.apache.zookeeper.server.NIOServerCnxnFactory;
6259
import org.apache.zookeeper.server.ZooKeeperServer;
6360

@@ -73,7 +70,6 @@
7370
import kafka.common.KafkaException;
7471
import kafka.server.KafkaConfig;
7572
import kafka.server.KafkaServer;
76-
import kafka.server.NotRunning;
7773
import kafka.utils.CoreUtils;
7874
import kafka.utils.TestUtils;
7975
import kafka.zk.ZkFourLetterWords;
@@ -119,24 +115,6 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
119115

120116
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 6000;
121117

122-
private static final Method GET_BROKER_STATE_METHOD;
123-
124-
static {
125-
try {
126-
Method method = KafkaServer.class.getDeclaredMethod("brokerState");
127-
if (method.getReturnType().equals(AtomicReference.class)) {
128-
GET_BROKER_STATE_METHOD = method;
129-
}
130-
else {
131-
GET_BROKER_STATE_METHOD = null;
132-
}
133-
}
134-
catch (NoSuchMethodException | SecurityException e) {
135-
throw new IllegalStateException("Failed to determine KafkaServer.brokerState() method; client version: "
136-
+ AppInfoParser.getVersion(), e);
137-
}
138-
}
139-
140118
private final int count;
141119

142120
private final boolean controlledShutdown;
@@ -579,15 +557,7 @@ public void destroy() {
579557
}
580558

581559
private boolean brokerRunning(KafkaServer kafkaServer) {
582-
if (GET_BROKER_STATE_METHOD != null) {
583-
try {
584-
return !GET_BROKER_STATE_METHOD.invoke(kafkaServer).toString().equals("NOT_RUNNING");
585-
}
586-
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
587-
throw new IllegalStateException(ex);
588-
}
589-
}
590-
return kafkaServer.brokerState().currentState() != NotRunning.state();
560+
return !kafkaServer.brokerState().get().equals(BrokerState.NOT_RUNNING);
591561
}
592562

593563
public Set<String> getTopics() {

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.streams.KafkaStreams;
2828
import org.apache.kafka.streams.StreamsBuilder;
2929
import org.apache.kafka.streams.Topology;
30+
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
3031
import org.apache.kafka.streams.processor.StateRestoreListener;
3132
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
3233

@@ -90,6 +91,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
9091

9192
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
9293

94+
private StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler;
95+
9396
private boolean autoStartup = true;
9497

9598
private int phase = Integer.MAX_VALUE - 1000; // NOSONAR magic #
@@ -188,10 +191,27 @@ public void setStateListener(KafkaStreams.StateListener stateListener) {
188191
this.stateListener = stateListener; // NOSONAR (sync)
189192
}
190193

194+
/**
195+
* Obsolete.
196+
* @deprecated in favor of
197+
* {@link #setStreamsUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)}.
198+
* @param exceptionHandler the handler.
199+
*/
200+
@Deprecated
191201
public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler exceptionHandler) {
192202
this.uncaughtExceptionHandler = exceptionHandler; // NOSONAR (sync)
193203
}
194204

205+
/**
206+
* Set a {@link StreamsUncaughtExceptionHandler}. Supercedes
207+
* {@link #setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)}.
208+
* @param streamsUncaughtExceptionHandler the handler.
209+
* @since 2.8
210+
*/
211+
public void setStreamsUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
212+
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
213+
}
214+
195215
public void setStateRestoreListener(StateRestoreListener stateRestoreListener) {
196216
this.stateRestoreListener = stateRestoreListener; // NOSONAR (sync)
197217
}
@@ -303,6 +323,7 @@ public void stop(Runnable callback) {
303323
}
304324
}
305325

326+
@SuppressWarnings("deprecation")
306327
@Override
307328
public synchronized void start() {
308329
if (!this.running) {
@@ -316,7 +337,12 @@ public synchronized void start() {
316337
this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
317338
this.kafkaStreams.setStateListener(this.stateListener);
318339
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
319-
this.kafkaStreams.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
340+
if (this.streamsUncaughtExceptionHandler != null) {
341+
this.kafkaStreams.setUncaughtExceptionHandler(this.streamsUncaughtExceptionHandler);
342+
}
343+
else {
344+
this.kafkaStreams.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
345+
}
320346
if (this.kafkaStreamsCustomizer != null) {
321347
this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
322348
}

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaStreamBrancher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,6 +93,7 @@ public KStream<K, V> onTopOf(KStream<K, V> stream) {
9393
}
9494
@SuppressWarnings({ "unchecked", "rawtypes" })
9595
Predicate<? super K, ? super V>[] predicates = this.predicateList.toArray(new Predicate[0]);
96+
@SuppressWarnings("deprecation")
9697
KStream<K, V>[] result = stream.branch(predicates);
9798
for (int i = 0; i < this.consumerList.size(); i++) {
9899
this.consumerList.get(i).accept(result[i]);

spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.time.Duration;
2323
import java.util.HashMap;
24+
import java.util.List;
2425
import java.util.Map;
2526
import java.util.UUID;
2627
import java.util.concurrent.CountDownLatch;
@@ -156,9 +157,10 @@ public void testKStreams() throws Exception {
156157

157158
KafkaStreams kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams();
158159

159-
StreamThread[] threads = KafkaTestUtils.getPropertyValue(kafkaStreams, "threads", StreamThread[].class);
160+
@SuppressWarnings("unchecked")
161+
List<StreamThread> threads = KafkaTestUtils.getPropertyValue(kafkaStreams, "threads", List.class);
160162
assertThat(threads).isNotEmpty();
161-
assertThat(threads[0].getUncaughtExceptionHandler()).isSameAs(exceptionHandler);
163+
assertThat(threads.get(0).getUncaughtExceptionHandler()).isSameAs(exceptionHandler);
162164
assertThat(this.stateChangeCalled.get()).isTrue();
163165
}
164166

0 commit comments

Comments
 (0)