Skip to content

Commit 8ec6e81

Browse files
committed
1.x: compensation for significant clock drifts in schedulePeriodically
The solution checks the wall clock difference between the last run and the current run and if it went back or forward significantly, it rebases the timer period and schedules the next execution relative to now. If the clock goes back, the original code scheduled the next invocation way into the future. This PR will schedule it after the period. If the clock goes forward, the original code scheduled executions for all the missed time between the last run and the new time immediately, yielding a bunch of 0 delays. This PR will simply schedule the next invocation after the period. The algorithm for both cases is the same: make sure the next invocation is scheduled relative to now and recalculate the start timestamp as if the whole sequence run under the new drifted clock all along. The subsequent invocations will be scheduled at a fixed rate again.
1 parent 92fe02d commit 8ec6e81

File tree

2 files changed

+190
-4
lines changed

2 files changed

+190
-4
lines changed

src/main/java/rx/Scheduler.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,17 @@ public abstract class Scheduler {
4343
* maintenance.
4444
*/
4545

46+
/**
47+
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
48+
* <p>
49+
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
50+
*/
51+
static final long CLOCK_DRIFT_TOLERANCE_NANOS;
52+
static {
53+
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
54+
Long.getLong("rx.scheduler.drift-tolerance", 15));
55+
}
56+
4657
/**
4758
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
4859
* <p>
@@ -109,17 +120,39 @@ public abstract static class Worker implements Subscription {
109120
*/
110121
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
111122
final long periodInNanos = unit.toNanos(period);
112-
final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
123+
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
124+
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
113125

114126
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
115127
final Action0 recursiveAction = new Action0() {
116-
long count = 0;
128+
long count;
129+
long lastNowNanos = firstNowNanos;
130+
long startInNanos = firstStartInNanos;
117131
@Override
118132
public void call() {
119133
if (!mas.isUnsubscribed()) {
120134
action.call();
121-
long nextTick = startInNanos + (++count * periodInNanos);
122-
mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS));
135+
136+
long nextTick;
137+
138+
long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
139+
// If the clock moved in a direction quite a bit, rebase the repetition period
140+
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
141+
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
142+
nextTick = nowNanos + periodInNanos;
143+
/*
144+
* Shift the start point back by the drift as if the whole thing
145+
* started with the in the current reference time.
146+
*/
147+
startInNanos -= (lastNowNanos - nowNanos) + periodInNanos;
148+
++count;
149+
} else {
150+
nextTick = startInNanos + (++count * periodInNanos);
151+
}
152+
lastNowNanos = nowNanos;
153+
154+
long delay = nextTick - nowNanos;
155+
mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
123156
}
124157
}
125158
};
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import static org.junit.Assert.assertTrue;
19+
20+
import java.util.*;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.junit.Test;
24+
25+
import rx.functions.Action0;
26+
import rx.schedulers.Schedulers;
27+
28+
public class SchedulerWorkerTest {
29+
30+
static final class CustomDriftScheduler extends Scheduler {
31+
public volatile long drift;
32+
@Override
33+
public Worker createWorker() {
34+
final Worker w = Schedulers.computation().createWorker();
35+
return new Worker() {
36+
37+
@Override
38+
public void unsubscribe() {
39+
w.unsubscribe();
40+
}
41+
42+
@Override
43+
public boolean isUnsubscribed() {
44+
return w.isUnsubscribed();
45+
}
46+
47+
@Override
48+
public Subscription schedule(Action0 action) {
49+
return w.schedule(action);
50+
}
51+
52+
@Override
53+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
54+
return w.schedule(action, delayTime, unit);
55+
}
56+
57+
@Override
58+
public long now() {
59+
return super.now() + drift;
60+
}
61+
};
62+
}
63+
64+
@Override
65+
public long now() {
66+
return super.now() + drift;
67+
}
68+
}
69+
70+
@Test
71+
public void testCurrentTimeDriftBackwards() throws Exception {
72+
CustomDriftScheduler s = new CustomDriftScheduler();
73+
74+
Scheduler.Worker w = s.createWorker();
75+
76+
try {
77+
final List<Long> times = new ArrayList<Long>();
78+
79+
Subscription d = w.schedulePeriodically(new Action0() {
80+
@Override
81+
public void call() {
82+
times.add(System.currentTimeMillis());
83+
}
84+
}, 100, 100, TimeUnit.MILLISECONDS);
85+
86+
Thread.sleep(150);
87+
88+
s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
89+
90+
Thread.sleep(400);
91+
92+
d.unsubscribe();
93+
94+
Thread.sleep(150);
95+
96+
System.out.println("Runs: " + times.size());
97+
98+
for (int i = 0; i < times.size() - 1 ; i++) {
99+
long diff = times.get(i + 1) - times.get(i);
100+
System.out.println("Diff #" + i + ": " + diff);
101+
assertTrue("" + i + ":" + diff, diff < 150 && diff > 50);
102+
}
103+
104+
assertTrue("Too few invocations: " + times.size(), times.size() > 2);
105+
106+
} finally {
107+
w.unsubscribe();
108+
}
109+
110+
}
111+
112+
@Test
113+
public void testCurrentTimeDriftForwards() throws Exception {
114+
CustomDriftScheduler s = new CustomDriftScheduler();
115+
116+
Scheduler.Worker w = s.createWorker();
117+
118+
try {
119+
final List<Long> times = new ArrayList<Long>();
120+
121+
Subscription d = w.schedulePeriodically(new Action0() {
122+
@Override
123+
public void call() {
124+
times.add(System.currentTimeMillis());
125+
}
126+
}, 100, 100, TimeUnit.MILLISECONDS);
127+
128+
Thread.sleep(150);
129+
130+
s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
131+
132+
Thread.sleep(400);
133+
134+
d.unsubscribe();
135+
136+
Thread.sleep(150);
137+
138+
System.out.println("Runs: " + times.size());
139+
140+
assertTrue(times.size() > 2);
141+
142+
for (int i = 0; i < times.size() - 1 ; i++) {
143+
long diff = times.get(i + 1) - times.get(i);
144+
System.out.println("Diff #" + i + ": " + diff);
145+
assertTrue("Diff out of range: " + diff, diff < 250 && diff > 50);
146+
}
147+
148+
} finally {
149+
w.unsubscribe();
150+
}
151+
152+
}
153+
}

0 commit comments

Comments
 (0)