Skip to content

Add Single.zip() for Iterable of Singles #3539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 9, 2015

Conversation

artem-zinnatullin
Copy link
Contributor

No description provided.

* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
public static <R> Single<R> zip(Iterable<? extends Single<?>> singles, FuncN<? extends R> zipFunction) {
List<Observable<?>> listOfSingles = new ArrayList<Observable<?>>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although not required, I prefer iterating such sources when the child subscribes and not when the sequence is assembled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I am afraid that iterating over sequence when child subscribes may lead to data inconsistency if user will pass the collection of Singles and then change it before subscribe().

@akarnokd
Copy link
Member

Did you consider adding zip(Single[], FuncN) overload as well?

Otherwise, looks good. 👍

@artem-zinnatullin
Copy link
Contributor Author

I'll add zin(Single[], FuncN) and zipWith() in separate PRs if you don't mind.

Btw, idk if @benjchristensen remembers how I was arguing against adding Single to the RxJava. My apologies 😅. I'm starting loving Single in our codebase!

@akarnokd
Copy link
Member

Here is an implementation that avoids conversion to Observable:

@SuppressWarnings("unchecked")
public static <T, R> Single<R> zip(
        Iterable<? extends Single<? extends T>> singles, 
                FuncN<? extends R> zipper) {
    return Single.create(f -> {
        Single<? extends T>[] sa;
        int count;
        if (singles instanceof List) {
            List<? extends Single<? extends T>> list = 
                    (List<? extends Single<? extends T>>)singles;
            count = list.size();
            sa = list.toArray(new Single[count]);
        } else {
            sa = new Single[8];
            count = 0;
            for (Single<? extends T> s : singles) {
                if (count == sa.length) {
                    Single<? extends T>[] sb = new Single[count + (count >> 2)];
                    System.arraycopy(sa, 0, sb, 0, count);
                    sa = sb;
                }
                sa[count] = s;
                count++;
            }
        }

        final AtomicInteger wip = new AtomicInteger(count);
        final AtomicBoolean once = new AtomicBoolean();
        final Object[] values = new Object[count];

        CompositeSubscription csub = new CompositeSubscription();
        f.add(csub);

        for (int i = 0; i < count; i++) {
            if (csub.isUnsubscribed() || once.get()) {
                break;
            }

            final int j = i;
            SingleSubscriber<T> te = new SingleSubscriber<T>() {
                @Override
                public void onSuccess(T value) {
                    values[j] = value;
                    if (wip.decrementAndGet() == 0) {
                        R r;

                        try {
                            r = zipper.call(values);
                        } catch (Throwable e) {
                            Exceptions.throwIfFatal(e);
                            f.onError(e);
                            return;
                        }

                        f.onSuccess(r);
                    }
                }

                @Override
                public void onError(Throwable error) {
                    if (once.compareAndSet(false, true)) {
                        f.onError(error);
                    } else {
                        RxJavaPlugins.getInstance()
                        .getErrorHandler().handleError(error);
                    }
                }
            };
            csub.add(te);
            if (csub.isUnsubscribed() || once.get()) {
                break;
            }
            sa[i].subscribe(te);
        }
    });
}

@artem-zinnatullin artem-zinnatullin force-pushed the single-zip-iterable branch 4 times, most recently from c40010a to 0dbdc1b Compare November 29, 2015 14:20
@artem-zinnatullin
Copy link
Contributor Author

@akarnokd I've used your code (rewrote some parts and fixed one bug) and now we don't have to convert Singles to Observables!

Later we can switch other Single.zip overloads to SingleOperatorZip.

r = zipper.call(values);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
subscriber.onError(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be just this.onError to avoid error races.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@artem-zinnatullin
Copy link
Contributor Author

@akarnokd changed instanceof List to instanceof Collection so we are now really efficient for all collections and fall back to slightly less fast code for other Iterables, rebased.

PTAL

*/
@SuppressWarnings("unchecked")
public static <R> Single<R> zip(Iterable<? extends Single<?>> singles, FuncN<? extends R> zipFunction) {
return SingleOperatorZip.zip(iterableToArray(singles), zipFunction);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterables and Observables are mainly for deferred data streaming. This will iterate at assembly time instead on Subscription time, losing the duality aspects. For example, an Observable source turned into an Iterable via toBlocking().getIterable() is no longer lazy when this zip is applied.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is compatible with the behavior of Observable.zip.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep currently Observable serializes all values in the iterable into an array before zipping them so you can't append after lifting. IMO this is probably intentional since appending to some iterables while iterating results in ConcurrentModificationExceptions. Today the Observable<Observable<T>> overload can receive new observables to zip but the zip function won't be called until the outer observable calls onCompleted(). This is slightly later than the Iterable overload.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, @akarnokd, @stealthcode let's use current implementation from this PR to be consistent with Observable.zip(Iterable<Observable>)?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

@artem-zinnatullin
Copy link
Contributor Author

Rebased!

@artem-zinnatullin
Copy link
Contributor Author

Blocked by #3569.

@akarnokd
Copy link
Member

akarnokd commented Dec 9, 2015

This needs to be rebased again.

@akarnokd
Copy link
Member

akarnokd commented Dec 9, 2015

👍

1 similar comment
@stealthcode
Copy link

👍

stealthcode pushed a commit that referenced this pull request Dec 9, 2015
Add Single.zip() for Iterable of Singles
@stealthcode stealthcode merged commit 2af58cd into ReactiveX:1.x Dec 9, 2015
@stevegury
Copy link
Member

👍 (for posterity)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants