diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 6aa6dbd839..11aa88fe90 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1586,6 +1586,24 @@ public final Flowable startWith(Publisher other) { return this.toFlowable().startWith(other); } + /** + * Hides the identity of this Completable and its Disposable. + *

Allows preventing certain identity-based + * optimizations (fusion). + *

+ *
Scheduler:
+ *
{@code hide} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Completable instance + * @since 2.0.5 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable hide() { + return RxJavaPlugins.onAssembly(new CompletableHide(this)); + } + /** * Subscribes to this CompletableConsumable and returns a Disposable which can be used to cancel * the subscription. diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableHide.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableHide.java new file mode 100644 index 0000000000..747f65a3e0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableHide.java @@ -0,0 +1,78 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import io.reactivex.Completable; +import io.reactivex.CompletableObserver; +import io.reactivex.CompletableSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * Hides the identity of the upstream Completable and its Disposable sent through onSubscribe. + */ +public final class CompletableHide extends Completable { + + final CompletableSource source; + + public CompletableHide(CompletableSource source) { + this.source = source; + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + source.subscribe(new HideCompletableObserver(observer)); + } + + static final class HideCompletableObserver implements CompletableObserver, Disposable { + + final CompletableObserver actual; + + Disposable d; + + HideCompletableObserver(CompletableObserver actual) { + this.actual = actual; + } + + @Override + public void dispose() { + d.dispose(); + d = DisposableHelper.DISPOSED; + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + } +} diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index f40fcb8a7c..ffd1dfd072 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -32,6 +32,7 @@ import io.reactivex.functions.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.operators.completable.CompletableToFlowable; import io.reactivex.internal.operators.maybe.MaybeToFlowable; import io.reactivex.internal.operators.single.SingleToFlowable; import io.reactivex.internal.subscriptions.BooleanSubscription; @@ -1962,6 +1963,28 @@ public static void checkDisposedMaybe(Function, ? extends MaybeS assertFalse("Dispose not propagated!", pp.hasSubscribers()); } + /** + * Check if the operator applied to a Completable source propagates dispose properly. + * @param composer the function to apply an operator to the provided Completable source + */ + public static void checkDisposedCompletable(Function composer) { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = new TestSubscriber(); + + try { + new CompletableToFlowable(composer.apply(pp.ignoreElements())).subscribe(ts); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertTrue("Not subscribed to source!", pp.hasSubscribers()); + + ts.cancel(); + + assertFalse("Dispose not propagated!", pp.hasSubscribers()); + } + /** * Check if the operator applied to a Maybe source propagates dispose properly. * @param the source value type diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableHideTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableHideTest.java new file mode 100644 index 0000000000..47e3d763e5 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableHideTest.java @@ -0,0 +1,85 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import io.reactivex.Completable; +import io.reactivex.CompletableSource; +import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.CompletableSubject; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; + +public class CompletableHideTest { + + @Test + public void never() { + Completable.never() + .hide() + .test() + .assertNotComplete() + .assertNoErrors(); + } + + @Test + public void complete() { + Completable.complete() + .hide() + .test() + .assertResult(); + } + + @Test + public void error() { + Completable.error(new TestException()) + .hide() + .test() + .assertFailure(TestException.class); + } + + @Test + public void hidden() { + assertFalse(CompletableSubject.create().hide() instanceof CompletableSubject); + } + + @Test + public void dispose() { + TestHelper.checkDisposedCompletable(new Function() { + @Override + public CompletableSource apply(Completable m) throws Exception { + return m.hide(); + } + }); + } + + @Test + public void isDisposed() { + PublishProcessor pp = PublishProcessor.create(); + + TestHelper.checkDisposed(pp.ignoreElements().hide()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletable(new Function() { + @Override + public Completable apply(Completable f) throws Exception { + return f.hide(); + } + }); + } +}