Skip to content

Commit dd0c96d

Browse files
committed
2.x: ExecutorScheduler.scheduleDirect to report isDisposed on complete
1 parent 9c34eb1 commit dd0c96d

File tree

4 files changed

+181
-15
lines changed

4 files changed

+181
-15
lines changed

src/main/java/io/reactivex/disposables/FutureDisposable.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
package io.reactivex.disposables;
1414

1515
import java.util.concurrent.Future;
16+
import java.util.concurrent.atomic.AtomicReference;
1617

1718
/**
1819
* A Disposable container that cancels a Future instance.
1920
*/
20-
final class FutureDisposable extends ReferenceDisposable<Future<?>> {
21+
final class FutureDisposable extends AtomicReference<Future<?>> implements Disposable {
2122

2223
private static final long serialVersionUID = 6545242830671168775L;
2324

@@ -29,7 +30,16 @@ final class FutureDisposable extends ReferenceDisposable<Future<?>> {
2930
}
3031

3132
@Override
32-
protected void onDisposed(Future<?> value) {
33-
value.cancel(allowInterrupt);
33+
public boolean isDisposed() {
34+
Future<?> f = get();
35+
return f == null || f.isDone();
36+
}
37+
38+
@Override
39+
public void dispose() {
40+
Future<?> f = getAndSet(null);
41+
if (f != null) {
42+
f.cancel(allowInterrupt);
43+
}
3444
}
3545
}

src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public Disposable scheduleDirect(Runnable run) {
6161
}
6262

6363
@Override
64-
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
64+
public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) {
6565
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
6666
if (executor instanceof ScheduledExecutorService) {
6767
try {
@@ -72,20 +72,19 @@ public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
7272
return EmptyDisposable.INSTANCE;
7373
}
7474
}
75-
SequentialDisposable first = new SequentialDisposable();
7675

77-
final SequentialDisposable mar = new SequentialDisposable(first);
76+
final DelayedRunnable dr = new DelayedRunnable(decoratedRun);
7877

7978
Disposable delayed = HELPER.scheduleDirect(new Runnable() {
8079
@Override
8180
public void run() {
82-
mar.replace(scheduleDirect(decoratedRun));
81+
dr.direct.replace(scheduleDirect(dr));
8382
}
8483
}, delay, unit);
8584

86-
first.replace(delayed);
85+
dr.timed.replace(delayed);
8786

88-
return mar;
87+
return dr;
8988
}
9089

9190
@Override
@@ -253,7 +252,11 @@ public void run() {
253252
if (get()) {
254253
return;
255254
}
256-
actual.run();
255+
try {
256+
actual.run();
257+
} finally {
258+
lazySet(true);
259+
}
257260
}
258261

259262
@Override
@@ -266,6 +269,49 @@ public boolean isDisposed() {
266269
return get();
267270
}
268271
}
272+
273+
}
274+
275+
static final class DelayedRunnable extends AtomicReference<Runnable> implements Runnable, Disposable {
276+
277+
private static final long serialVersionUID = -4101336210206799084L;
278+
279+
final SequentialDisposable timed;
280+
281+
final SequentialDisposable direct;
282+
283+
DelayedRunnable(Runnable run) {
284+
super(run);
285+
this.timed = new SequentialDisposable();
286+
this.direct = new SequentialDisposable();
287+
}
288+
289+
@Override
290+
public void run() {
291+
Runnable r = get();
292+
if (r != null) {
293+
try {
294+
r.run();
295+
} finally {
296+
lazySet(null);
297+
timed.lazySet(DisposableHelper.DISPOSED);
298+
direct.lazySet(DisposableHelper.DISPOSED);
299+
}
300+
}
301+
}
302+
303+
@Override
304+
public boolean isDisposed() {
305+
return get() == null;
306+
}
307+
308+
@Override
309+
public void dispose() {
310+
if (getAndSet(null) != null) {
311+
timed.dispose();
312+
direct.dispose();
313+
}
314+
}
269315
}
270316

271317
}

src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515

1616
import static org.junit.Assert.*;
1717

18-
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.*;
1919

2020
import org.junit.Test;
2121

