diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 64991569db..edc1a124d3 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3889,6 +3889,29 @@ public final Observable concatMap(Func1 + * + *
+ *
Scheduler:
+ *
{@code concatMapIterable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of item emitted by the resulting Observable + * @param collectionSelector + * a function that returns an Iterable sequence of values for when given an item emitted by the + * source Observable + * @return an Observable that emits the results of concatenating the items emitted by the source Observable with + * the values in the Iterables corresponding to those items, as generated by {@code collectionSelector} + * @see ReactiveX operators documentation: FlatMap + */ + public final Observable concatMapIterable(Func1> collectionSelector) { + return concat(map(OperatorMapPair.convertSelector(collectionSelector))); + } + /** * Returns an Observable that emits the items emitted from the current Observable, then the next, one after * the other, without interleaving them. diff --git a/src/test/java/rx/internal/operators/OperatorConcatTest.java b/src/test/java/rx/internal/operators/OperatorConcatTest.java index 8812dd50eb..a54c435432 100644 --- a/src/test/java/rx/internal/operators/OperatorConcatTest.java +++ b/src/test/java/rx/internal/operators/OperatorConcatTest.java @@ -82,6 +82,34 @@ public void testConcatWithList() { verify(observer, times(7)).onNext(anyString()); } + + @Test + public void testConcatMapIterable() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + + final String[] l = { "a", "b", "c", "d", "e" }; + + Func1,List> identity = new Func1, List>() { + @Override + public List call(List t) { + return t; + } + }; + + final Observable> listObs = Observable.just(Arrays.asList(l)); + final Observable concatMap = listObs.concatMapIterable(identity); + + concatMap.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("a"); + inOrder.verify(observer, times(1)).onNext("b"); + inOrder.verify(observer, times(1)).onNext("c"); + inOrder.verify(observer, times(1)).onNext("d"); + inOrder.verify(observer, times(1)).onNext("e"); + inOrder.verify(observer, times(1)).onCompleted(); + } @Test public void testConcatObservableOfObservables() {