Skip to content

Commit 74e76e2

Browse files
committed
Fixed schedule race and task retention with ExecutorScheduler.
1 parent 51e03cc commit 74e76e2

File tree

2 files changed

+226
-75
lines changed

2 files changed

+226
-75
lines changed

src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 58 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,15 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.Executor;
20-
import java.util.concurrent.Future;
21-
import java.util.concurrent.RejectedExecutionException;
22-
import java.util.concurrent.ScheduledExecutorService;
23-
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.*;
2419
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26-
import rx.Scheduler;
27-
import rx.Subscription;
20+
21+
import rx.*;
2822
import rx.functions.Action0;
23+
import rx.internal.schedulers.ScheduledAction;
24+
import rx.internal.util.SubscriptionList;
2925
import rx.plugins.RxJavaPlugins;
30-
import rx.subscriptions.CompositeSubscription;
31-
import rx.subscriptions.MultipleAssignmentSubscription;
32-
import rx.subscriptions.Subscriptions;
26+
import rx.subscriptions.*;
3327

3428
/**
3529
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
@@ -58,12 +52,12 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
5852
// TODO: use a better performing structure for task tracking
5953
final CompositeSubscription tasks;
6054
// TODO: use MpscLinkedQueue once available
61-
final ConcurrentLinkedQueue<ExecutorAction> queue;
55+
final ConcurrentLinkedQueue<ScheduledAction> queue;
6256
final AtomicInteger wip;
6357

6458
public ExecutorSchedulerWorker(Executor executor) {
6559
this.executor = executor;
66-
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
60+
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
6761
this.wip = new AtomicInteger();
6862
this.tasks = new CompositeSubscription();
6963
}
@@ -73,11 +67,15 @@ public Subscription schedule(Action0 action) {
7367
if (isUnsubscribed()) {
7468
return Subscriptions.unsubscribed();
7569
}
76-
ExecutorAction ea = new ExecutorAction(action, tasks);
70+
ScheduledAction ea = new ScheduledAction(action, tasks);
7771
tasks.add(ea);
7872
queue.offer(ea);
7973
if (wip.getAndIncrement() == 0) {
8074
try {
75+
// note that since we schedule the emission of potentially multiple tasks
76+
// there is no clear way to cancel this schedule from individual tasks
77+
// so even if executor is an ExecutorService, we can't associate the future
78+
// returned by submit() with any particular ScheduledAction
8179
executor.execute(this);
8280
} catch (RejectedExecutionException t) {
8381
// cleanup if rejected
@@ -96,7 +94,10 @@ public Subscription schedule(Action0 action) {
9694
@Override
9795
public void run() {
9896
do {
99-
queue.poll().run();
97+
ScheduledAction sa = queue.poll();
98+
if (!sa.isUnsubscribed()) {
99+
sa.run();
100+
}
100101
} while (wip.decrementAndGet() > 0);
101102
}
102103

@@ -115,28 +116,56 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
115116
service = GenericScheduledExecutorService.getInstance();
116117
}
117118

119+
final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
118120
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
119-
// tasks.add(mas); // Needs a removal without unsubscription
121+
mas.set(first);
122+
tasks.add(mas);
123+
final Subscription removeMas = Subscriptions.create(new Action0() {
124+
@Override
125+
public void call() {
126+
tasks.remove(mas);
127+
}
128+
});
120129

121-
try {
122-
Future<?> f = service.schedule(new Runnable() {
123-
@Override
124-
public void run() {
125-
if (mas.isUnsubscribed()) {
126-
return;
127-
}
128-
mas.set(schedule(action));
129-
// tasks.delete(mas); // Needs a removal without unsubscription
130+
ScheduledAction ea = new ScheduledAction(new Action0() {
131+
@Override
132+
public void call() {
133+
if (mas.isUnsubscribed()) {
134+
return;
130135
}
131-
}, delayTime, unit);
132-
mas.set(Subscriptions.from(f));
136+
// schedule the real action untimed
137+
Subscription s2 = schedule(action);
138+
mas.set(s2);
139+
// unless the worker is unsubscribed, we should get a new ScheduledAction
140+
if (s2.getClass() == ScheduledAction.class) {
141+
// when this ScheduledAction completes, we need to remove the
142+
// MAS referencing the whole setup to avoid leaks
143+
((ScheduledAction)s2).add(removeMas);
144+
}
145+
}
146+
});
147+
// This will make sure if ea.call() gets executed before this line
148+
// we don't override the current task in mas.
149+
first.set(ea);
150+
// we don't need to add ea to tasks because it will be tracked through mas/first
151+
152+
153+
try {
154+
Future<?> f = service.schedule(ea, delayTime, unit);
155+
ea.add(f);
133156
} catch (RejectedExecutionException t) {
134157
// report the rejection to plugins
135158
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
136159
throw t;
137160
}
138161

139-
return mas;
162+
/*
163+
* This allows cancelling either the delayed schedule or the actual schedule referenced
164+
* by mas and makes sure mas is removed from the tasks composite to avoid leaks.
165+
*/
166+
SubscriptionList result = new SubscriptionList(mas, removeMas);
167+
168+
return result;
140169
}
141170

