Skip to content

Commit 0dbdc1b

Browse files
Add Single.zip() for Iterable of Singles
1 parent 9fddbd9 commit 0dbdc1b

File tree

3 files changed

+230
-0
lines changed

3 files changed

+230
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package rx;
1414

15+
import java.util.List;
1516
import java.util.concurrent.Callable;
1617
import java.util.concurrent.Future;
1718
import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@
3132
import rx.functions.Func7;
3233
import rx.functions.Func8;
3334
import rx.functions.Func9;
35+
import rx.functions.FuncN;
3436
import rx.internal.operators.OnSubscribeToObservableFuture;
3537
import rx.internal.operators.OperatorDelay;
3638
import rx.internal.operators.OperatorDoOnEach;
@@ -40,6 +42,7 @@
4042
import rx.internal.operators.OperatorSubscribeOn;
4143
import rx.internal.operators.OperatorTimeout;
4244
import rx.internal.operators.OperatorZip;
45+
import rx.internal.operators.SingleOperatorZip;
4346
import rx.internal.producers.SingleDelayedProducer;
4447
import rx.singles.BlockingSingle;
4548
import rx.observers.SafeSubscriber;
@@ -1203,6 +1206,31 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Single<R> zip(Single
12031206
return just(new Observable<?>[] { asObservable(o1), asObservable(o2), asObservable(o3), asObservable(o4), asObservable(o5), asObservable(o6), asObservable(o7), asObservable(o8), asObservable(o9) }).lift(new OperatorZip<R>(zipFunction));
12041207
}
12051208

1209+
/**
1210+
* Returns a Single that emits the result of specified combiner function applied to combination of
1211+
* items emitted, in sequence, by an Iterable of other Singles.
1212+
* <p>
1213+
* {@code zip} applies this function in strict sequence.
1214+
* <p>
1215+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
1216+
* <dl>
1217+
* <dt><b>Scheduler:</b></dt>
1218+
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
1219+
* </dl>
1220+
*
1221+
* @param singles
1222+
* an Iterable of source Singles
1223+
* @param zipFunction
1224+
* a function that, when applied to an item emitted by each of the source Singles, results in
1225+
* an item that will be emitted by the resulting Single
1226+
* @return a Single that emits the zipped results
1227+
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
1228+
*/
1229+
@SuppressWarnings("unchecked")
1230+
public static <R> Single<R> zip(Iterable<? extends Single<?>> singles, FuncN<? extends R> zipFunction) {
1231+
return SingleOperatorZip.zip(iterableToArray(singles), zipFunction);
1232+
}
1233+
12061234
/**
12071235
* Returns an Observable that emits the item emitted by the source Single, then the item emitted by the
12081236
* specified Single.
@@ -1997,4 +2025,46 @@ public void call(SingleSubscriber<? super T> singleSubscriber) {
19972025
}
19982026
});
19992027
}
2028+
2029+
/**
2030+
* FOR INTERNAL USE ONLY.
2031+
* <p>
2032+
* Converts {@link Iterable} of {@link Single} to array of {@link Single}.
2033+
*
2034+
* @param singlesIterable
2035+
* non null iterable of {@link Single}.
2036+
* @return array of {@link Single} with same length as passed iterable.
2037+
*/
2038+
@SuppressWarnings("unchecked")
2039+
static <T> Single<? extends T>[] iterableToArray(final Iterable<? extends Single<? extends T>> singlesIterable) {
2040+
final Single<? extends T>[] singlesArray;
2041+
int count;
2042+
2043+
if (singlesIterable instanceof List) {
2044+
List<? extends Single<? extends T>> list = (List<? extends Single<? extends T>>) singlesIterable;
2045+
count = list.size();
2046+
singlesArray = list.toArray(new Single[count]);
2047+
} else {
2048+
Single<? extends T>[] tempArray = new Single[8]; // Magic number used just to reduce number of allocations.
2049+
count = 0;
2050+
for (Single<? extends T> s : singlesIterable) {
2051+
if (count == tempArray.length) {
2052+
Single<? extends T>[] sb = new Single[count + (count >> 2)];
2053+
System.arraycopy(tempArray, 0, sb, 0, count);
2054+
tempArray = sb;
2055+
}
2056+
tempArray[count] = s;
2057+
count++;
2058+
}
2059+
2060+
if (tempArray.length == count) {
2061+
singlesArray = tempArray;
2062+
} else {
2063+
singlesArray = new Single[count];
2064+
System.arraycopy(tempArray, 0, singlesArray, 0, count);
2065+
}
2066+
}
2067+
2068+
return singlesArray;
2069+
}
20002070
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package rx.internal.operators;
2+
3+
import rx.Single;
4+
import rx.SingleSubscriber;
5+
import rx.exceptions.Exceptions;
6+
import rx.functions.FuncN;
7+
import rx.plugins.RxJavaPlugins;
8+
import rx.subscriptions.CompositeSubscription;
9+
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
public class SingleOperatorZip {
14+
15+
public static <T, R> Single<R> zip(final Single<? extends T>[] singles, final FuncN<? extends R> zipper) {
16+
return Single.create(new Single.OnSubscribe<R>() {
17+
@Override
18+
public void call(final SingleSubscriber<? super R> subscriber) {
19+
final AtomicInteger wip = new AtomicInteger(singles.length);
20+
final AtomicBoolean once = new AtomicBoolean();
21+
final Object[] values = new Object[singles.length];
22+
23+
CompositeSubscription compositeSubscription = new CompositeSubscription();
24+
subscriber.add(compositeSubscription);
25+
26+
for (int i = 0; i < singles.length; i++) {
27+
if (compositeSubscription.isUnsubscribed() || once.get()) {
28+
break;
29+
}
30+
31+
final int j = i;
32+
SingleSubscriber<T> singleSubscriber = new SingleSubscriber<T>() {
33+
@Override
34+
public void onSuccess(T value) {
35+
values[j] = value;
36+
if (wip.decrementAndGet() == 0) {
37+
R r;
38+
39+
try {
40+
r = zipper.call(values);
41+
} catch (Throwable e) {
42+
Exceptions.throwIfFatal(e);
43+
subscriber.onError(e);
44+
return;
45+
}
46+
47+
subscriber.onSuccess(r);
48+
}
49+
}
50+
51+
@Override
52+
public void onError(Throwable error) {
53+
if (once.compareAndSet(false, true)) {
54+
subscriber.onError(error);
55+
} else {
56+
RxJavaPlugins.getInstance().getErrorHandler().handleError(error);
57+
}
58+
}
59+
};
60+
61+
compositeSubscription.add(singleSubscriber);
62+
63+
if (compositeSubscription.isUnsubscribed() || once.get()) {
64+
break;
65+
}
66+
67+
singles[i].subscribe(singleSubscriber);
68+
}
69+
}
70+
});
71+
}
72+
}

