Skip to content

New experimental operator to switch the types on the completion of empty Observables. #3056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3908,6 +3908,26 @@ public final Observable<T> switchIfEmpty(Observable<? extends T> alternate) {
return lift(new OperatorSwitchIfEmpty<T>(alternate));
}

/**
* Returns an Observable that only emits the items emitted by the alternate Observable if the source Observable
* is empty. If the source Observable errors or emit any values then an error is propagated and alternate Observable is
* never subscribed to.
* <p/>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param alternate
* the alternate Observable to subscribe to if the source does not emit any items
* @return an Observable that only emits the items of an alternate Observable if the source Observable is empty.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <R> Observable<R> switchEmpty(Observable<? extends R> alternate) {
return lift(new OperatorSwitchEmpty<R, T>(alternate));
}

/**
* Returns an Observable that delays the subscription to and emissions from the souce Observable via another
* Observable on a per-item basis.
Expand Down
121 changes: 121 additions & 0 deletions src/main/java/rx/internal/operators/OperatorSwitchEmpty.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;


import rx.*;
import rx.internal.producers.ProducerArbiter;
import rx.subscriptions.SerialSubscription;

/**
* If the Observable completes without emitting any items, subscribe to an alternate Observable. Allows for similar
* functionality to {@link Observable#cast(Class)} followed by {@link Observable#concatWith(Observable)} except it
* errors if the source Observable is not empty.
*/
public final class OperatorSwitchEmpty<R, T> implements Observable.Operator<R, T> {
private final Observable<? extends R> alternate;

public OperatorSwitchEmpty(Observable<? extends R> alternate) {
this.alternate = alternate;
}

@Override
public Subscriber<? super T> call(Subscriber<? super R> child) {
final SerialSubscription ssub = new SerialSubscription();
ProducerArbiter arbiter = new ProducerArbiter();
final ParentSubscriber<R, T> parent = new ParentSubscriber<R, T>(child, ssub, arbiter, alternate);
ssub.set(parent);
child.add(ssub);
child.setProducer(arbiter);
return parent;
}

private static final class ParentSubscriber<R, T> extends Subscriber<T> {

private final Subscriber<? super R> child;
private final SerialSubscription ssub;
private final ProducerArbiter arbiter;
private final Observable<? extends R> alternate;

ParentSubscriber(Subscriber<? super R> child, final SerialSubscription ssub, ProducerArbiter arbiter, Observable<? extends R> alternate) {
this.child = child;
this.ssub = ssub;
this.arbiter = arbiter;
this.alternate = alternate;
}

@Override
public void setProducer(final Producer producer) {
arbiter.setProducer(producer);
}

@Override
public void onCompleted() {
if (!child.isUnsubscribed()) {
subscribeToAlternate();
}
}

private void subscribeToAlternate() {
AlternateSubscriber<R> as = new AlternateSubscriber<R>(child, arbiter);
ssub.set(as);
alternate.unsafeSubscribe(as);
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
child.onError(new RuntimeException("switchEmpty used on a non empty observable. Possible fix is to add .ignoreElements() before .switchEmpty()."));
arbiter.produced(1);
}
}

private static final class AlternateSubscriber<T> extends Subscriber<T> {

private final ProducerArbiter arbiter;
private final Subscriber<? super T> child;

AlternateSubscriber(Subscriber<? super T> child, ProducerArbiter arbiter) {
this.child = child;
this.arbiter = arbiter;
}

@Override
public void setProducer(final Producer producer) {
arbiter.setProducer(producer);
}

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
child.onNext(t);
arbiter.produced(1);
}
}
}
247 changes: 247 additions & 0 deletions src/test/java/rx/internal/operators/OperatorSwitchEmptyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

public class OperatorSwitchEmptyTest {

@Test(expected=RuntimeException.class)
public void testSwitchWhenNotEmpty() throws Exception {
final AtomicBoolean subscribed = new AtomicBoolean(false);
final Observable<Integer> observable = Observable.just(4).switchEmpty(Observable.just(2)
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.set(true);
}
}));

assertFalse(subscribed.get());
observable.toBlocking().single();
}

