Skip to content

Commit 5944e17

Browse files
garyrussellartembilan
authored andcommitted
Replying template - add getter for reply topic(s)
- supports the SIK outbound gateway to determine if there's a default reply topic. - supports the SIK validating a reply topic header if present in the outbound message. * Use `assignedPartitions` instead * Polishing
1 parent 94d4fce commit 5944e17

File tree

7 files changed

+76
-26
lines changed

7 files changed

+76
-26
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public int getPhase() {
196196
return this.phase;
197197
}
198198

199+
@Override
199200
public ContainerProperties getContainerProperties() {
200201
return this.containerProperties;
201202
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2017 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.Arrays;
21+
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.List;
@@ -26,6 +27,7 @@
2627

2728
import org.apache.kafka.common.Metric;
2829
import org.apache.kafka.common.MetricName;
30+
import org.apache.kafka.common.TopicPartition;
2931

3032
import org.springframework.kafka.core.ConsumerFactory;
3133
import org.springframework.kafka.listener.config.ContainerProperties;
@@ -95,6 +97,18 @@ public List<KafkaMessageListenerContainer<K, V>> getContainers() {
9597
return Collections.unmodifiableList(this.containers);
9698
}
9799

100+
@Override
101+
public Collection<TopicPartition> getAssignedPartitions() {
102+
List<TopicPartition> assigned = new ArrayList<>();
103+
this.containers.forEach(c -> {
104+
Collection<TopicPartition> assignedPartitions = c.getAssignedPartitions();
105+
if (assignedPartitions != null) {
106+
assigned.addAll(assignedPartitions);
107+
}
108+
});
109+
return assigned;
110+
}
111+
98112
@Override
99113
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
100114
Map<String, Map<MetricName, ? extends Metric>> metrics = new HashMap<>();

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ public void setClientIdSuffix(String clientIdSuffix) {
180180
* @return the {@link TopicPartition}s currently assigned to this container,
181181
* either explicitly or by Kafka; may be null if not assigned yet.
182182
*/
183+
@Override
183184
public Collection<TopicPartition> getAssignedPartitions() {
184185
ListenerConsumer listenerConsumer = this.listenerConsumer;
185186
if (listenerConsumer != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,15 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.Map;
2021

2122
import org.apache.kafka.common.Metric;
2223
import org.apache.kafka.common.MetricName;
24+
import org.apache.kafka.common.TopicPartition;
2325

2426
import org.springframework.context.SmartLifecycle;
27+
import org.springframework.kafka.listener.config.ContainerProperties;
2528

2629
/**
2730
* Internal abstraction used by the framework representing a message
@@ -48,4 +51,22 @@ public interface MessageListenerContainer extends SmartLifecycle {
4851
*/
4952
Map<String, Map<MetricName, ? extends Metric>> metrics();
5053

54+
/**
55+
* Return the container properties for this container.
56+
* @return the properties.
57+
* @since 2.1.3
58+
*/
59+
default ContainerProperties getContainerProperties() {
60+
throw new UnsupportedOperationException("This container doesn't support retrieving its properties");
61+
}
62+
63+
/**
64+
* Return the assigned topics/partitions for this container.
65+
* @return the topics/partitions.
66+
* @since 2.1.3
67+
*/
68+
default Collection<TopicPartition> getAssignedPartitions() {
69+
throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
70+
}
71+
5172
}

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.nio.ByteBuffer;
2020
import java.time.Instant;
21+
import java.util.Collection;
2122
import java.util.Iterator;
2223
import java.util.List;
2324
import java.util.UUID;
@@ -26,6 +27,7 @@
2627

2728
import org.apache.kafka.clients.consumer.ConsumerRecord;
2829
import org.apache.kafka.clients.producer.ProducerRecord;
30+
import org.apache.kafka.common.TopicPartition;
2931
import org.apache.kafka.common.header.Header;
3032
import org.apache.kafka.common.header.internals.RecordHeader;
3133

@@ -100,6 +102,37 @@ public void setReplyTimeout(long replyTimeout) {
100102
this.replyTimeout = replyTimeout;
101103
}
102104

105+
@Override
106+
public boolean isRunning() {
107+
return this.running;
108+
}
109+
110+
@Override
111+
public int getPhase() {
112+
return this.phase;
113+
}
114+
115+
public void setPhase(int phase) {
116+
this.phase = phase;
117+
}
118+
119+
@Override
120+
public boolean isAutoStartup() {
121+
return this.autoStartup;
122+
}
123+
124+
public void setAutoStartup(boolean autoStartup) {
125+
this.autoStartup = autoStartup;
126+
}
127+
128+
/**
129+
* Return the topics/partitions assigned to the replying listener container.
130+
* @return the topics/partitions.
131+
*/
132+
public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
133+
return this.replyContainer.getAssignedPartitions();
134+
}
135+
103136
@Override
104137
public void afterPropertiesSet() throws Exception {
105138
if (!this.schedulerSet) {
@@ -130,29 +163,6 @@ public synchronized void stop() {
130163
}
131164
}
132165

133-
@Override
134-
public boolean isRunning() {
135-
return this.running;
136-
}
137-
138-
@Override
139-
public int getPhase() {
140-
return this.phase;
141-
}
142-
143-
public void setPhase(int phase) {
144-
this.phase = phase;
145-
}
146-
147-
@Override
148-
public boolean isAutoStartup() {
149-
return this.autoStartup;
150-
}
151-
152-
public void setAutoStartup(boolean autoStartup) {
153-
this.autoStartup = autoStartup;
154-
}
155-
156166
@Override
157167
public void stop(Runnable callback) {
158168
stop();

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -119,6 +119,7 @@ public void testAutoCommit() throws Exception {
119119
container.start();
120120

121121
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
122+
assertThat(container.getAssignedPartitions().size()).isEqualTo(2);
122123

123124
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
124125
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
190190
ReplyingKafkaTemplate<Integer, String, String> template = new ReplyingKafkaTemplate<>(this.config.pf(), container);
191191
template.start();
192192
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
193+
assertThat(template.getAssignedReplyTopicPartitions().size()).isEqualTo(5);
194+
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic);
193195
return template;
194196
}
195197

0 commit comments

Comments
 (0)