diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 776026fdb4..3f27cb0a83 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -195,8 +195,28 @@ public Observable compose(Transformer transformer public interface Transformer extends Func1, Observable> { // cover for generics insanity } - - + + /** + * Returns a Single that emits the single item emitted by the source Observable, if that Observable + * emits only a single item. If the source Observable emits more than one item or no items, notify of an + * {@code IllegalArgumentException} or {@code NoSuchElementException} respectively. + *

+ *

+ *
Scheduler:
+ *
{@code toSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a Single that emits the single item emitted by the source Observable + * @throws IllegalArgumentException + * if the source observable emits more than one item + * @throws NoSuchElementException + * if the source observable emits no items + */ + @Experimental + public Single toSingle() { + return new Single(OnSubscribeSingle.create(this)); + } + /* ********************************************************************************************************* * Operators Below Here diff --git a/src/main/java/rx/internal/operators/OnSubscribeSingle.java b/src/main/java/rx/internal/operators/OnSubscribeSingle.java new file mode 100644 index 0000000000..63d4d0a49a --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeSingle.java @@ -0,0 +1,89 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import rx.Observable; +import rx.Single; +import rx.SingleSubscriber; +import rx.Subscriber; + +import java.util.NoSuchElementException; + +/** + * Allows conversion of an Observable to a Single ensuring that exactly one item is emitted - no more and no less. + * Also forwards errors as appropriate. + */ +public class OnSubscribeSingle implements Single.OnSubscribe { + + private final Observable observable; + + public OnSubscribeSingle(Observable observable) { + this.observable = observable; + } + + @Override + public void call(final SingleSubscriber child) { + Subscriber parent = new Subscriber() { + private boolean emittedTooMany = false; + private boolean itemEmitted = false; + private T emission = null; + + @Override + public void onStart() { + // We request 2 here since we need 1 for the single and 1 to check that the observable + // doesn't emit more than one item + request(2); + } + + @Override + public void onCompleted() { + if (emittedTooMany) { + // Don't need to do anything here since we already sent an error downstream + } else { + if (itemEmitted) { + child.onSuccess(emission); + } else { + child.onError(new NoSuchElementException("Observable emitted no items")); + } + } + } + + @Override + public void onError(Throwable e) { + child.onError(e); + unsubscribe(); + } + + @Override + public void onNext(T t) { + if (itemEmitted) { + emittedTooMany = true; + child.onError(new IllegalArgumentException("Observable emitted too many elements")); + unsubscribe(); + } else { + itemEmitted = true; + emission = t; + } + } + }; + child.add(parent); + observable.subscribe(parent); + } + + public static OnSubscribeSingle create(Observable observable) { + return new OnSubscribeSingle(observable); + } +} diff --git a/src/test/java/rx/internal/operators/OnSubscribeSingleTest.java b/src/test/java/rx/internal/operators/OnSubscribeSingleTest.java new file mode 100644 index 0000000000..6bc24dbe75 --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeSingleTest.java @@ -0,0 +1,73 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Single; +import rx.observers.TestSubscriber; + +import java.util.Collections; +import java.util.NoSuchElementException; + +public class OnSubscribeSingleTest { + + @Test + public void testJustSingleItemObservable() { + TestSubscriber subscriber = TestSubscriber.create(); + Single single = Observable.just("Hello World!").toSingle(); + single.subscribe(subscriber); + + subscriber.assertReceivedOnNext(Collections.singletonList("Hello World!")); + } + + @Test + public void testErrorObservable() { + TestSubscriber subscriber = TestSubscriber.create(); + IllegalArgumentException error = new IllegalArgumentException("Error"); + Single single = Observable.error(error).toSingle(); + single.subscribe(subscriber); + + subscriber.assertError(error); + } + + @Test + public void testJustTwoEmissionsObservableThrowsError() { + TestSubscriber subscriber = TestSubscriber.create(); + Single single = Observable.just("First", "Second").toSingle(); + single.subscribe(subscriber); + + subscriber.assertError(IllegalArgumentException.class); + } + + @Test + public void testEmptyObservable() { + TestSubscriber subscriber = TestSubscriber.create(); + Single single = Observable.empty().toSingle(); + single.subscribe(subscriber); + + subscriber.assertError(NoSuchElementException.class); + } + + @Test + public void testRepeatObservableThrowsError() { + TestSubscriber subscriber = TestSubscriber.create(); + Single single = Observable.just("First", "Second").repeat().toSingle(); + single.subscribe(subscriber); + + subscriber.assertError(IllegalArgumentException.class); + } +}