diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index 5fd7216a3b..db4c5bde97 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -568,7 +568,7 @@ public void onNext(Object t) { } }; cs.onSubscribe(subscriber); - flowable.subscribe(subscriber); + flowable.unsafeSubscribe(subscriber); } }); } diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 04551d98af..bd8facd575 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -232,6 +232,31 @@ public Single toSingle() { return new Single(OnSubscribeSingle.create(this)); } + /** + * Returns a Completable that discards all onNext emissions (similar to + * {@code ignoreAllElements()}) and calls onCompleted when this source observable calls + * onCompleted. Error terminal events are propagated. + *

+ * + *

+ *
Scheduler:
+ *
{@code toCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a Completable that calls onCompleted on it's subscriber when the source Observable + * calls onCompleted + * @see ReactiveX documentation: + * Completable + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical + * with the release number) + */ + @Experimental + public Completable toCompletable() { + return Completable.fromObservable(this); + } + /* ********************************************************************************************************* * Operators Below Here diff --git a/src/test/java/rx/internal/operators/OnSubscribeCompletableTest.java b/src/test/java/rx/internal/operators/OnSubscribeCompletableTest.java new file mode 100644 index 0000000000..e30bb78062 --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeCompletableTest.java @@ -0,0 +1,98 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import static org.junit.Assert.assertFalse; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import rx.Completable; +import rx.Observable; +import rx.functions.Action0; +import rx.observers.TestSubscriber; + +public class OnSubscribeCompletableTest { + + @Test + public void testJustSingleItemObservable() { + TestSubscriber subscriber = TestSubscriber.create(); + Completable cmp = Observable.just("Hello World!").toCompletable(); + cmp.subscribe(subscriber); + + subscriber.assertNoValues(); + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + } + + @Test + public void testErrorObservable() { + TestSubscriber subscriber = TestSubscriber.create(); + IllegalArgumentException error = new IllegalArgumentException("Error"); + Completable cmp = Observable.error(error).toCompletable(); + cmp.subscribe(subscriber); + + subscriber.assertError(error); + subscriber.assertNoValues(); + } + + @Test + public void testJustTwoEmissionsObservableThrowsError() { + TestSubscriber subscriber = TestSubscriber.create(); + Completable cmp = Observable.just("First", "Second").toCompletable(); + cmp.subscribe(subscriber); + + subscriber.assertNoErrors(); + subscriber.assertNoValues(); + } + + @Test + public void testEmptyObservable() { + TestSubscriber subscriber = TestSubscriber.create(); + Completable cmp = Observable.empty().toCompletable(); + cmp.subscribe(subscriber); + + subscriber.assertNoErrors(); + subscriber.assertNoValues(); + subscriber.assertCompleted(); + } + + @Test + public void testNeverObservable() { + TestSubscriber subscriber = TestSubscriber.create(); + Completable cmp = Observable.never().toCompletable(); + cmp.subscribe(subscriber); + + subscriber.assertNoTerminalEvent(); + subscriber.assertNoValues(); + } + + @Test + public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() { + TestSubscriber subscriber = TestSubscriber.create(); + final AtomicBoolean unsubscribed = new AtomicBoolean(false); + Completable cmp = Observable.just("Hello World!").doOnUnsubscribe(new Action0() { + + @Override + public void call() { + unsubscribed.set(true); + }}).toCompletable(); + cmp.subscribe(subscriber); + subscriber.assertCompleted(); + assertFalse(unsubscribed.get()); + } +}