Skip to content

Commit 1404c44

Browse files
authored
Fix cluster creation with ConfluentKafkaContainer and KafkaContainer (#9910)
`KAFKA_CONTROLLER_QUORUM_VOTERS` must be overridden. Fixes #9873
1 parent 4eeec15 commit 1404c44

File tree

7 files changed

+403
-9
lines changed

7 files changed

+403
-9
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package com.example.kafkacluster;
2+
3+
import org.apache.kafka.common.Uuid;
4+
import org.awaitility.Awaitility;
5+
import org.testcontainers.containers.Container;
6+
import org.testcontainers.containers.GenericContainer;
7+
import org.testcontainers.containers.Network;
8+
import org.testcontainers.kafka.KafkaContainer;
9+
import org.testcontainers.lifecycle.Startable;
10+
import org.testcontainers.utility.DockerImageName;
11+
12+
import java.time.Duration;
13+
import java.util.Collection;
14+
import java.util.stream.Collectors;
15+
import java.util.stream.IntStream;
16+
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
19+
public class ApacheKafkaContainerCluster implements Startable {
20+
21+
private final int brokersNum;
22+
23+
private final Network network;
24+
25+
private final Collection<KafkaContainer> brokers;
26+
27+
public ApacheKafkaContainerCluster(String version, int brokersNum, int internalTopicsRf) {
28+
if (brokersNum < 0) {
29+
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
30+
}
31+
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
32+
throw new IllegalArgumentException(
33+
"internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
34+
);
35+
}
36+
37+
this.brokersNum = brokersNum;
38+
this.network = Network.newNetwork();
39+
40+
String controllerQuorumVoters = IntStream
41+
.range(0, brokersNum)
42+
.mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
43+
.collect(Collectors.joining(","));
44+
45+
String clusterId = Uuid.randomUuid().toString();
46+
47+
this.brokers =
48+
IntStream
49+
.range(0, brokersNum)
50+
.mapToObj(brokerNum -> {
51+
return new KafkaContainer(DockerImageName.parse("apache/kafka").withTag(version))
52+
.withNetwork(this.network)
53+
.withNetworkAliases("broker-" + brokerNum)
54+
.withEnv("CLUSTER_ID", clusterId)
55+
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
56+
.withEnv("KAFKA_NODE_ID", brokerNum + "")
57+
.withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters)
58+
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
59+
.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
60+
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
61+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
62+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
63+
.withStartupTimeout(Duration.ofMinutes(1));
64+
})
65+
.collect(Collectors.toList());
66+
}
67+
68+
public Collection<KafkaContainer> getBrokers() {
69+
return this.brokers;
70+
}
71+
72+
public String getBootstrapServers() {
73+
return brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
74+
}
75+
76+
@Override
77+
public void start() {
78+
// Needs to start all the brokers at once
79+
brokers.parallelStream().forEach(GenericContainer::start);
80+
81+
Awaitility
82+
.await()
83+
.atMost(Duration.ofSeconds(30))
84+
.untilAsserted(() -> {
85+
Container.ExecResult result =
86+
this.brokers.stream()
87+
.findFirst()
88+
.get()
89+
.execInContainer(
90+
"sh",
91+
"-c",
92+
"/opt/kafka/bin/kafka-log-dirs.sh --bootstrap-server localhost:9093 --describe | grep -o '\"broker\"' | wc -l"
93+
);
94+
String brokers = result.getStdout().replace("\n", "");
95+
96+
assertThat(brokers).asInt().isEqualTo(this.brokersNum);
97+
});
98+
}
99+
100+
@Override
101+
public void stop() {
102+
this.brokers.stream().parallel().forEach(GenericContainer::stop);
103+
}
104+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.example.kafkacluster;
2+
3+
import com.google.common.collect.ImmutableMap;
4+
import org.apache.kafka.clients.admin.AdminClient;
5+
import org.apache.kafka.clients.admin.AdminClientConfig;
6+
import org.apache.kafka.clients.admin.NewTopic;
7+
import org.apache.kafka.clients.consumer.ConsumerConfig;
8+
import org.apache.kafka.clients.consumer.ConsumerRecord;
9+
import org.apache.kafka.clients.consumer.ConsumerRecords;
10+
import org.apache.kafka.clients.consumer.KafkaConsumer;
11+
import org.apache.kafka.clients.producer.KafkaProducer;
12+
import org.apache.kafka.clients.producer.ProducerConfig;
13+
import org.apache.kafka.clients.producer.ProducerRecord;
14+
import org.apache.kafka.common.serialization.StringDeserializer;
15+
import org.apache.kafka.common.serialization.StringSerializer;
16+
import org.awaitility.Awaitility;
17+
import org.junit.jupiter.api.Test;
18+
19+
import java.time.Duration;
20+
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.UUID;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import static org.assertj.core.api.Assertions.assertThat;
26+
import static org.assertj.core.api.Assertions.tuple;
27+
28+
class ApacheKafkaContainerClusterTest {
29+
30+
@Test
31+
void testKafkaContainerCluster() throws Exception {
32+
try (ApacheKafkaContainerCluster cluster = new ApacheKafkaContainerCluster("3.8.0", 3, 2)) {
33+
cluster.start();
34+
String bootstrapServers = cluster.getBootstrapServers();
35+
36+
assertThat(cluster.getBrokers()).hasSize(3);
37+
38+
testKafkaFunctionality(bootstrapServers, 3, 2);
39+
}
40+
}
41+
42+
protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
43+
try (
44+
AdminClient adminClient = AdminClient.create(
45+
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
46+
);
47+
KafkaProducer<String, String> producer = new KafkaProducer<>(
48+
ImmutableMap.of(
49+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
50+
bootstrapServers,
51+
ProducerConfig.CLIENT_ID_CONFIG,
52+
UUID.randomUUID().toString()
53+
),
54+
new StringSerializer(),
55+
new StringSerializer()
56+
);
57+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
58+
ImmutableMap.of(
59+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
60+
bootstrapServers,
61+
ConsumerConfig.GROUP_ID_CONFIG,
62+
"tc-" + UUID.randomUUID(),
63+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
64+
"earliest"
65+
),
66+
new StringDeserializer(),
67+
new StringDeserializer()
68+
);
69+
) {
70+
String topicName = "messages";
71+
72+
Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
73+
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
74+
75+
consumer.subscribe(Collections.singletonList(topicName));
76+
77+
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
78+
79+
Awaitility
80+
.await()
81+
.atMost(Duration.ofSeconds(10))
82+
.untilAsserted(() -> {
83+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
84+
85+
assertThat(records)
86+
.hasSize(1)
87+
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
88+
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
89+
});
90+
91+
consumer.unsubscribe();
92+
}
93+
}
94+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package com.example.kafkacluster;
2+
3+
import org.apache.kafka.common.Uuid;
4+
import org.awaitility.Awaitility;
5+
import org.testcontainers.containers.Container;
6+
import org.testcontainers.containers.GenericContainer;
7+
import org.testcontainers.containers.Network;
8+
import org.testcontainers.kafka.ConfluentKafkaContainer;
9+
import org.testcontainers.lifecycle.Startable;
10+
import org.testcontainers.utility.DockerImageName;
11+
12+
import java.time.Duration;
13+
import java.util.Collection;
14+
import java.util.stream.Collectors;
15+
import java.util.stream.IntStream;
16+
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
19+
public class ConfluentKafkaContainerCluster implements Startable {
20+
21+
private final int brokersNum;
22+
23+
private final Network network;
24+
25+
private final Collection<ConfluentKafkaContainer> brokers;
26+
27+
public ConfluentKafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
28+
if (brokersNum < 0) {
29+
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
30+
}
31+
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
32+
throw new IllegalArgumentException(
33+
"internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
34+
);
35+
}
36+
37+
this.brokersNum = brokersNum;
38+
this.network = Network.newNetwork();
39+
40+
String controllerQuorumVoters = IntStream
41+
.range(0, brokersNum)
42+
.mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
43+
.collect(Collectors.joining(","));
44+
45+
String clusterId = Uuid.randomUuid().toString();
46+
47+
this.brokers =
48+
IntStream
49+
.range(0, brokersNum)
50+
.mapToObj(brokerNum -> {
51+
return new ConfluentKafkaContainer(
52+
DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion)
53+
)
54+
.withNetwork(this.network)
55+
.withNetworkAliases("broker-" + brokerNum)
56+
.withEnv("CLUSTER_ID", clusterId)
57+
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
58+
.withEnv("KAFKA_NODE_ID", brokerNum + "")
59+
.withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters)
60+
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
61+
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
62+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
63+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
64+
.withStartupTimeout(Duration.ofMinutes(1));
65+
})
66+
.collect(Collectors.toList());
67+
}
68+
69+
public Collection<ConfluentKafkaContainer> getBrokers() {
70+
return this.brokers;
71+
}
72+
73+
public String getBootstrapServers() {
74+
return brokers.stream().map(ConfluentKafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
75+
}
76+
77+
@Override
78+
public void start() {
79+
// Needs to start all the brokers at once
80+
brokers.parallelStream().forEach(GenericContainer::start);
81+
82+
Awaitility
83+
.await()
84+
.atMost(Duration.ofSeconds(30))
85+
.untilAsserted(() -> {
86+
Container.ExecResult result =
87+
this.brokers.stream()
88+
.findFirst()
89+
.get()
90+
.execInContainer(
91+
"sh",
92+
"-c",
93+
"kafka-metadata-shell --snapshot /var/lib/kafka/data/__cluster_metadata-0/00000000000000000000.log ls /brokers | wc -l"
94+
);
95+
String brokers = result.getStdout().replace("\n", "");
96+
97+
assertThat(brokers).asInt().isEqualTo(this.brokersNum);
98+
});
99+
}
100+
101+
@Override
102+
public void stop() {
103+
this.brokers.stream().parallel().forEach(GenericContainer::stop);
104+
}
105+
}

0 commit comments

Comments
 (0)