142171
@Override
@@ -150,46 +179,4 @@ public void unsubscribe() {
150179
}
151180

152181
}
153-
154-
/** Runs the actual action and maintains an unsubscription state. */
155-
static final class ExecutorAction implements Runnable, Subscription {
156-
final Action0 actual;
157-
final CompositeSubscription parent;
158-
volatile int unsubscribed;
159-
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
160-
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");
161-
162-
public ExecutorAction(Action0 actual, CompositeSubscription parent) {
163-
this.actual = actual;
164-
this.parent = parent;
165-
}
166-
167-
@Override
168-
public void run() {
169-
if (isUnsubscribed()) {
170-
return;
171-
}
172-
try {
173-
actual.call();
174-
} catch (Throwable t) {
175-
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
176-
Thread thread = Thread.currentThread();
177-
thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
178-
} finally {
179-
unsubscribe();
180-
}
181-
}
182-
@Override
183-
public boolean isUnsubscribed() {
184-
return unsubscribed != 0;
185-
}
186-
187-
@Override
188-
public void unsubscribe() {
189-
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
190-
parent.remove(this);
191-
}
192-
}
193-
194-
}
195182
}

src/test/java/rx/schedulers/ExecutorSchedulerTest.java

Lines changed: 168 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,20 @@
1515
*/
1616
package rx.schedulers;
1717

18+
import static org.junit.Assert.*;
19+
20+
import java.lang.management.*;
21+
import java.util.concurrent.*;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
1824
import org.junit.Test;
19-
import rx.Scheduler;
20-
import rx.internal.util.RxThreadFactory;
2125

22-
import java.util.concurrent.Executor;
23-
import java.util.concurrent.Executors;
26+
import rx.*;
27+
import rx.Scheduler.Worker;
28+
import rx.functions.*;
29+
import rx.internal.schedulers.NewThreadWorker;
30+
import rx.internal.util.RxThreadFactory;
31+
import rx.schedulers.ExecutorScheduler.ExecutorSchedulerWorker;
2432

