diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java index c976fba197..80254abf57 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java @@ -21,6 +21,7 @@ import rx.Observable.Operator; import rx.exceptions.Exceptions; import rx.functions.Action1; +import rx.plugins.RxJavaHooks; public class OperatorOnBackpressureDrop implements Operator { @@ -62,6 +63,9 @@ public void request(long n) { }); return new Subscriber(child) { + + boolean done; + @Override public void onStart() { request(Long.MAX_VALUE); @@ -69,16 +73,27 @@ public void onStart() { @Override public void onCompleted() { - child.onCompleted(); + if (!done) { + done = true; + child.onCompleted(); + } } @Override public void onError(Throwable e) { - child.onError(e); + if (!done) { + done = true; + child.onError(e); + } else { + RxJavaHooks.onError(e); + } } @Override public void onNext(T t) { + if (done) { + return; + } if (requested.get() > 0) { child.onNext(t); requested.decrementAndGet(); @@ -88,7 +103,7 @@ public void onNext(T t) { try { onDrop.call(t); } catch (Throwable e) { - Exceptions.throwOrReport(e, child, t); + Exceptions.throwOrReport(e, this, t); return; } } diff --git a/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java b/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java index 568ccfd985..c26df3e540 100644 --- a/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java @@ -18,19 +18,24 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import rx.Observable; +import rx.*; import rx.Observable.OnSubscribe; import rx.Observer; import rx.Subscriber; import rx.functions.Action1; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; +import rx.plugins.RxJavaHooks; import rx.schedulers.Schedulers; public class OperatorOnBackpressureDropTest { @@ -141,6 +146,144 @@ public void call(Throwable t) { assertFalse(errorOccurred.get()); } + @Test + public void testOnDropMethodIsCalled() { + final List list = new ArrayList(); + // request 0 + TestSubscriber ts = TestSubscriber.create(0); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onNext(2); + sub.onCompleted(); + } + } + }); + } + }).onBackpressureDrop(new Action1() { + @Override + public void call(Integer t) { + list.add(t); + } + }).subscribe(ts); + assertEquals(Arrays.asList(1, 2), list); + } + + @Test + public void testUpstreamEmitsOnCompletedAfterFailureWithoutCheckingSubscription() { + TestSubscriber ts = TestSubscriber.create(0); + final RuntimeException e = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onCompleted(); + } + } + }); + } + }) + .onBackpressureDrop(new Action1() { + @Override + public void call(Integer t) { + throw e; + }}) + .unsafeSubscribe(ts); + ts.assertNoValues(); + ts.assertError(e); + ts.assertNotCompleted(); + } + + @Test + public void testUpstreamEmitsErrorAfterFailureWithoutCheckingSubscriptionResultsInHooksOnErrorCalled() { + try { + final List list = new CopyOnWriteArrayList(); + RxJavaHooks.setOnError(new Action1() { + + @Override + public void call(Throwable t) { + list.add(t); + } + }); + TestSubscriber ts = TestSubscriber.create(0); + final RuntimeException e1 = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onError(e2); + } + } + }); + } + }).onBackpressureDrop(new Action1() { + @Override + public void call(Integer t) { + throw e1; + } + }).unsafeSubscribe(ts); + ts.assertNoValues(); + assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); + ts.assertNotCompleted(); + assertEquals(Arrays.asList(e2), list); + } finally { + RxJavaHooks.setOnError(null); + } + } + + @Test + public void testUpstreamEmitsOnNextAfterFailureWithoutCheckingSubscription() { + TestSubscriber ts = TestSubscriber.create(0); + final RuntimeException e = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onNext(2); + } + } + }); + } + }) + .onBackpressureDrop(new Action1() { + @Override + public void call(Integer t) { + throw e; + }}) + .unsafeSubscribe(ts); + ts.assertNoValues(); + ts.assertError(e); + ts.assertNotCompleted(); + } + + + private static final Action1 THROW_NON_FATAL = new Action1() { @Override public void call(Long n) {