Skip to content

Commit 6051612

Browse files
committed
START: DRAFT: Custom retry delay provider
1 parent 411c925 commit 6051612

File tree

3 files changed

+25
-5
lines changed

3 files changed

+25
-5
lines changed

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import java.time.Duration;
1414
import java.util.Objects;
15+
import java.util.function.Function;
1516

1617
import static io.confluent.csid.utils.StringUtils.msg;
1718
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
@@ -131,6 +132,16 @@ public enum CommitMode {
131132
@Builder.Default
132133
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);
133134

135+
/**
136+
* When present, use this to generate the retry delay, instad of {@link #getDefaultMessageRetryDelay()}.
137+
* <p>
138+
* Overrides {@link #defaultMessageRetryDelay}, even if it's set.
139+
*/
140+
@Builder.Default
141+
private final Function<WorkContainer, Duration> retryDelayProvider;
142+
143+
static Function<WorkContainer, Duration> retryDelayProviderStatic;
144+
134145
public void validate() {
135146
Objects.requireNonNull(consumer, "A consumer must be supplied");
136147

@@ -141,6 +152,7 @@ public void validate() {
141152

142153
//
143154
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
155+
ParallelConsumerOptions.retryDelayProviderStatic = getRetryDelayProvider();
144156
}
145157

146158
protected boolean isUsingTransactionalProducer() {

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import static io.confluent.csid.utils.StringUtils.msg;
3333
import static io.confluent.parallelconsumer.UserFunctions.carefullyRun;
3434
import static java.time.Duration.ofMillis;
35-
import static java.time.Duration.ofSeconds;
3635
import static java.util.concurrent.TimeUnit.MILLISECONDS;
3736
import static java.util.concurrent.TimeUnit.SECONDS;
3837

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkContainer.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Optional;
1919
import java.util.concurrent.Future;
2020
import java.util.concurrent.TimeUnit;
21+
import java.util.function.Function;
2122

2223
import static io.confluent.csid.utils.KafkaUtils.toTP;
2324

@@ -59,6 +60,9 @@ public class WorkContainer<K, V> implements Comparable<WorkContainer> {
5960
*/
6061
private Duration retryDelay;
6162

63+
/**
64+
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
65+
*/
6266
@Setter
6367
static Duration defaultRetryDelay = Duration.ofSeconds(1);
6468

@@ -121,10 +125,15 @@ private Temporal tryAgainAt(WallClock clock) {
121125
}
122126

123127
public Duration getRetryDelay() {
124-
if (retryDelay == null)
125-
return defaultRetryDelay;
126-
else
127-
return retryDelay;
128+
var retryDelayProvider = ParallelConsumerOptions.retryDelayProviderStatic;
129+
if (retryDelayProvider != null) {
130+
return retryDelayProvider.apply(this);
131+
} else {
132+
if (retryDelay == null)
133+
return defaultRetryDelay;
134+
else
135+
return retryDelay;
136+
}
128137
}
129138

130139
@Override

0 commit comments

Comments
 (0)