Skip to content

Commit 3768a53

Browse files
authored
2.x: fix cross-boundary invalid fusion with observeOn & zip (#4984)
1 parent 5717827 commit 3768a53

File tree

7 files changed

+135
-10
lines changed

7 files changed

+135
-10
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ public void onSubscribe(Subscription s) {
600600
if (s instanceof QueueSubscription) {
601601
@SuppressWarnings("unchecked")
602602
QueueSubscription<U> qs = (QueueSubscription<U>) s;
603-
int m = qs.requestFusion(QueueSubscription.ANY);
603+
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
604604
if (m == QueueSubscription.SYNC) {
605605
fusionMode = m;
606606
queue = qs;

src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public void onSubscribe(Subscription s) {
357357
if (s instanceof QueueSubscription) {
358358
QueueSubscription<T> f = (QueueSubscription<T>) s;
359359

360-
int m = f.requestFusion(QueueSubscription.ANY);
360+
int m = f.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
361361

362362
if (m == QueueSubscription.SYNC) {
363363
sourceMode = m;

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ public void onSubscribe(Disposable s) {
532532
@SuppressWarnings("unchecked")
533533
QueueDisposable<U> qd = (QueueDisposable<U>) s;
534534

535-
int m = qd.requestFusion(QueueDisposable.ANY);
535+
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
536536
if (m == QueueDisposable.SYNC) {
537537
fusionMode = m;
538538
queue = qd;

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.any;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
@@ -897,4 +898,32 @@ public void scalarXMap() {
897898
.test()
898899
.assertResult(2);
899900
}
901+
902+
@Test
903+
public void noCrossBoundaryFusion() {
904+
for (int i = 0; i < 500; i++) {
905+
TestSubscriber<Object> ts = Flowable.merge(
906+
Flowable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
907+
@Override
908+
public Object apply(Integer v) throws Exception {
909+
return Thread.currentThread().getName().substring(0, 4);
910+
}
911+
}),
912+
Flowable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
913+
@Override
914+
public Object apply(Integer v) throws Exception {
915+
return Thread.currentThread().getName().substring(0, 4);
916+
}
917+
})
918+
)
919+
.test()
920+
.awaitDone(5, TimeUnit.SECONDS)
921+
.assertValueCount(2);
922+
923+
List<Object> list = ts.values();
924+
925+
assertTrue(list.toString(), list.contains("RxSi"));
926+
assertTrue(list.toString(), list.contains("RxCo"));
927+
}
928+
}
900929
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,4 +1767,38 @@ public Integer apply(Integer a, Integer b) throws Exception {
17671767
.test(0L)
17681768
.assertFailure(TestException.class);
17691769
}
1770+
1771+
@Test
1772+
public void noCrossBoundaryFusion() {
1773+
for (int i = 0; i < 500; i++) {
1774+
TestSubscriber<List<Object>> ts = Flowable.zip(
1775+
Flowable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
1776+
@Override
1777+
public Object apply(Integer v) throws Exception {
1778+
return Thread.currentThread().getName().substring(0, 4);
1779+
}
1780+
}),
1781+
Flowable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
1782+
@Override
1783+
public Object apply(Integer v) throws Exception {
1784+
return Thread.currentThread().getName().substring(0, 4);
1785+
}
1786+
}),
1787+
new BiFunction<Object, Object, List<Object>>() {
1788+
@Override
1789+
public List<Object> apply(Object t1, Object t2) throws Exception {
1790+
return Arrays.asList(t1, t2);
1791+
}
1792+
}
1793+
)
1794+
.test()
1795+
.awaitDone(5, TimeUnit.SECONDS)
1796+
.assertValueCount(1);
1797+
1798+
List<Object> list = ts.values().get(0);
1799+
1800+
assertTrue(list.toString(), list.contains("RxSi"));
1801+
assertTrue(list.toString(), list.contains("RxCo"));
1802+
}
1803+
}
17701804
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,4 +756,32 @@ public Integer apply(Integer w) throws Exception {
756756

757757
TestHelper.assertError(errors, 1, TestException.class);
758758
}
759+
760+
@Test
761+
public void noCrossBoundaryFusion() {
762+
for (int i = 0; i < 500; i++) {
763+
TestObserver<Object> ts = Observable.merge(
764+
Observable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
765+
@Override
766+
public Object apply(Integer v) throws Exception {
767+
return Thread.currentThread().getName().substring(0, 4);
768+
}
769+
}),
770+
Observable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
771+
@Override
772+
public Object apply(Integer v) throws Exception {
773+
return Thread.currentThread().getName().substring(0, 4);
774+
}
775+
})
776+
)
777+
.test()
778+
.awaitDone(5, TimeUnit.SECONDS)
779+
.assertValueCount(2);
780+
781+
List<Object> list = ts.values();
782+
783+
assertTrue(list.toString(), list.contains("RxSi"));
784+
assertTrue(list.toString(), list.contains("RxCo"));
785+
}
786+
}
759787
}

src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
@@ -999,8 +1000,8 @@ public Object apply(final Object[] args) {
9991000
public void testDownstreamBackpressureRequestsWithFiniteSyncObservables() {
10001001
AtomicInteger generatedA = new AtomicInteger();
10011002
AtomicInteger generatedB = new AtomicInteger();
1002-
Observable<Integer> o1 = createInfiniteObservable(generatedA).take(Flowable.bufferSize() * 2);
1003-
Observable<Integer> o2 = createInfiniteObservable(generatedB).take(Flowable.bufferSize() * 2);
1003+
Observable<Integer> o1 = createInfiniteObservable(generatedA).take(Observable.bufferSize() * 2);
1004+
Observable<Integer> o2 = createInfiniteObservable(generatedB).take(Observable.bufferSize() * 2);
10041005

10051006
TestObserver<String> ts = new TestObserver<String>();
10061007
Observable.zip(o1, o2, new BiFunction<Integer, Integer, String>() {
@@ -1010,14 +1011,14 @@ public String apply(Integer t1, Integer t2) {
10101011
return t1 + "-" + t2;
10111012
}
10121013

1013-
}).observeOn(Schedulers.computation()).take(Flowable.bufferSize() * 2).subscribe(ts);
1014+
}).observeOn(Schedulers.computation()).take(Observable.bufferSize() * 2).subscribe(ts);
10141015

10151016
ts.awaitTerminalEvent();
10161017
ts.assertNoErrors();
1017-
assertEquals(Flowable.bufferSize() * 2, ts.valueCount());
1018+
assertEquals(Observable.bufferSize() * 2, ts.valueCount());
10181019
System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get());
1019-
assertTrue(generatedA.get() < (Flowable.bufferSize() * 3));
1020-
assertTrue(generatedB.get() < (Flowable.bufferSize() * 3));
1020+
assertTrue(generatedA.get() < (Observable.bufferSize() * 3));
1021+
assertTrue(generatedB.get() < (Observable.bufferSize() * 3));
10211022
}
10221023

10231024
private Observable<Integer> createInfiniteObservable(final AtomicInteger generated) {
@@ -1358,4 +1359,37 @@ public Object apply(Integer a, Integer b) throws Exception {
13581359
}
13591360
}));
13601361
}
1361-
}
1362+
1363+
@Test
1364+
public void noCrossBoundaryFusion() {
1365+
for (int i = 0; i < 500; i++) {
1366+
TestObserver<List<Object>> ts = Observable.zip(
1367+
Observable.just(1).observeOn(Schedulers.single()).map(new Function<Integer, Object>() {
1368+
@Override
1369+
public Object apply(Integer v) throws Exception {
1370+
return Thread.currentThread().getName().substring(0, 4);
1371+
}
1372+
}),
1373+
Observable.just(1).observeOn(Schedulers.computation()).map(new Function<Integer, Object>() {
1374+
@Override
1375+
public Object apply(Integer v) throws Exception {
1376+
return Thread.currentThread().getName().substring(0, 4);
1377+
}
1378+
}),
1379+
new BiFunction<Object, Object, List<Object>>() {
1380+
@Override
1381+
public List<Object> apply(Object t1, Object t2) throws Exception {
1382+
return Arrays.asList(t1, t2);
1383+
}
1384+
}
1385+
)
1386+
.test()
1387+
.awaitDone(5, TimeUnit.SECONDS)
1388+
.assertValueCount(1);
1389+
1390+
List<Object> list = ts.values().get(0);
1391+
1392+
assertTrue(list.toString(), list.contains("RxSi"));
1393+
assertTrue(list.toString(), list.contains("RxCo"));
1394+
}
1395+
}}

0 commit comments

Comments
 (0)