2533
public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {
2634

@@ -40,4 +48,160 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
4048
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
4149
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
4250
}
51+
@Test(timeout = 30000)
52+
public void testCancelledTaskRetention() throws InterruptedException {
53+
System.out.println("Wait before GC");
54+
Thread.sleep(1000);
55+
56+
System.out.println("GC");
57+
System.gc();
58+
59+
Thread.sleep(1000);
60+
61+
62+
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
63+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
64+
long initial = memHeap.getUsed();
65+
66+
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
67+
68+
Scheduler.Worker w = Schedulers.io().createWorker();
69+
for (int i = 0; i < 500000; i++) {
70+
if (i % 50000 == 0) {
71+
System.out.println(" -> still scheduling: " + i);
72+
}
73+
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
74+
}
75+
76+
memHeap = memoryMXBean.getHeapMemoryUsage();
77+
long after = memHeap.getUsed();
78+
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);
79+
80+
w.unsubscribe();
81+
82+
System.out.println("Wait before second GC");
83+
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);
84+
85+
System.out.println("Second GC");
86+
System.gc();
87+
88+
Thread.sleep(1000);
89+
90+
memHeap = memoryMXBean.getHeapMemoryUsage();
91+
long finish = memHeap.getUsed();
92+
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
93+
94+
if (finish > initial * 5) {
95+
fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
96+
}
97+
}
98+
99+
/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
100+
static final class TestExecutor implements Executor {
101+
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
102+
@Override
103+
public void execute(Runnable command) {
104+
queue.offer(command);
105+
}
106+
public void executeOne() {
107+
Runnable r = queue.poll();
108+
if (r != null) {
109+
r.run();
110+
}
111+
}
112+
public void executeAll() {
113+
Runnable r;
114+
while ((r = queue.poll()) != null) {
115+
r.run();
116+
}
117+
}
118+
}
119+
120+
@Test
121+
public void testCancelledTasksDontRun() {
122+
final AtomicInteger calls = new AtomicInteger();
123+
Action0 task = new Action0() {
124+
@Override
125+
public void call() {
126+
calls.getAndIncrement();
127+
}
128+
};
129+
TestExecutor exec = new TestExecutor();
130+
Scheduler custom = Schedulers.from(exec);
131+
Worker w = custom.createWorker();
132+
try {
133+
Subscription s1 = w.schedule(task);
134+
Subscription s2 = w.schedule(task);
135+
Subscription s3 = w.schedule(task);
136+
137+
s1.unsubscribe();
138+
s2.unsubscribe();
139+
s3.unsubscribe();
140+
141+
exec.executeAll();
142+
143+
assertEquals(0, calls.get());
144+
} finally {
145+
w.unsubscribe();
146+
}
147+
}
148+
@Test
149+
public void testCancelledWorkerDoesntRunTasks() {
150+
final AtomicInteger calls = new AtomicInteger();
151+
Action0 task = new Action0() {
152+
@Override
153+
public void call() {
154+
calls.getAndIncrement();
155+
}
156+
};
157+
TestExecutor exec = new TestExecutor();
158+
Scheduler custom = Schedulers.from(exec);
159+
Worker w = custom.createWorker();
160+
try {
161+
w.schedule(task);
162+
w.schedule(task);
163+
w.schedule(task);
164+
} finally {
165+
w.unsubscribe();
166+
}
167+
exec.executeAll();
168+
assertEquals(0, calls.get());
169+
}
170+
@Test
171+
public void testNoTimedTaskAfterScheduleRetention() throws InterruptedException {
172+
Executor e = new Executor() {
173+
@Override
174+
public void execute(Runnable command) {
175+
command.run();
176+
}
177+
};
178+
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();
179+
180+
w.schedule(Actions.empty(), 1, TimeUnit.MILLISECONDS);
181+
182+
assertTrue(w.tasks.hasSubscriptions());
183+
184+
Thread.sleep(100);
185+
186+
assertFalse(w.tasks.hasSubscriptions());
187+
}
188+
189+
@Test
190+
public void testNoTimedTaskPartRetention() {
191+
Executor e = new Executor() {
192+
@Override
193+
public void execute(Runnable command) {
194+
195+
}
196+
};
197+
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();
198+
199+
Subscription s = w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
200+
201+
assertTrue(w.tasks.hasSubscriptions());
202+
203+
s.unsubscribe();
204+
205+
assertFalse(w.tasks.hasSubscriptions());
206+
}
43207
}

0 commit comments

Comments
 (0)