Skip to content

Commit 5717827

Browse files
authored
2.x: fix Observable.concatMapEager bad logic for immediate scalars (#4982)
* 2.x: fix Observable.concatMapEager bad logic for immediate scalars * Don't print-log
1 parent 0a254a8 commit 5717827

File tree

3 files changed

+36
-19
lines changed

3 files changed

+36
-19
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import java.util.ArrayDeque;
17-
import java.util.concurrent.Callable;
1817
import java.util.concurrent.atomic.AtomicInteger;
1918

2019
import io.reactivex.*;
@@ -218,7 +217,6 @@ public void innerComplete(InnerQueuedObserver<R> inner) {
218217
drain();
219218
}
220219

221-
@SuppressWarnings("unchecked")
222220
@Override
223221
public void drain() {
224222
if (getAndIncrement() != 0) {
@@ -276,23 +274,6 @@ public void drain() {
276274
return;
277275
}
278276

279-
if (source instanceof Callable) {
280-
R w;
281-
282-
try {
283-
w = ((Callable<R>)source).call();
284-
} catch (Throwable ex) {
285-
Exceptions.throwIfFatal(ex);
286-
error.addThrowable(ex);
287-
continue;
288-
}
289-
290-
if (w != null) {
291-
a.onNext(w);
292-
}
293-
continue;
294-
}
295-
296277
InnerQueuedObserver<R> inner = new InnerQueuedObserver<R>(this, prefetch);
297278

298279
observers.offer(inner);

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,4 +1175,22 @@ public void innerLong() {
11751175
.assertComplete()
11761176
.assertNoErrors();
11771177
}
1178+
1179+
@Test
1180+
public void oneDelayed() {
1181+
Flowable.just(1, 2, 3, 4, 5)
1182+
.concatMapEager(new Function<Integer, Flowable<Integer>>() {
1183+
@Override
1184+
public Flowable<Integer> apply(Integer i) throws Exception {
1185+
return i == 3 ? Flowable.just(i) : Flowable
1186+
.just(i)
1187+
.delay(1, TimeUnit.MILLISECONDS, Schedulers.io());
1188+
}
1189+
})
1190+
.observeOn(Schedulers.io())
1191+
.test()
1192+
.awaitDone(5, TimeUnit.SECONDS)
1193+
.assertResult(1, 2, 3, 4, 5)
1194+
;
1195+
}
11781196
}

src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,4 +983,22 @@ public ObservableSource<Object> apply(Object v) throws Exception {
983983
}
984984
});
985985
}
986+
987+
@Test
988+
public void oneDelayed() {
989+
Observable.just(1, 2, 3, 4, 5)
990+
.concatMapEager(new Function<Integer, ObservableSource<Integer>>() {
991+
@Override
992+
public ObservableSource<Integer> apply(Integer i) throws Exception {
993+
return i == 3 ? Observable.just(i) : Observable
994+
.just(i)
995+
.delay(1, TimeUnit.MILLISECONDS, Schedulers.io());
996+
}
997+
})
998+
.observeOn(Schedulers.io())
999+
.test()
1000+
.awaitDone(5, TimeUnit.SECONDS)
1001+
.assertResult(1, 2, 3, 4, 5)
1002+
;
1003+
}
9861004
}

0 commit comments

Comments
 (0)