src/test/java/rx/SingleTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
import static org.mockito.Mockito.when;
2727

2828
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.HashSet;
31+
import java.util.LinkedHashMap;
32+
import java.util.List;
33+
import java.util.Set;
2934
import java.util.concurrent.Callable;
3035
import java.util.concurrent.CountDownLatch;
3136
import java.util.concurrent.TimeUnit;
@@ -44,6 +49,7 @@
4449
import rx.functions.Action1;
4550
import rx.functions.Func1;
4651
import rx.functions.Func2;
52+
import rx.functions.FuncN;
4753
import rx.schedulers.TestScheduler;
4854
import rx.singles.BlockingSingle;
4955
import rx.observers.TestSubscriber;
@@ -113,6 +119,57 @@ public String call(String a, String b) {
113119
ts.assertReceivedOnNext(Arrays.asList("AB"));
114120
}
115121

122+
@Test
123+
public void zipIterableShouldZipListOfSingles() {
124+
TestSubscriber<String> ts = new TestSubscriber<String>();
125+
Iterable<Single<Integer>> singles = Arrays.asList(Single.just(1), Single.just(2), Single.just(3));
126+
127+
Single
128+
.zip(singles, new FuncN<String>() {
129+
@Override
130+
public String call(Object... args) {
131+
StringBuilder stringBuilder = new StringBuilder();
132+
for (Object arg : args) {
133+
stringBuilder.append(arg);
134+
}
135+
return stringBuilder.toString();
136+
}
137+
}).subscribe(ts);
138+
139+
ts.assertValue("123");
140+
ts.assertNoErrors();
141+
ts.assertCompleted();
142+
}
143+
144+
@Test
145+
public void zipIterableShouldZipSetOfSingles() {
146+
TestSubscriber<String> ts = new TestSubscriber<String>();
147+
Set<Single<String>> singlesSet = Collections.newSetFromMap(new LinkedHashMap<Single<String>, Boolean>(2));
148+
Single<String> s1 = Single.just("1");
149+
Single<String> s2 = Single.just("2");
150+
Single<String> s3 = Single.just("3");
151+
152+
singlesSet.add(s1);
153+
singlesSet.add(s2);
154+
singlesSet.add(s3);
155+
156+
Single
157+
.zip(singlesSet, new FuncN<String>() {
158+
@Override
159+
public String call(Object... args) {
160+
StringBuilder stringBuilder = new StringBuilder();
161+
for (Object arg : args) {
162+
stringBuilder.append(arg);
163+
}
164+
return stringBuilder.toString();
165+
}
166+
}).subscribe(ts);
167+
168+
ts.assertValue("123");
169+
ts.assertNoErrors();
170+
ts.assertCompleted();
171+
}
172+
116173
@Test
117174
public void testZipWith() {
118175
TestSubscriber<String> ts = new TestSubscriber<String>();
@@ -828,4 +885,35 @@ public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryRet
828885

829886
verify(singleFactory).call();
830887
}
888+
889+
@Test(expected = NullPointerException.class)
890+
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
891+
Single.iterableToArray(null);
892+
}
893+
894+
@Test
895+
public void iterableToArrayShouldConvertList() {
896+
List<Single<String>> singlesList = Arrays.asList(Single.just("1"), Single.just("2"));
897+
898+
Single<? extends String>[] singlesArray = Single.iterableToArray(singlesList);
899+
assertEquals(2, singlesArray.length);
900+
assertSame(singlesList.get(0), singlesArray[0]);
901+
assertSame(singlesList.get(1), singlesArray[1]);
902+
}
903+
904+
@Test
905+
public void iterableToArrayShouldConvertSet() {
906+
// Just to trigger different path of the code that handles non-list iterables.
907+
Set<Single<String>> singlesSet = Collections.newSetFromMap(new LinkedHashMap<Single<String>, Boolean>(2));
908+
Single<String> s1 = Single.just("1");
909+
Single<String> s2 = Single.just("2");
910+
911+
singlesSet.add(s1);
912+
singlesSet.add(s2);
913+
914+
Single<? extends String>[] singlesArray = Single.iterableToArray(singlesSet);
915+
assertEquals(2, singlesArray.length);
916+
assertSame(s1, singlesArray[0]);
917+
assertSame(s2, singlesArray[1]);
918+
}
831919
}

0 commit comments

Comments
 (0)