diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 444f20a4d8..cf1686ad83 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -8176,10 +8176,8 @@ public final Subscription unsafeSubscribe(Subscriber subscriber) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); - } catch (OnErrorNotImplementedException e2) { - // special handling when onError is not implemented ... we just rethrow - throw e2; } catch (Throwable e2) { + Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); @@ -8271,10 +8269,8 @@ private static Subscription subscribe(Subscriber subscriber, Obse // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); - } catch (OnErrorNotImplementedException e2) { - // special handling when onError is not implemented ... we just rethrow - throw e2; } catch (Throwable e2) { + Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index e082daeaab..77e644bc3d 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -190,18 +190,14 @@ public void call(Subscriber o) { st.onStart(); onSubscribe.call(st); } catch (Throwable e) { - // localized capture of errors rather than it skipping all operators + Exceptions.throwIfFatal(e); + // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling - if (e instanceof OnErrorNotImplementedException) { - throw (OnErrorNotImplementedException) e; - } st.onError(e); } } catch (Throwable e) { - if (e instanceof OnErrorNotImplementedException) { - throw (OnErrorNotImplementedException) e; - } + Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); @@ -1507,10 +1503,8 @@ public final void unsafeSubscribe(Subscriber subscriber) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); - } catch (OnErrorNotImplementedException e2) { - // special handling when onError is not implemented ... we just rethrow - throw e2; } catch (Throwable e2) { + Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); @@ -1596,10 +1590,8 @@ public final Subscription subscribe(Subscriber subscriber) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); - } catch (OnErrorNotImplementedException e2) { - // special handling when onError is not implemented ... we just rethrow - throw e2; } catch (Throwable e2) { + Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); diff --git a/src/test/java/rx/exceptions/ExceptionsTest.java b/src/test/java/rx/exceptions/ExceptionsTest.java index 96396ccb75..5906a6d6f9 100644 --- a/src/test/java/rx/exceptions/ExceptionsTest.java +++ b/src/test/java/rx/exceptions/ExceptionsTest.java @@ -22,6 +22,9 @@ import org.junit.Test; +import rx.Single; +import rx.SingleSubscriber; +import rx.Subscriber; import rx.Observable; import rx.Observer; import rx.functions.Action1; @@ -226,4 +229,111 @@ public void onNext(Integer integer) { } }); } + + @Test(expected = OnErrorFailedException.class) + public void testOnErrorExceptionIsThrownFromSubscribe() { + Observable.create(new Observable.OnSubscribe() { + @Override + public void call(Subscriber s1) { + Observable.create(new Observable.OnSubscribe() { + @Override + public void call(Subscriber s2) { + throw new IllegalArgumentException("original exception"); + } + }).subscribe(s1); + } + } + ).subscribe(new OnErrorFailedSubscriber()); + } + + @Test(expected = OnErrorFailedException.class) + public void testOnErrorExceptionIsThrownFromUnsafeSubscribe() { + Observable.create(new Observable.OnSubscribe() { + @Override + public void call(Subscriber s1) { + Observable.create(new Observable.OnSubscribe() { + @Override + public void call(Subscriber s2) { + throw new IllegalArgumentException("original exception"); + } + }).unsafeSubscribe(s1); + } + } + ).subscribe(new OnErrorFailedSubscriber()); + } + + @Test(expected = OnErrorFailedException.class) + public void testOnErrorExceptionIsThrownFromSingleDoOnSuccess() throws Exception { + Single.just(1) + .doOnSuccess(new Action1() { + @Override + public void call(Integer integer) { + throw new RuntimeException(); + } + }) + .subscribe(new OnErrorFailedSubscriber()); + } + + @Test(expected = OnErrorFailedException.class) + public void testOnErrorExceptionIsThrownFromSingleSubscribe() { + Single.create(new Single.OnSubscribe() { + @Override + public void call(SingleSubscriber s1) { + Single.create(new Single.OnSubscribe() { + @Override + public void call(SingleSubscriber s2) { + throw new IllegalArgumentException("original exception"); + } + }).subscribe(s1); + } + } + ).subscribe(new OnErrorFailedSubscriber()); + } + + @Test(expected = OnErrorFailedException.class) + public void testOnErrorExceptionIsThrownFromSingleUnsafeSubscribe() { + Single.create(new Single.OnSubscribe() { + @Override + public void call(final SingleSubscriber s1) { + Single.create(new Single.OnSubscribe() { + @Override + public void call(SingleSubscriber s2) { + throw new IllegalArgumentException("original exception"); + } + }).unsafeSubscribe(new Subscriber() { + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + s1.onError(e); + } + + @Override + public void onNext(Integer v) { + s1.onSuccess(v); + } + + }); + } + } + ).subscribe(new OnErrorFailedSubscriber()); + } + + private class OnErrorFailedSubscriber extends Subscriber { + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(); + } + + @Override + public void onNext(Integer value) { + } + } }