Skip to content

Commit d22ba1d

Browse files
committed
Avoid swallowing errors in Completable
Instead, deliver them up to the thread's uncaught exception handler. Fixes #3726
1 parent a57bccc commit d22ba1d

File tree

4 files changed

+109
-17
lines changed

4 files changed

+109
-17
lines changed

src/main/java/rx/Completable.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1835,6 +1835,7 @@ public void onCompleted() {
18351835
public void onError(Throwable e) {
18361836
ERROR_HANDLER.handleError(e);
18371837
mad.unsubscribe();
1838+
deliverUncaughtException(e);
18381839
}
18391840

18401841
@Override
@@ -1864,14 +1865,17 @@ public void onCompleted() {
18641865
onComplete.call();
18651866
} catch (Throwable e) {
18661867
ERROR_HANDLER.handleError(e);
1868+
deliverUncaughtException(e);
1869+
} finally {
1870+
mad.unsubscribe();
18671871
}
1868-
mad.unsubscribe();
18691872
}
18701873

18711874
@Override
18721875
public void onError(Throwable e) {
18731876
ERROR_HANDLER.handleError(e);
18741877
mad.unsubscribe();
1878+
deliverUncaughtException(e);
18751879
}
18761880

18771881
@Override
@@ -1915,8 +1919,10 @@ public void onError(Throwable e) {
19151919
} catch (Throwable ex) {
19161920
e = new CompositeException(Arrays.asList(e, ex));
19171921
ERROR_HANDLER.handleError(e);
1922+
deliverUncaughtException(e);
1923+
} finally {
1924+
mad.unsubscribe();
19181925
}
1919-
mad.unsubscribe();
19201926
}
19211927

19221928
@Override
@@ -1927,7 +1933,12 @@ public void onSubscribe(Subscription d) {
19271933

19281934
return mad;
19291935
}
1930-
1936+
1937+
private static void deliverUncaughtException(Throwable e) {
1938+
Thread thread = Thread.currentThread();
1939+
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
1940+
}
1941+
19311942
/**
19321943
* Subscribes the given CompletableSubscriber to this Completable instance.
19331944
* @param s the CompletableSubscriber, not null
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package rx;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
5+
public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
6+
public int count = 0;
7+
public Throwable caught;
8+
public CountDownLatch completed = new CountDownLatch(1);
9+
10+
@Override
11+
public void uncaughtException(Thread t, Throwable e) {
12+
count++;
13+
caught = e;
14+
completed.countDown();
15+
}
16+
}

src/test/java/rx/CompletableTest.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2700,7 +2700,64 @@ public void call(CompletableSubscriber s) {
27002700

27012701
Assert.assertTrue(name.get().startsWith("RxComputation"));
27022702
}
2703-
2703+
2704+
@Test(timeout = 1000)
2705+
public void subscribeEmptyOnError() {
2706+
expectUncaughtTestException(new Action0() {
2707+
@Override public void call() {
2708+
error.completable.subscribe();
2709+
}
2710+
});
2711+
}
2712+
2713+
@Test(timeout = 1000)
2714+
public void subscribeOneActionOnError() {
2715+
expectUncaughtTestException(new Action0() {
2716+
@Override
2717+
public void call() {
2718+
error.completable.subscribe(new Action0() {
2719+
@Override
2720+
public void call() {
2721+
}
2722+
});
2723+
}
2724+
});
2725+
}
2726+
2727+
@Test(timeout = 1000)
2728+
public void subscribeOneActionThrowFromOnCompleted() {
2729+
expectUncaughtTestException(new Action0() {
2730+
@Override
2731+
public void call() {
2732+
normal.completable.subscribe(new Action0() {
2733+
@Override
2734+
public void call() {
2735+
throw new TestException();
2736+
}
2737+
});
2738+
}
2739+
});
2740+
}
2741+
2742+
@Test(timeout = 1000)
2743+
public void subscribeTwoActionsThrowFromOnError() {
2744+
expectUncaughtTestException(new Action0() {
2745+
@Override
2746+
public void call() {
2747+
error.completable.subscribe(new Action1<Throwable>() {
2748+
@Override
2749+
public void call(Throwable throwable) {
2750+
throw new TestException();
2751+
}
2752+
}, new Action0() {
2753+
@Override
2754+
public void call() {
2755+
}
2756+
});
2757+
}
2758+
});
2759+
}
2760+
27042761
@Test(timeout = 1000)
27052762
public void timeoutEmitError() {
27062763
Throwable e = Completable.never().timeout(100, TimeUnit.MILLISECONDS).get();
@@ -3742,4 +3799,24 @@ public void call(Throwable e) {
37423799
assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get());
37433800
}
37443801

3802+
private static void expectUncaughtTestException(Action0 action) {
3803+
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
3804+
CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
3805+
Thread.setDefaultUncaughtExceptionHandler(handler);
3806+
try {
3807+
action.call();
3808+
assertEquals("Should have received exactly 1 exception", 1, handler.count);
3809+
Throwable caught = handler.caught;
3810+
while (caught != null) {
3811+
if (caught instanceof TestException) break;
3812+
if (caught == caught.getCause()) break;
3813+
caught = caught.getCause();
3814+
}
3815+
assertTrue("A TestException should have been delivered to the handler",
3816+
caught instanceof TestException);
3817+
} finally {
3818+
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
3819+
}
3820+
}
3821+
37453822
}

src/test/java/rx/schedulers/SchedulerTests.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package rx.schedulers;
22

3+
import rx.CapturingUncaughtExceptionHandler;
34
import rx.Observable;
45
import rx.Observer;
56
import rx.Scheduler;
@@ -87,19 +88,6 @@ static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) t
8788
}
8889
}
8990

90-
private static final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
91-
int count = 0;
92-
Throwable caught;
93-
CountDownLatch completed = new CountDownLatch(1);
94-
95-
@Override
96-
public void uncaughtException(Thread t, Throwable e) {
97-
count++;
98-
caught = e;
99-
completed.countDown();
100-
}
101-
}
102-
10391
private static final class CapturingObserver<T> implements Observer<T> {
10492
CountDownLatch completed = new CountDownLatch(1);
10593
int errorCount = 0;

0 commit comments

Comments
 (0)