Skip to content

Commit 2e6f94c

Browse files
authored
2.x: A.flatMapB to eagerly check for cancellations before subscribing (#4992)
1 parent a8ba158 commit 2e6f94c

17 files changed

+659
-40
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,24 +152,25 @@ public void onNext(T t) {
152152
}
153153
} else {
154154
InnerSubscriber<T, U> inner = new InnerSubscriber<T, U>(this, uniqueId++);
155-
addInner(inner);
156-
p.subscribe(inner);
155+
if (addInner(inner)) {
156+
p.subscribe(inner);
157+
}
157158
}
158159
}
159160

160-
void addInner(InnerSubscriber<T, U> inner) {
161+
boolean addInner(InnerSubscriber<T, U> inner) {
161162
for (;;) {
162163
InnerSubscriber<?, ?>[] a = subscribers.get();
163164
if (a == CANCELLED) {
164165
inner.dispose();
165-
return;
166+
return false;
166167
}
167168
int n = a.length;
168169
InnerSubscriber<?, ?>[] b = new InnerSubscriber[n + 1];
169170
System.arraycopy(a, 0, b, 0, n);
170171
b[n] = inner;
171172
if (subscribers.compareAndSet(a, b)) {
172-
return;
173+
return true;
173174
}
174175
}
175176
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ public void onNext(T value) {
116116

117117
InnerConsumer inner = new InnerConsumer();
118118

119-
set.add(inner);
120-
121-
cs.subscribe(inner);
119+
if (set.add(inner)) {
120+
cs.subscribe(inner);
121+
}
122122
}
123123

124124
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,9 @@ public void onNext(T value) {
124124

125125
InnerObserver inner = new InnerObserver();
126126

127-
set.add(inner);
128-
129-
cs.subscribe(inner);
127+
if (set.add(inner)) {
128+
cs.subscribe(inner);
129+
}
130130
}
131131

132132
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@ public void onNext(T t) {
128128

129129
InnerObserver inner = new InnerObserver();
130130

131-
set.add(inner);
132-
133-
ms.subscribe(inner);
131+
if (set.add(inner)) {
132+
ms.subscribe(inner);
133+
}
134134
}
135135

136136
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@ public void onNext(T t) {
128128

129129
InnerObserver inner = new InnerObserver();
130130

131-
set.add(inner);
132-
133-
ms.subscribe(inner);
131+
if (set.add(inner)) {
132+
ms.subscribe(inner);
133+
}
134134
}
135135

136136
@Override

src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapCompletable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ public void onSuccess(T value) {
8787
return;
8888
}
8989

90-
cs.subscribe(this);
90+
if (!isDisposed()) {
91+
cs.subscribe(this);
92+
}
9193
}
9294

9395
@Override

src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingle.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ public void onSuccess(T value) {
9191
return;
9292
}
9393

94-
ss.subscribe(new FlatMapSingleObserver<R>(this, actual));
94+
if (!isDisposed()) {
95+
ss.subscribe(new FlatMapSingleObserver<R>(this, actual));
96+
}
9597
}
9698

9799
@Override

src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatten.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ public void onSuccess(T value) {
9393
return;
9494
}
9595

96-
source.subscribe(new InnerObserver());
96+
if (!isDisposed()) {
97+
source.subscribe(new InnerObserver());
98+
}
9799
}
98100

99101
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,26 +158,27 @@ void subscribeInner(ObservableSource<? extends U> p) {
158158
}
159159
} else {
160160
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
161-
addInner(inner);
162-
p.subscribe(inner);
161+
if (addInner(inner)) {
162+
p.subscribe(inner);
163+
}
163164
break;
164165
}
165166
}
166167
}
167168

168-
void addInner(InnerObserver<T, U> inner) {
169+
boolean addInner(InnerObserver<T, U> inner) {
169170
for (;;) {
170171
InnerObserver<?, ?>[] a = observers.get();
171172
if (a == CANCELLED) {
172173
inner.dispose();
173-
return;
174+
return false;
174175
}
175176
int n = a.length;
176177
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
177178
System.arraycopy(a, 0, b, 0, n);
178179
b[n] = inner;
179180
if (observers.compareAndSet(a, b)) {
180-
return;
181+
return true;
181182
}
182183
}
183184
}

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public void onNext(T value) {
9898

9999
InnerObserver inner = new InnerObserver();
100100

101-
set.add(inner);
102-
103-
cs.subscribe(inner);
101+
if (set.add(inner)) {
102+
cs.subscribe(inner);
103+
}
104104
}
105105

106106
@Override

0 commit comments

Comments
 (0)