diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java index 7433309780..88fdd5d64e 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java @@ -147,7 +147,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Object t) { if (SubscriptionHelper.cancel(this)) { - onComplete(); + parent.otherError(new CancellationException()); } } @@ -158,7 +158,10 @@ public void onError(Throwable t) { @Override public void onComplete() { - parent.otherError(new CancellationException()); + if (get() != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + parent.otherError(new CancellationException()); + } } public void dispose() { diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisherTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisherTest.java index 5cb8b8f2d8..dc7efcabda 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisherTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisherTest.java @@ -184,4 +184,18 @@ public void run() { to.assertResult(); } } + + @Test + public void otherSignalsAndCompletes() { + List errors = TestHelper.trackPluginErrors(); + try { + Maybe.just(1).takeUntil(Flowable.just(1).take(1)) + .test() + .assertResult(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java index fc28aff0db..402a38d5ca 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.concurrent.CancellationException; +import static org.junit.Assert.*; import org.junit.Test; import io.reactivex.*; @@ -259,4 +260,18 @@ public void run() { } } } + + @Test + public void otherSignalsAndCompletes() { + List errors = TestHelper.trackPluginErrors(); + try { + Single.just(1).takeUntil(Flowable.just(1).take(1)) + .test() + .assertFailure(CancellationException.class); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } }