diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7754f87c43..aadab8fadb 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -785,6 +785,31 @@ public static Observable combineLates Func9 combineFunction) { return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8, o9), Functions.fromFunc(combineFunction)); } + /** + * Combines a collection of source Observables by emitting an item that aggregates the latest values of each of + * the source Observables each time an item is received from any of the source Observables, where this + * aggregation is defined by a specified function. + *
+ *
Scheduler:
+ *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the common base type of source values + * @param + * the result type + * @param sources + * the collection of source Observables + * @param combineFunction + * the aggregation function used to combine the items emitted by the source Observables + * @return an Observable that emits items that are the result of combining the items emitted by the source + * Observables by means of the given aggregation function + * @see ReactiveX operators documentation: CombineLatest + */ + public static Observable combineLatest(Collection> sources, FuncN combineFunction) { + return create(new OnSubscribeCombineLatest(sources, combineFunction)); + } + /** * Combines a list of source Observables by emitting an item that aggregates the latest values of each of * the source Observables each time an item is received from any of the source Observables, where this diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index 5df99b2585..3617c448cd 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -16,7 +16,8 @@ package rx.internal.operators; import java.util.BitSet; -import java.util.List; +import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -42,10 +43,10 @@ * the result type of the combinator function */ public final class OnSubscribeCombineLatest implements OnSubscribe { - final List> sources; + final Collection> sources; final FuncN combinator; - public OnSubscribeCombineLatest(List> sources, FuncN combinator) { + public OnSubscribeCombineLatest(Collection> sources, FuncN combinator) { this.sources = sources; this.combinator = combinator; if (sources.size() > RxRingBuffer.SIZE) { @@ -62,7 +63,7 @@ public void call(final Subscriber child) { return; } if (sources.size() == 1) { - child.setProducer(new SingleSourceProducer(child, sources.get(0), combinator)); + child.setProducer(new SingleSourceProducer(child, sources.iterator().next(), combinator)); } else { child.setProducer(new MultiSourceProducer(child, sources, combinator)); } @@ -76,7 +77,7 @@ public void call(final Subscriber child) { final static class MultiSourceProducer implements Producer { private final AtomicBoolean started = new AtomicBoolean(); private final AtomicLong requested = new AtomicLong(); - private final List> sources; + private final Collection> sources; private final Subscriber child; private final FuncN combinator; private final MultiSourceRequestableSubscriber[] subscribers; @@ -92,7 +93,7 @@ final static class MultiSourceProducer implements Producer { private final AtomicLong counter = new AtomicLong(); @SuppressWarnings("unchecked") - public MultiSourceProducer(final Subscriber child, final List> sources, FuncN combinator) { + public MultiSourceProducer(final Subscriber child, final Collection> sources, FuncN combinator) { this.sources = sources; this.child = child; this.combinator = combinator; @@ -116,10 +117,11 @@ public void request(long n) { */ int sizePerSubscriber = RxRingBuffer.SIZE / sources.size(); int leftOver = RxRingBuffer.SIZE % sources.size(); - for (int i = 0; i < sources.size(); i++) { - Observable o = sources.get(i); + Iterator> iterator = sources.iterator(); + for (int i = 0; iterator.hasNext(); i++) { + Observable o = iterator.next(); int toRequest = sizePerSubscriber; - if (i == sources.size() - 1) { + if (!iterator.hasNext()) { toRequest += leftOver; } MultiSourceRequestableSubscriber s = new MultiSourceRequestableSubscriber(i, toRequest, child, this); diff --git a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java index c28606cae0..fc7a033448 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -477,7 +478,7 @@ public List call(Object... args) { }; for (int i = 1; i <= n; i++) { System.out.println("test1ToNSources: " + i + " sources"); - List> sources = new ArrayList>(); + Collection> sources = new ArrayList>(); List values = new ArrayList(); for (int j = 0; j < i; j++) { sources.add(Observable.just(j)); @@ -509,7 +510,7 @@ public List call(Object... args) { }; for (int i = 1; i <= n; i++) { System.out.println("test1ToNSourcesScheduled: " + i + " sources"); - List> sources = new ArrayList>(); + Collection> sources = new ArrayList>(); List values = new ArrayList(); for (int j = 0; j < i; j++) { sources.add(Observable.just(j).subscribeOn(Schedulers.io()));