Skip to content

Single and Completable converters #150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions rxjava-reactive-streams/src/main/java/rx/RxReactiveStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package rx;

import org.reactivestreams.Publisher;
import rx.internal.reactivestreams.PublisherAdapter;
import rx.internal.reactivestreams.SubscriberAdapter;

import rx.internal.reactivestreams.*;

/**
* This type provides static factory methods for converting to and from RxJava types and Reactive Streams types.
Expand Down Expand Up @@ -70,4 +70,66 @@ public static <T> org.reactivestreams.Subscriber<T> toSubscriber(final rx.Subscr
return new SubscriberAdapter<T>(rxSubscriber);
}

/**
* Converts an RxJava Completable into a Publisher that emits only onError or onComplete.
* @param <T> the target value type
* @param completable the Completable instance to convert
* @return the new Publisher instance
* @since 1.1
* @throws NullPointerException if completable is null
*/
public static <T> Publisher<T> toPublisher(Completable completable) {
if (completable == null) {
throw new NullPointerException("completable");
}
return new CompletableAsPublisher<T>(completable);
}

/**
* Converst a Publisher into a Completable by ignoring all onNext values and emitting
* onError or onComplete only.
* @param publisher the Publisher instance to convert
* @return the Completable instance
* @since 1.1
* @throws NullPointerException if publisher is null
*/
public static Completable toCompletable(Publisher<?> publisher) {
if (publisher == null) {
throw new NullPointerException("publisher");
}
return Completable.create(new PublisherAsCompletable(publisher));
}

/**
* Converts a Single into a Publisher which emits an onNext+onComplete if
* the source Single signals a non-null onSuccess; or onError if the source signals
* onError(NullPointerException) or a null value.
* @param single the Single instance to convert
* @return the Publisher instance
* @since 1.1
* @throws NullPointerException if single is null
*/
public static <T> Publisher<T> toPublisher(Single<T> single) {
if (single == null) {
throw new NullPointerException("single");
}
return new SingleAsPublisher<T>(single);
}

/**
* Converts a Publisher into a Single which emits onSuccess if the
* Publisher signals an onNext+onComplete; or onError if the publisher signals an
* onError, the source Publisher is empty (NoSuchElementException) or the
* source Publisher signals more than one onNext (IndexOutOfBoundsException).
* @param publisher the Publisher instance to convert
* @return the Single instance
* @since 1.1
* @throws NullPointerException if publisher is null
*/
public static <T> Single<T> toSingle(Publisher<T> publisher) {
if (publisher == null) {
throw new NullPointerException("publisher");
}
return Single.create(new PublisherAsSingle<T>(publisher));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.reactivestreams;

import org.reactivestreams.*;

import rx.Completable;

/**
* Wraps a Completable and exposes it as a Publisher.
*
* @param <T> the value type of the publisher
*/
public final class CompletableAsPublisher<T> implements Publisher<T> {

final Completable completable;

public CompletableAsPublisher(Completable completable) {
this.completable = completable;
}

@Override
public void subscribe(Subscriber<? super T> s) {
completable.subscribe(new CompletableAsPublisherSubscriber<T>(s));
}

static final class CompletableAsPublisherSubscriber<T>
implements Completable.CompletableSubscriber, Subscription {

final Subscriber<? super T> actual;

rx.Subscription d;

public CompletableAsPublisherSubscriber(Subscriber<? super T> actual) {
this.actual = actual;
}

@Override
public void onSubscribe(rx.Subscription d) {
this.d = d;
actual.onSubscribe(this);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onCompleted() {
actual.onComplete();
}

@Override
public void request(long n) {
// No values will be emitted
}

@Override
public void cancel() {
d.unsubscribe();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.reactivestreams;

import org.reactivestreams.*;

import rx.Completable.CompletableSubscriber;

/**
* Wraps an arbitrary Publisher and exposes it as a Completable, ignoring any onNext events.
*/
public final class PublisherAsCompletable implements rx.Completable.CompletableOnSubscribe {

final Publisher<?> publisher;

public PublisherAsCompletable(Publisher<?> publisher) {
this.publisher = publisher;
}

@Override
public void call(CompletableSubscriber t) {
publisher.subscribe(new PublisherAsCompletableSubscriber(t));
}

static final class PublisherAsCompletableSubscriber implements Subscriber<Object>, rx.Subscription {

final CompletableSubscriber actual;

Subscription s;

volatile boolean unsubscribed;

public PublisherAsCompletableSubscriber(CompletableSubscriber actual) {
this.actual = actual;
}

@Override
public void onSubscribe(Subscription s) {
this.s = s;
actual.onSubscribe(this);
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Object t) {
// values are ignored
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
actual.onCompleted();
}

@Override
public boolean isUnsubscribed() {
return unsubscribed;
}

@Override
public void unsubscribe() {
if (!unsubscribed) {
unsubscribed = true;
s.cancel();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.reactivestreams;

import java.util.NoSuchElementException;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import rx.*;

/**
* Wraps a Publisher and exposes it as a Single, signalling NoSuchElementException
* if the Publisher is empty or IndexOutOfBoundsExcepion if the Publisher produces
* more than one element.
*
* @param <T> the value type
*/
public final class PublisherAsSingle<T> implements Single.OnSubscribe<T> {

final Publisher<T> publisher;

public PublisherAsSingle(Publisher<T> publisher) {
this.publisher = publisher;
}

@Override
public void call(SingleSubscriber<? super T> t) {
publisher.subscribe(new PublisherAsSingleSubscriber<T>(t));
}

static final class PublisherAsSingleSubscriber<T> implements Subscriber<T>, rx.Subscription {

final SingleSubscriber<? super T> actual;

Subscription s;

T value;

boolean hasValue;

boolean done;

public PublisherAsSingleSubscriber(SingleSubscriber<? super T> actual) {
this.actual = actual;
}

@Override
public void onSubscribe(Subscription s) {
this.s = s;

actual.add(this);

s.request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
if (done) {
return;
}
if (hasValue) {
done = true;
s.cancel();
actual.onError(new IndexOutOfBoundsException("The source Publisher emitted multiple values"));
} else {
value = t;
hasValue = true;
}
}

@Override
public void onError(Throwable t) {
if (done) {
return;
}
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
if (hasValue) {
T v = value;
value = null;
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException("The source Publisher was empty"));
}
}

@Override
public boolean isUnsubscribed() {
return actual.isUnsubscribed();
}

@Override
public void unsubscribe() {
s.cancel();
}
}
}
Loading