Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** The default delaying queue implementation. */
public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements DelayingQueue<T> {
Expand All @@ -34,8 +35,10 @@ public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements Dela
private DelayQueue<WaitForEntry<T>> delayQueue;
private ConcurrentMap<T, WaitForEntry<T>> waitingEntryByData;
protected BlockingQueue<WaitForEntry<T>> waitingForAddQueue;
private Supplier<Instant> timeSource;

public DefaultDelayingQueue(ExecutorService waitingWorker) {
this.timeSource = Instant::now;
this.delayQueue = new DelayQueue<>();
this.waitingEntryByData = new ConcurrentHashMap<>();
this.waitingForAddQueue = new LinkedBlockingQueue<>(1000);
Expand All @@ -57,10 +60,16 @@ public void addAfter(T item, Duration duration) {
super.add(item);
return;
}
WaitForEntry<T> entry = new WaitForEntry<>(item, duration.addTo(Instant.now()));
WaitForEntry<T> entry =
new WaitForEntry<>(item, duration.addTo(this.timeSource.get()), this.timeSource);
this.waitingForAddQueue.offer(entry);
}

// Visible for testing
protected void injectTimeSource(Supplier<Instant> fn) {
this.timeSource = fn;
}

private void waitingLoop() {
try {
while (true) {
Expand All @@ -78,7 +87,7 @@ private void waitingLoop() {
// a. if ready, remove it from the delay-queue and push it into underlying
// work-queue
// b. if not, refresh the next ready-at time.
Instant now = Instant.now();
Instant now = this.timeSource.get();
if (!Duration.between(entry.readyAtMillis, now).isNegative()) {
delayQueue.remove(entry);
super.add(entry.data);
Expand All @@ -92,7 +101,7 @@ private void waitingLoop() {
WaitForEntry<T> waitForEntry =
waitingForAddQueue.poll(nextReadyAt.toMillis(), TimeUnit.MILLISECONDS);
if (waitForEntry != null) {
if (Duration.between(waitForEntry.readyAtMillis, Instant.now()).isNegative()) {
if (Duration.between(waitForEntry.readyAtMillis, this.timeSource.get()).isNegative()) {
// the item is not yet ready, insert it to the delay-queue
insert(this.delayQueue, this.waitingEntryByData, waitForEntry);
} else {
Expand Down Expand Up @@ -126,17 +135,19 @@ private void insert(
// WaitForEntry holds the data to add and the time it should be added.
private static class WaitForEntry<T> implements Delayed {

private WaitForEntry(T data, Temporal readyAtMillis) {
private WaitForEntry(T data, Temporal readyAtMillis, Supplier<Instant> timeSource) {
this.data = data;
this.readyAtMillis = readyAtMillis;
this.timeSource = timeSource;
}

private T data;
private Temporal readyAtMillis;
private Supplier<Instant> timeSource;

@Override
public long getDelay(TimeUnit unit) {
Duration duration = Duration.between(Instant.now(), readyAtMillis);
Duration duration = Duration.between(this.timeSource.get(), readyAtMillis);
return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,37 @@

import io.kubernetes.client.extended.wait.Wait;
import java.time.Duration;
import java.time.Instant;
import org.junit.Test;

public class DefaultDelayingQueueTest {

@Test
public void testSimpleDelayingQueue() throws Exception {
final Instant staticTime = Instant.now();
DefaultDelayingQueue<String> queue = new DefaultDelayingQueue<>();
// Hold time still
queue.injectTimeSource(
() -> {
return staticTime;
});
queue.addAfter("foo", Duration.ofMillis(50));

// Verify that we haven't released it
assertTrue(waitForWaitingQueueToFill(queue));
assertTrue(queue.length() == 0);
assertEquals(queue.length(), 0);

Thread.sleep(60L);
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(100);
});
assertTrue(waitForAdded(queue, 1));
String item = queue.get();
queue.done(item);

Thread.sleep(10 * 1000L);
assertTrue(queue.length() == 0);
assertEquals(queue.length(), 0);
}

@Test
Expand Down