2222
import io.reactivex.*;
2323
import io.reactivex.Scheduler.Worker;
24-
import io.reactivex.disposables.Disposables;
24+
import io.reactivex.disposables.*;
25+
import io.reactivex.internal.functions.Functions;
2526
import io.reactivex.internal.schedulers.SingleScheduler.ScheduledWorker;
27+
import io.reactivex.schedulers.Schedulers;
2628

2729
public class SingleSchedulerTest {
2830

@@ -78,4 +80,41 @@ public void run() {
7880
TestHelper.race(r1, r1);
7981
}
8082
}
83+
84+
@Test(timeout = 1000)
85+
public void runnableDisposedAsync() throws Exception {
86+
final Scheduler s = Schedulers.single();
87+
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);
88+
89+
while (!d.isDisposed()) {
90+
Thread.sleep(1);
91+
}
92+
}
93+
94+
@Test(timeout = 1000)
95+
public void runnableDisposedAsyncCrash() throws Exception {
96+
final Scheduler s = Schedulers.single();
97+
98+
Disposable d = s.scheduleDirect(new Runnable() {
99+
@Override
100+
public void run() {
101+
throw new IllegalStateException();
102+
}
103+
});
104+
105+
while (!d.isDisposed()) {
106+
Thread.sleep(1);
107+
}
108+
}
109+
110+
@Test(timeout = 1000)
111+
public void runnableDisposedAsyncTimed() throws Exception {
112+
final Scheduler s = Schedulers.single();
113+
114+
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);
115+
116+
while (!d.isDisposed()) {
117+
Thread.sleep(1);
118+
}
119+
}
81120
}

src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -484,10 +484,81 @@ public void execute(Runnable r) {
484484
});
485485
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);
486486

487-
assertFalse(d.isDisposed());
487+
assertTrue(d.isDisposed());
488+
}
488489

489-
d.dispose();
490+
@Test(timeout = 1000)
491+
public void runnableDisposedAsync() throws Exception {
492+
final Scheduler s = Schedulers.from(new Executor() {
493+
@Override
494+
public void execute(Runnable r) {
495+
new Thread(r).start();
496+
}
497+
});
498+
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);
490499

491-
assertTrue(d.isDisposed());
500+
while (!d.isDisposed()) {
501+
Thread.sleep(1);
502+
}
503+
}
504+
505+
@Test(timeout = 1000)
506+
public void runnableDisposedAsync2() throws Exception {
507+
final Scheduler s = Schedulers.from(executor);
508+
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE);
509+
510+
while (!d.isDisposed()) {
511+
Thread.sleep(1);
512+
}
513+
}
514+
515+
@Test(timeout = 1000)
516+
public void runnableDisposedAsyncCrash() throws Exception {
517+
final Scheduler s = Schedulers.from(new Executor() {
518+
@Override
519+
public void execute(Runnable r) {
520+
new Thread(r).start();
521+
}
522+
});
523+
Disposable d = s.scheduleDirect(new Runnable() {
524+
@Override
525+
public void run() {
526+
throw new IllegalStateException();
527+
}
528+
});
529+
530+
while (!d.isDisposed()) {
531+
Thread.sleep(1);
532+
}
533+
}
534+
535+
@Test(timeout = 1000)
536+
public void runnableDisposedAsyncTimed() throws Exception {
537+
final Scheduler s = Schedulers.from(new Executor() {
538+
@Override
539+
public void execute(Runnable r) {
540+
new Thread(r).start();
541+
}
542+
});
543+
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);
544+
545+
while (!d.isDisposed()) {
546+
Thread.sleep(1);
547+
}
548+
}
549+
550+
@Test(timeout = 1000)
551+
public void runnableDisposedAsyncTimed2() throws Exception {
552+
ExecutorService executorScheduler = Executors.newScheduledThreadPool(1, new RxThreadFactory("TestCustomPoolTimed"));
553+
try {
554+
final Scheduler s = Schedulers.from(executorScheduler);
555+
Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);
556+
557+
while (!d.isDisposed()) {
558+
Thread.sleep(1);
559+
}
560+
} finally {
561+
executorScheduler.shutdownNow();
562+
}
492563
}
493564
}

0 commit comments

Comments
 (0)