Skip to content

1.x: Fix other places that may swallow OnErrorFailedException #3468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8176,10 +8176,8 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down Expand Up @@ -8271,10 +8269,8 @@ private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Obse
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down
18 changes: 5 additions & 13 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,14 @@ public void call(Subscriber<? super R> o) {
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
Exceptions.throwIfFatal(e);
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
Expand Down Expand Up @@ -1507,10 +1503,8 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down Expand Up @@ -1596,10 +1590,8 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down
110 changes: 110 additions & 0 deletions src/test/java/rx/exceptions/ExceptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import org.junit.Test;

import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
Expand Down Expand Up @@ -226,4 +229,111 @@ public void onNext(Integer integer) {
}
});
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSubscribe() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s1) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).subscribe(s1);
}
}
).subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromUnsafeSubscribe() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s1) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).unsafeSubscribe(s1);
}
}
).subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSingleDoOnSuccess() throws Exception {
Single.just(1)
.doOnSuccess(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new RuntimeException();
}
})
.subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSingleSubscribe() {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> s1) {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).subscribe(s1);
}
}
).subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSingleUnsafeSubscribe() {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(final SingleSubscriber<? super Integer> s1) {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).unsafeSubscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
s1.onError(e);
}

@Override
public void onNext(Integer v) {
s1.onSuccess(v);
}

});
}
}
).subscribe(new OnErrorFailedSubscriber());
}

private class OnErrorFailedSubscriber extends Subscriber<Integer> {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
throw new RuntimeException();
}

@Override
public void onNext(Integer value) {
}
}
}