From 30f890a13611a5fed1f8fc605b10ce8eca856642 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Tue, 30 Jun 2015 12:52:55 -0700 Subject: [PATCH] New experimental operator to switch the types on the completion of empty Observables. --- src/main/java/rx/Observable.java | 20 ++ .../operators/OperatorSwitchEmpty.java | 121 +++++++++ .../operators/OperatorSwitchEmptyTest.java | 247 ++++++++++++++++++ .../operators/OperatorSwitchIfEmptyTest.java | 36 +++ 4 files changed, 424 insertions(+) create mode 100644 src/main/java/rx/internal/operators/OperatorSwitchEmpty.java create mode 100644 src/test/java/rx/internal/operators/OperatorSwitchEmptyTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 3f27cb0a83..8e626944d5 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3908,6 +3908,26 @@ public final Observable switchIfEmpty(Observable alternate) { return lift(new OperatorSwitchIfEmpty(alternate)); } + /** + * Returns an Observable that only emits the items emitted by the alternate Observable if the source Observable + * is empty. If the source Observable errors or emit any values then an error is propagated and alternate Observable is + * never subscribed to. + *

+ *

+ *
Scheduler:
+ *
{@code switchEmpty} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param alternate + * the alternate Observable to subscribe to if the source does not emit any items + * @return an Observable that only emits the items of an alternate Observable if the source Observable is empty. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable switchEmpty(Observable alternate) { + return lift(new OperatorSwitchEmpty(alternate)); + } + /** * Returns an Observable that delays the subscription to and emissions from the souce Observable via another * Observable on a per-item basis. diff --git a/src/main/java/rx/internal/operators/OperatorSwitchEmpty.java b/src/main/java/rx/internal/operators/OperatorSwitchEmpty.java new file mode 100644 index 0000000000..4f0121cdc1 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorSwitchEmpty.java @@ -0,0 +1,121 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + + +import rx.*; +import rx.internal.producers.ProducerArbiter; +import rx.subscriptions.SerialSubscription; + +/** + * If the Observable completes without emitting any items, subscribe to an alternate Observable. Allows for similar + * functionality to {@link Observable#cast(Class)} followed by {@link Observable#concatWith(Observable)} except it + * errors if the source Observable is not empty. + */ +public final class OperatorSwitchEmpty implements Observable.Operator { + private final Observable alternate; + + public OperatorSwitchEmpty(Observable alternate) { + this.alternate = alternate; + } + + @Override + public Subscriber call(Subscriber child) { + final SerialSubscription ssub = new SerialSubscription(); + ProducerArbiter arbiter = new ProducerArbiter(); + final ParentSubscriber parent = new ParentSubscriber(child, ssub, arbiter, alternate); + ssub.set(parent); + child.add(ssub); + child.setProducer(arbiter); + return parent; + } + + private static final class ParentSubscriber extends Subscriber { + + private final Subscriber child; + private final SerialSubscription ssub; + private final ProducerArbiter arbiter; + private final Observable alternate; + + ParentSubscriber(Subscriber child, final SerialSubscription ssub, ProducerArbiter arbiter, Observable alternate) { + this.child = child; + this.ssub = ssub; + this.arbiter = arbiter; + this.alternate = alternate; + } + + @Override + public void setProducer(final Producer producer) { + arbiter.setProducer(producer); + } + + @Override + public void onCompleted() { + if (!child.isUnsubscribed()) { + subscribeToAlternate(); + } + } + + private void subscribeToAlternate() { + AlternateSubscriber as = new AlternateSubscriber(child, arbiter); + ssub.set(as); + alternate.unsafeSubscribe(as); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + child.onError(new RuntimeException("switchEmpty used on a non empty observable. Possible fix is to add .ignoreElements() before .switchEmpty().")); + arbiter.produced(1); + } + } + + private static final class AlternateSubscriber extends Subscriber { + + private final ProducerArbiter arbiter; + private final Subscriber child; + + AlternateSubscriber(Subscriber child, ProducerArbiter arbiter) { + this.child = child; + this.arbiter = arbiter; + } + + @Override + public void setProducer(final Producer producer) { + arbiter.setProducer(producer); + } + + @Override + public void onCompleted() { + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + child.onNext(t); + arbiter.produced(1); + } + } +} diff --git a/src/test/java/rx/internal/operators/OperatorSwitchEmptyTest.java b/src/test/java/rx/internal/operators/OperatorSwitchEmptyTest.java new file mode 100644 index 0000000000..9376f2c809 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorSwitchEmptyTest.java @@ -0,0 +1,247 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import rx.*; +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; + +public class OperatorSwitchEmptyTest { + + @Test(expected=RuntimeException.class) + public void testSwitchWhenNotEmpty() throws Exception { + final AtomicBoolean subscribed = new AtomicBoolean(false); + final Observable observable = Observable.just(4).switchEmpty(Observable.just(2) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.set(true); + } + })); + + assertFalse(subscribed.get()); + observable.toBlocking().single(); + } + + @Test(expected = IllegalArgumentException.class) + public void testSwitchWhenError() throws Exception { + final AtomicBoolean subscribed = new AtomicBoolean(false); + final Observable observable = Observable.error(new IllegalArgumentException()).switchEmpty(Observable.just(2).doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.set(true); + } + })); + + try { + observable.toBlocking().single(); + } catch (Exception e) { + assertFalse(subscribed.get()); + throw e; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testSwitchWhenAlternateError() throws Exception { + final AtomicBoolean subscribed = new AtomicBoolean(false); + final Observable observable = Observable.empty().switchEmpty(Observable. error(new IllegalArgumentException()).doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.set(true); + } + })); + + try { + observable.toBlocking().single(); + } catch (Exception e) { + assertTrue(subscribed.get()); + throw e; + } + } + + @Test + public void testSwitchWhenEmpty() throws Exception { + final Observable observable = Observable.empty().switchEmpty(Observable.from(Arrays.asList(42))); + + assertEquals(42, observable.toBlocking().single().intValue()); + } + + @Test + public void testSwitchWithProducer() throws Exception { + final AtomicBoolean emitted = new AtomicBoolean(false); + Observable withProducer = Observable.create(new Observable.OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + subscriber.setProducer(new Producer() { + @Override + public void request(long n) { + if (n > 0 && !emitted.get()) { + emitted.set(true); + subscriber.onNext(42L); + subscriber.onCompleted(); + } + } + }); + } + }); + + final Observable observable = Observable.empty().switchEmpty(withProducer); + assertEquals(42, observable.toBlocking().single().intValue()); + } + + @Test + public void testSwitchTriggerUnsubscribe() throws Exception { + final Subscription empty = Subscriptions.empty(); + + Observable withProducer = Observable.create(new Observable.OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + subscriber.add(empty); + subscriber.onNext(42L); + } + }); + + final Subscription sub = Observable.empty().switchEmpty(withProducer).lift(new Observable.Operator() { + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber(child) { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Long aLong) { + unsubscribe(); + } + }; + } + }).subscribe(); + + + assertTrue(empty.isUnsubscribed()); + assertTrue(sub.isUnsubscribed()); + } + + @Test + public void testSwitchShouldTriggerUnsubscribe() { + final Subscription s = Subscriptions.empty(); + + Observable.create(new Observable.OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + subscriber.add(s); + subscriber.onCompleted(); + } + }).switchEmpty(Observable.never()).subscribe(); + assertTrue(s.isUnsubscribed()); + } + + @Test + public void testSwitchRequestAlternativeObservableWithBackpressure() { + + TestSubscriber ts = new TestSubscriber() { + + @Override + public void onStart() { + request(1); + } + }; + Observable.empty().switchEmpty(Observable.just(1, 2, 3)).subscribe(ts); + + assertEquals(Arrays.asList(1), ts.getOnNextEvents()); + ts.assertNoErrors(); + ts.requestMore(1); + ts.assertValueCount(2); + ts.requestMore(1); + ts.assertValueCount(3); + } + @Test + public void testBackpressureNoRequest() { + TestSubscriber ts = new TestSubscriber() { + + @Override + public void onStart() { + request(0); + } + }; + Observable.empty().switchEmpty(Observable.just(1, 2, 3)).subscribe(ts); + assertTrue(ts.getOnNextEvents().isEmpty()); + ts.assertNoErrors(); + } + + @Test + public void testBackpressureOnFirstObservable() { + TestSubscriber ts = new TestSubscriber(0); + Observable.just(1,2,3).switchEmpty(Observable.just(4, 5, 6)).subscribe(ts); + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + } + + @Test(timeout = 10000) + public void testRequestsNotLost() throws InterruptedException { + final TestSubscriber ts = new TestSubscriber(0); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber subscriber) { + subscriber.setProducer(new Producer() { + final AtomicBoolean completed = new AtomicBoolean(false); + @Override + public void request(long n) { + if (n > 0 && completed.compareAndSet(false, true)) { + Schedulers.io().createWorker().schedule(new Action0() { + @Override + public void call() { + subscriber.onCompleted(); + }}, 100, TimeUnit.MILLISECONDS); + } + }}); + }}) + .switchEmpty(Observable.from(Arrays.asList(1L, 2L, 3L))) + .subscribeOn(Schedulers.computation()) + .subscribe(ts); + ts.requestMore(0); + Thread.sleep(50); + //request while first observable is still finishing (as empty) + ts.requestMore(1); + ts.requestMore(1); + Thread.sleep(500); + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertValueCount(2); + ts.unsubscribe(); + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java b/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java index 2534613ab4..af086ef5bd 100644 --- a/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java +++ b/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java @@ -49,6 +49,42 @@ public void call() { assertFalse(subscribed.get()); } + @Test(expected = IllegalArgumentException.class) + public void testSwitchWhenError() throws Exception { + final AtomicBoolean subscribed = new AtomicBoolean(false); + final Observable observable = Observable.error(new IllegalArgumentException()).switchIfEmpty(Observable.just(2).doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.set(true); + } + })); + + try { + observable.toBlocking().single(); + } catch (Exception e) { + assertFalse(subscribed.get()); + throw e; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testSwitchWhenAlternateError() throws Exception { + final AtomicBoolean subscribed = new AtomicBoolean(false); + final Observable observable = Observable.empty().switchIfEmpty(Observable. error(new IllegalArgumentException()).doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.set(true); + } + })); + + try { + observable.toBlocking().single(); + } catch (Exception e) { + assertTrue(subscribed.get()); + throw e; + } + } + @Test public void testSwitchWhenEmpty() throws Exception { final Observable observable = Observable.empty().switchIfEmpty(Observable.from(Arrays.asList(42)));