@Test(expected = IllegalArgumentException.class)
public void testSwitchWhenError() throws Exception {
final AtomicBoolean subscribed = new AtomicBoolean(false);
final Observable<Integer> observable = Observable.error(new IllegalArgumentException()).switchEmpty(Observable.just(2).doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.set(true);
}
}));

try {
observable.toBlocking().single();
} catch (Exception e) {
assertFalse(subscribed.get());
throw e;
}
}

@Test(expected = IllegalArgumentException.class)
public void testSwitchWhenAlternateError() throws Exception {
final AtomicBoolean subscribed = new AtomicBoolean(false);
final Observable<Integer> observable = Observable.empty().switchEmpty(Observable.<Integer> error(new IllegalArgumentException()).doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.set(true);
}
}));

try {
observable.toBlocking().single();
} catch (Exception e) {
assertTrue(subscribed.get());
throw e;
}
}

@Test
public void testSwitchWhenEmpty() throws Exception {
final Observable<Integer> observable = Observable.<Integer>empty().switchEmpty(Observable.from(Arrays.asList(42)));

assertEquals(42, observable.toBlocking().single().intValue());
}

@Test
public void testSwitchWithProducer() throws Exception {
final AtomicBoolean emitted = new AtomicBoolean(false);
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0 && !emitted.get()) {
emitted.set(true);
subscriber.onNext(42L);
subscriber.onCompleted();
}
}
});
}
});

final Observable<Long> observable = Observable.<Long>empty().switchEmpty(withProducer);
assertEquals(42, observable.toBlocking().single().intValue());
}

@Test
public void testSwitchTriggerUnsubscribe() throws Exception {
final Subscription empty = Subscriptions.empty();

Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.add(empty);
subscriber.onNext(42L);
}
});

final Subscription sub = Observable.<Long>empty().switchEmpty(withProducer).lift(new Observable.Operator<Long, Long>() {
@Override
public Subscriber<? super Long> call(final Subscriber<? super Long> child) {
return new Subscriber<Long>(child) {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long aLong) {
unsubscribe();
}
};
}
}).subscribe();


assertTrue(empty.isUnsubscribed());
assertTrue(sub.isUnsubscribed());
}

@Test
public void testSwitchShouldTriggerUnsubscribe() {
final Subscription s = Subscriptions.empty();

Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.add(s);
subscriber.onCompleted();
}
}).switchEmpty(Observable.<Long>never()).subscribe();
assertTrue(s.isUnsubscribed());
}

@Test
public void testSwitchRequestAlternativeObservableWithBackpressure() {

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {

@Override
public void onStart() {
request(1);
}
};
Observable.<Integer>empty().switchEmpty(Observable.just(1, 2, 3)).subscribe(ts);

assertEquals(Arrays.asList(1), ts.getOnNextEvents());
ts.assertNoErrors();
ts.requestMore(1);
ts.assertValueCount(2);
ts.requestMore(1);
ts.assertValueCount(3);
}
@Test
public void testBackpressureNoRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {

@Override
public void onStart() {
request(0);
}
};
Observable.<Integer>empty().switchEmpty(Observable.just(1, 2, 3)).subscribe(ts);
assertTrue(ts.getOnNextEvents().isEmpty());
ts.assertNoErrors();
}

@Test
public void testBackpressureOnFirstObservable() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
Observable.just(1,2,3).switchEmpty(Observable.just(4, 5, 6)).subscribe(ts);
ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();
}

@Test(timeout = 10000)
public void testRequestsNotLost() throws InterruptedException {
final TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
Observable.create(new OnSubscribe<Long>() {

@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.setProducer(new Producer() {
final AtomicBoolean completed = new AtomicBoolean(false);
@Override
public void request(long n) {
if (n > 0 && completed.compareAndSet(false, true)) {
Schedulers.io().createWorker().schedule(new Action0() {
@Override
public void call() {
subscriber.onCompleted();
}}, 100, TimeUnit.MILLISECONDS);
}
}});
}})
.switchEmpty(Observable.from(Arrays.asList(1L, 2L, 3L)))
.subscribeOn(Schedulers.computation())
.subscribe(ts);
ts.requestMore(0);
Thread.sleep(50);
//request while first observable is still finishing (as empty)
ts.requestMore(1);
ts.requestMore(1);
Thread.sleep(500);
ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertValueCount(2);
ts.unsubscribe();
}
}
Loading