diff --git a/rxjava-reactive-streams/src/main/java/rx/RxReactiveStreams.java b/rxjava-reactive-streams/src/main/java/rx/RxReactiveStreams.java index 1046bb3..9cc504c 100644 --- a/rxjava-reactive-streams/src/main/java/rx/RxReactiveStreams.java +++ b/rxjava-reactive-streams/src/main/java/rx/RxReactiveStreams.java @@ -16,8 +16,8 @@ package rx; import org.reactivestreams.Publisher; -import rx.internal.reactivestreams.PublisherAdapter; -import rx.internal.reactivestreams.SubscriberAdapter; + +import rx.internal.reactivestreams.*; /** * This type provides static factory methods for converting to and from RxJava types and Reactive Streams types. @@ -70,4 +70,66 @@ public static org.reactivestreams.Subscriber toSubscriber(final rx.Subscr return new SubscriberAdapter(rxSubscriber); } + /** + * Converts an RxJava Completable into a Publisher that emits only onError or onComplete. + * @param the target value type + * @param completable the Completable instance to convert + * @return the new Publisher instance + * @since 1.1 + * @throws NullPointerException if completable is null + */ + public static Publisher toPublisher(Completable completable) { + if (completable == null) { + throw new NullPointerException("completable"); + } + return new CompletableAsPublisher(completable); + } + + /** + * Converst a Publisher into a Completable by ignoring all onNext values and emitting + * onError or onComplete only. + * @param publisher the Publisher instance to convert + * @return the Completable instance + * @since 1.1 + * @throws NullPointerException if publisher is null + */ + public static Completable toCompletable(Publisher publisher) { + if (publisher == null) { + throw new NullPointerException("publisher"); + } + return Completable.create(new PublisherAsCompletable(publisher)); + } + + /** + * Converts a Single into a Publisher which emits an onNext+onComplete if + * the source Single signals a non-null onSuccess; or onError if the source signals + * onError(NullPointerException) or a null value. + * @param single the Single instance to convert + * @return the Publisher instance + * @since 1.1 + * @throws NullPointerException if single is null + */ + public static Publisher toPublisher(Single single) { + if (single == null) { + throw new NullPointerException("single"); + } + return new SingleAsPublisher(single); + } + + /** + * Converts a Publisher into a Single which emits onSuccess if the + * Publisher signals an onNext+onComplete; or onError if the publisher signals an + * onError, the source Publisher is empty (NoSuchElementException) or the + * source Publisher signals more than one onNext (IndexOutOfBoundsException). + * @param publisher the Publisher instance to convert + * @return the Single instance + * @since 1.1 + * @throws NullPointerException if publisher is null + */ + public static Single toSingle(Publisher publisher) { + if (publisher == null) { + throw new NullPointerException("publisher"); + } + return Single.create(new PublisherAsSingle(publisher)); + } } diff --git a/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/CompletableAsPublisher.java b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/CompletableAsPublisher.java new file mode 100644 index 0000000..b275b23 --- /dev/null +++ b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/CompletableAsPublisher.java @@ -0,0 +1,78 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.reactivestreams; + +import org.reactivestreams.*; + +import rx.Completable; + +/** + * Wraps a Completable and exposes it as a Publisher. + * + * @param the value type of the publisher + */ +public final class CompletableAsPublisher implements Publisher { + + final Completable completable; + + public CompletableAsPublisher(Completable completable) { + this.completable = completable; + } + + @Override + public void subscribe(Subscriber s) { + completable.subscribe(new CompletableAsPublisherSubscriber(s)); + } + + static final class CompletableAsPublisherSubscriber + implements Completable.CompletableSubscriber, Subscription { + + final Subscriber actual; + + rx.Subscription d; + + public CompletableAsPublisherSubscriber(Subscriber actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(rx.Subscription d) { + this.d = d; + actual.onSubscribe(this); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onCompleted() { + actual.onComplete(); + } + + @Override + public void request(long n) { + // No values will be emitted + } + + @Override + public void cancel() { + d.unsubscribe(); + } + } +} diff --git a/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAsCompletable.java b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAsCompletable.java new file mode 100644 index 0000000..fb2b1d4 --- /dev/null +++ b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAsCompletable.java @@ -0,0 +1,86 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.reactivestreams; + +import org.reactivestreams.*; + +import rx.Completable.CompletableSubscriber; + +/** + * Wraps an arbitrary Publisher and exposes it as a Completable, ignoring any onNext events. + */ +public final class PublisherAsCompletable implements rx.Completable.CompletableOnSubscribe { + + final Publisher publisher; + + public PublisherAsCompletable(Publisher publisher) { + this.publisher = publisher; + } + + @Override + public void call(CompletableSubscriber t) { + publisher.subscribe(new PublisherAsCompletableSubscriber(t)); + } + + static final class PublisherAsCompletableSubscriber implements Subscriber, rx.Subscription { + + final CompletableSubscriber actual; + + Subscription s; + + volatile boolean unsubscribed; + + public PublisherAsCompletableSubscriber(CompletableSubscriber actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + actual.onSubscribe(this); + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Object t) { + // values are ignored + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + } + + @Override + public void onComplete() { + actual.onCompleted(); + } + + @Override + public boolean isUnsubscribed() { + return unsubscribed; + } + + @Override + public void unsubscribe() { + if (!unsubscribed) { + unsubscribed = true; + s.cancel(); + } + } + } +} diff --git a/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAsSingle.java b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAsSingle.java new file mode 100644 index 0000000..faa0ea8 --- /dev/null +++ b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAsSingle.java @@ -0,0 +1,119 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.reactivestreams; + +import java.util.NoSuchElementException; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import rx.*; + +/** + * Wraps a Publisher and exposes it as a Single, signalling NoSuchElementException + * if the Publisher is empty or IndexOutOfBoundsExcepion if the Publisher produces + * more than one element. + * + * @param the value type + */ +public final class PublisherAsSingle implements Single.OnSubscribe { + + final Publisher publisher; + + public PublisherAsSingle(Publisher publisher) { + this.publisher = publisher; + } + + @Override + public void call(SingleSubscriber t) { + publisher.subscribe(new PublisherAsSingleSubscriber(t)); + } + + static final class PublisherAsSingleSubscriber implements Subscriber, rx.Subscription { + + final SingleSubscriber actual; + + Subscription s; + + T value; + + boolean hasValue; + + boolean done; + + public PublisherAsSingleSubscriber(SingleSubscriber actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + + actual.add(this); + + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + if (hasValue) { + done = true; + s.cancel(); + actual.onError(new IndexOutOfBoundsException("The source Publisher emitted multiple values")); + } else { + value = t; + hasValue = true; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + return; + } + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + if (hasValue) { + T v = value; + value = null; + actual.onSuccess(v); + } else { + actual.onError(new NoSuchElementException("The source Publisher was empty")); + } + } + + @Override + public boolean isUnsubscribed() { + return actual.isUnsubscribed(); + } + + @Override + public void unsubscribe() { + s.cancel(); + } + } +} diff --git a/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SingleAsPublisher.java b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SingleAsPublisher.java new file mode 100644 index 0000000..cefe44c --- /dev/null +++ b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SingleAsPublisher.java @@ -0,0 +1,143 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.reactivestreams; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import rx.*; + +/** + * Wraps a Single and exposes it as a Publisher. + * + * @param the value type + */ +public final class SingleAsPublisher implements Publisher { + + final Single single; + + public SingleAsPublisher(Single single) { + this.single = single; + } + + @Override + public void subscribe(Subscriber s) { + SingleAsPublisherSubscriber parent = new SingleAsPublisherSubscriber(s); + s.onSubscribe(parent); + + single.subscribe(parent); + } + + static final class SingleAsPublisherSubscriber extends SingleSubscriber + implements Subscription { + + final Subscriber actual; + + final AtomicInteger state; + + T value; + + volatile boolean cancelled; + + static final int NO_REQUEST_NO_VALUE = 0; + static final int NO_REQUEST_HAS_VALUE = 1; + static final int HAS_REQUEST_NO_VALUE = 2; + static final int HAS_REQUEST_HAS_VALUE = 3; + + public SingleAsPublisherSubscriber(Subscriber actual) { + this.actual = actual; + this.state = new AtomicInteger(); + } + + @Override + public void onSuccess(T value) { + if (cancelled) { + return; + } + if (value == null) { + state.lazySet(HAS_REQUEST_HAS_VALUE); + actual.onError(new NullPointerException("value")); + return; + } + for (;;) { + int s = state.get(); + + if (s == NO_REQUEST_HAS_VALUE || s == HAS_REQUEST_HAS_VALUE || cancelled) { + break; + } else + if (s == HAS_REQUEST_NO_VALUE) { + actual.onNext(value); + if (!cancelled) { + actual.onComplete(); + } + } else { + this.value = value; + if (state.compareAndSet(s, NO_REQUEST_HAS_VALUE)) { + break; + } + } + } + } + + @Override + public void onError(Throwable error) { + if (cancelled) { + return; + } + state.lazySet(HAS_REQUEST_HAS_VALUE); + actual.onError(error); + } + + @Override + public void request(long n) { + if (n > 0) { + for (;;) { + int s = state.get(); + if (s == HAS_REQUEST_HAS_VALUE || s == HAS_REQUEST_NO_VALUE || cancelled) { + break; + } else + if (s == NO_REQUEST_HAS_VALUE) { + if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) { + T v = value; + value = null; + + actual.onNext(v); + if (!cancelled) { + actual.onComplete(); + } + } + break; + } + } + } + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + if (state.getAndSet(HAS_REQUEST_HAS_VALUE) == NO_REQUEST_HAS_VALUE) { + value = null; + } + unsubscribe(); + } + } + } +} diff --git a/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/CompletableAsPublisherTest.java b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/CompletableAsPublisherTest.java new file mode 100644 index 0000000..00b0f0b --- /dev/null +++ b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/CompletableAsPublisherTest.java @@ -0,0 +1,60 @@ +package rx.reactivestreams.test; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import rx.*; +import rx.subjects.PublishSubject; + +import static rx.RxReactiveStreams.*; + +public class CompletableAsPublisherTest { + + @Test(expectedExceptions = { NullPointerException.class }) + public void nullCheck() { + toPublisher((Completable)null); + } + + @Test + public void empty() { + RsSubscriber ts = new RsSubscriber(); + + toPublisher(Completable.complete()).subscribe(ts); + + Assert.assertTrue(ts.complete); + Assert.assertNull(ts.error); + Assert.assertTrue(ts.received.isEmpty()); + } + + @Test + public void error() { + RsSubscriber ts = new RsSubscriber(); + + toPublisher(Completable.error(new RuntimeException("Forced failure"))).subscribe(ts); + + Assert.assertFalse(ts.complete); + Assert.assertNotNull(ts.error); + Assert.assertTrue(ts.error instanceof RuntimeException); + Assert.assertEquals(ts.error.getMessage(), "Forced failure"); + Assert.assertTrue(ts.received.isEmpty()); + } + + @Test + public void cancellation() { + RsSubscriber ts = new RsSubscriber(); + + PublishSubject ps = PublishSubject.create(); + + toPublisher(ps.toCompletable()).subscribe(ts); + + Assert.assertTrue(ps.hasObservers()); + + ts.subscription.cancel(); + + Assert.assertFalse(ts.complete); + Assert.assertNull(ts.error); + Assert.assertTrue(ts.received.isEmpty()); + + Assert.assertFalse(ps.hasObservers()); + } +} diff --git a/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/PublisherAsCompletableTest.java b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/PublisherAsCompletableTest.java new file mode 100644 index 0000000..bb1b2e6 --- /dev/null +++ b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/PublisherAsCompletableTest.java @@ -0,0 +1,103 @@ +package rx.reactivestreams.test; + +import static rx.RxReactiveStreams.*; + +import org.reactivestreams.*; +import org.testng.Assert; +import org.testng.annotations.Test; + +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +public class PublisherAsCompletableTest { + + @Test(expectedExceptions = { NullPointerException.class }) + public void nullCheck() { + toCompletable(null); + } + + @Test + public void empty() { + TestSubscriber ts = new TestSubscriber(); + + toCompletable(new PublisherEmpty()).subscribe(ts); + + ts.assertNoValues(); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void error() { + TestSubscriber ts = new TestSubscriber(); + + toCompletable(new PublisherFail()).subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(RuntimeException.class); + Assert.assertEquals(ts.getOnErrorEvents().get(0).getMessage(), "Forced failure"); + } + + @Test + public void cancellation() { + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + toCompletable(toPublisher(ps)).subscribe(ts); + + Assert.assertTrue(ps.hasObservers()); + + ts.unsubscribe(); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertNoErrors(); + + Assert.assertFalse(ps.hasObservers()); + } + + static final class PublisherEmpty implements Publisher { + @Override + public void subscribe(Subscriber s) { + final boolean[] cancelled = { false }; + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + + } + + @Override + public void cancel() { + cancelled[0] = true; + } + }); + if (!cancelled[0]) { + s.onComplete(); + } + } + } + + static final class PublisherFail implements Publisher { + @Override + public void subscribe(Subscriber s) { + final boolean[] cancelled = { false }; + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + + } + + @Override + public void cancel() { + cancelled[0] = true; + } + }); + if (!cancelled[0]) { + s.onError(new RuntimeException("Forced failure")); + } + } + } + +} diff --git a/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/PublisherAsSingleTest.java b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/PublisherAsSingleTest.java new file mode 100644 index 0000000..44fc77f --- /dev/null +++ b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/PublisherAsSingleTest.java @@ -0,0 +1,85 @@ +package rx.reactivestreams.test; + +import static rx.RxReactiveStreams.*; + +import java.util.NoSuchElementException; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import rx.*; +import rx.observers.TestSubscriber; +import rx.reactivestreams.test.PublisherAsCompletableTest.*; +import rx.subjects.PublishSubject; + +public class PublisherAsSingleTest { + + @Test(expectedExceptions = { NullPointerException.class }) + public void nullCheck() { + toPublisher((Single)null); + } + + @Test + public void just() { + TestSubscriber ts = new TestSubscriber(); + + toSingle(toPublisher(Observable.just(1))).subscribe(ts); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void empty() { + TestSubscriber ts = new TestSubscriber(); + + toSingle(new PublisherEmpty()).subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(NoSuchElementException.class); + } + + @Test + public void range() { + TestSubscriber ts = new TestSubscriber(); + + toSingle(toPublisher(Observable.range(1, 2))).subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(IndexOutOfBoundsException.class); + } + + @Test + public void error() { + TestSubscriber ts = new TestSubscriber(); + + toSingle(new PublisherFail()).subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(RuntimeException.class); + Assert.assertEquals(ts.getOnErrorEvents().get(0).getMessage(), "Forced failure"); + } + + @Test + public void cancellation() { + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + toSingle(toPublisher(ps)).subscribe(ts); + + Assert.assertTrue(ps.hasObservers()); + + ts.unsubscribe(); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertNoErrors(); + + Assert.assertFalse(ps.hasObservers()); + } +} diff --git a/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/SingleAsPublisherTest.java b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/SingleAsPublisherTest.java new file mode 100644 index 0000000..7799f28 --- /dev/null +++ b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/SingleAsPublisherTest.java @@ -0,0 +1,79 @@ +package rx.reactivestreams.test; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import rx.*; +import rx.subjects.PublishSubject; + +import static rx.RxReactiveStreams.*; + +public class SingleAsPublisherTest { + + @Test(expectedExceptions = { NullPointerException.class }) + public void nullCheck() { + toPublisher((Single)null); + } + + @Test + public void just() { + RsSubscriber ts = new RsSubscriber(); + + toPublisher(Single.just(1)).subscribe(ts); + + Assert.assertFalse(ts.complete); + Assert.assertNull(ts.error); + Assert.assertTrue(ts.received.isEmpty()); + + ts.subscription.request(1); + + Assert.assertTrue(ts.complete); + Assert.assertNull(ts.error); + Assert.assertEquals(ts.received.size(), 1); + Assert.assertEquals(ts.received.poll(), 1); + } + + @Test + public void justNull() { + RsSubscriber ts = new RsSubscriber(); + + toPublisher(Single.just(null)).subscribe(ts); + + Assert.assertFalse(ts.complete); + Assert.assertNotNull(ts.error); + Assert.assertTrue(ts.error instanceof NullPointerException); + Assert.assertTrue(ts.received.isEmpty()); + } + + @Test + public void error() { + RsSubscriber ts = new RsSubscriber(); + + toPublisher(Single.error(new RuntimeException("Forced failure"))).subscribe(ts); + + Assert.assertFalse(ts.complete); + Assert.assertNotNull(ts.error); + Assert.assertTrue(ts.error instanceof RuntimeException); + Assert.assertEquals(ts.error.getMessage(), "Forced failure"); + Assert.assertTrue(ts.received.isEmpty()); + } + + @Test + public void cancellation() { + RsSubscriber ts = new RsSubscriber(); + + PublishSubject ps = PublishSubject.create(); + + toPublisher(ps.toSingle()).subscribe(ts); + + Assert.assertTrue(ps.hasObservers()); + + ts.subscription.cancel(); + + Assert.assertFalse(ts.complete); + Assert.assertNull(ts.error); + Assert.assertTrue(ts.received.isEmpty()); + + Assert.assertFalse(ps.hasObservers()); + } +}