Skip to content

Commit ea5da9c

Browse files
committed
Add support for using retries with Kafka
This commit makes it possible to use the Kafka integration of the Flowable event registry by using retries. The retries mechanism is a mechanism that is build on top of the Spring Kafka Retry mechanism. Retries are done by sending the records that failed to a different topic depending on a picked strategy. Once the number of retries reaches the configured max retry count the message will be send to a dead letter topic.
1 parent 226c543 commit ea5da9c

File tree

17 files changed

+2004
-20
lines changed

17 files changed

+2004
-20
lines changed

modules/flowable-event-registry-model/src/main/java/org/flowable/eventregistry/model/KafkaInboundChannelModel.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class KafkaInboundChannelModel extends InboundChannelModel {
3030
protected String topicPattern;
3131
protected String clientIdPrefix;
3232
protected String concurrency;
33+
protected RetryConfiguration retry;
3334
protected List<CustomProperty> customProperties;
3435

3536
public KafkaInboundChannelModel() {
@@ -77,6 +78,14 @@ public void setConcurrency(String concurrency) {
7778
this.concurrency = concurrency;
7879
}
7980

81+
public RetryConfiguration getRetry() {
82+
return retry;
83+
}
84+
85+
public void setRetry(RetryConfiguration retry) {
86+
this.retry = retry;
87+
}
88+
8089
public List<CustomProperty> getCustomProperties() {
8190
return customProperties;
8291
}
@@ -120,4 +129,130 @@ public void setValue(String value) {
120129
}
121130
}
122131

132+
public static class RetryConfiguration {
133+
134+
protected String attempts;
135+
protected String dltTopicSuffix;
136+
protected String retryTopicSuffix;
137+
protected String fixedDelayTopicStrategy;
138+
protected String topicSuffixingStrategy;
139+
protected NonBlockingRetryBackOff nonBlockingBackOff;
140+
141+
protected String autoCreateTopics;
142+
protected String numPartitions;
143+
protected String replicationFactor;
144+
145+
public String getAttempts() {
146+
return attempts;
147+
}
148+
149+
public void setAttempts(String attempts) {
150+
this.attempts = attempts;
151+
}
152+
153+
public String getDltTopicSuffix() {
154+
return dltTopicSuffix;
155+
}
156+
157+
public void setDltTopicSuffix(String dltTopicSuffix) {
158+
this.dltTopicSuffix = dltTopicSuffix;
159+
}
160+
161+
public String getRetryTopicSuffix() {
162+
return retryTopicSuffix;
163+
}
164+
165+
public void setRetryTopicSuffix(String retryTopicSuffix) {
166+
this.retryTopicSuffix = retryTopicSuffix;
167+
}
168+
169+
public String getFixedDelayTopicStrategy() {
170+
return fixedDelayTopicStrategy;
171+
}
172+
173+
public void setFixedDelayTopicStrategy(String fixedDelayTopicStrategy) {
174+
this.fixedDelayTopicStrategy = fixedDelayTopicStrategy;
175+
}
176+
177+
public String getTopicSuffixingStrategy() {
178+
return topicSuffixingStrategy;
179+
}
180+
181+
public void setTopicSuffixingStrategy(String topicSuffixingStrategy) {
182+
this.topicSuffixingStrategy = topicSuffixingStrategy;
183+
}
184+
185+
public NonBlockingRetryBackOff getNonBlockingBackOff() {
186+
return nonBlockingBackOff;
187+
}
188+
189+
public void setNonBlockingBackOff(NonBlockingRetryBackOff nonBlockingBackOff) {
190+
this.nonBlockingBackOff = nonBlockingBackOff;
191+
}
192+
193+
public String getAutoCreateTopics() {
194+
return autoCreateTopics;
195+
}
196+
197+
public void setAutoCreateTopics(String autoCreateTopics) {
198+
this.autoCreateTopics = autoCreateTopics;
199+
}
200+
201+
public String getNumPartitions() {
202+
return numPartitions;
203+
}
204+
205+
public void setNumPartitions(String numPartitions) {
206+
this.numPartitions = numPartitions;
207+
}
208+
209+
public String getReplicationFactor() {
210+
return replicationFactor;
211+
}
212+
213+
public void setReplicationFactor(String replicationFactor) {
214+
this.replicationFactor = replicationFactor;
215+
}
216+
}
217+
218+
public static class NonBlockingRetryBackOff {
219+
220+
protected String delay;
221+
protected String maxDelay;
222+
protected String multiplier;
223+
protected String random;
224+
225+
public String getDelay() {
226+
return delay;
227+
}
228+
229+
public void setDelay(String delay) {
230+
this.delay = delay;
231+
}
232+
233+
public String getMaxDelay() {
234+
return maxDelay;
235+
}
236+
237+
public void setMaxDelay(String maxDelay) {
238+
this.maxDelay = maxDelay;
239+
}
240+
241+
public String getMultiplier() {
242+
return multiplier;
243+
}
244+
245+
public void setMultiplier(String multiplier) {
246+
this.multiplier = multiplier;
247+
}
248+
249+
public String getRandom() {
250+
return random;
251+
}
252+
253+
public void setRandom(String random) {
254+
this.random = random;
255+
}
256+
}
257+
123258
}

0 commit comments

Comments
 (0)