Skip to content

Commit 41959f7

Browse files
davidmotenakarnokd
authored andcommitted
onBackpressureDrop - prevent multiple terminal events (#4250)
1 parent 60dbbed commit 41959f7

File tree

2 files changed

+162
-4
lines changed

2 files changed

+162
-4
lines changed

src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.Observable.Operator;
2222
import rx.exceptions.Exceptions;
2323
import rx.functions.Action1;
24+
import rx.plugins.RxJavaHooks;
2425

2526
public class OperatorOnBackpressureDrop<T> implements Operator<T, T> {
2627

@@ -62,23 +63,37 @@ public void request(long n) {
6263

6364
});
6465
return new Subscriber<T>(child) {
66+
67+
boolean done;
68+
6569
@Override
6670
public void onStart() {
6771
request(Long.MAX_VALUE);
6872
}
6973

7074
@Override
7175
public void onCompleted() {
72-
child.onCompleted();
76+
if (!done) {
77+
done = true;
78+
child.onCompleted();
79+
}
7380
}
7481

7582
@Override
7683
public void onError(Throwable e) {
77-
child.onError(e);
84+
if (!done) {
85+
done = true;
86+
child.onError(e);
87+
} else {
88+
RxJavaHooks.onError(e);
89+
}
7890
}
7991

8092
@Override
8193
public void onNext(T t) {
94+
if (done) {
95+
return;
96+
}
8297
if (requested.get() > 0) {
8398
child.onNext(t);
8499
requested.decrementAndGet();
@@ -88,7 +103,7 @@ public void onNext(T t) {
88103
try {
89104
onDrop.call(t);
90105
} catch (Throwable e) {
91-
Exceptions.throwOrReport(e, child, t);
106+
Exceptions.throwOrReport(e, this, t);
92107
return;
93108
}
94109
}

src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@
1818
import static org.junit.Assert.assertEquals;
1919
import static org.junit.Assert.assertFalse;
2020

21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.List;
24+
import java.util.concurrent.CopyOnWriteArrayList;
2125
import java.util.concurrent.CountDownLatch;
2226
import java.util.concurrent.atomic.AtomicBoolean;
2327
import java.util.concurrent.atomic.AtomicInteger;
2428

2529
import org.junit.Test;
2630

27-
import rx.Observable;
31+
import rx.*;
2832
import rx.Observable.OnSubscribe;
2933
import rx.Observer;
3034
import rx.Subscriber;
3135
import rx.functions.Action1;
3236
import rx.internal.util.RxRingBuffer;
3337
import rx.observers.TestSubscriber;
38+
import rx.plugins.RxJavaHooks;
3439
import rx.schedulers.Schedulers;
3540

3641
public class OperatorOnBackpressureDropTest {
@@ -141,6 +146,144 @@ public void call(Throwable t) {
141146
assertFalse(errorOccurred.get());
142147
}
143148

149+
@Test
150+
public void testOnDropMethodIsCalled() {
151+
final List<Integer> list = new ArrayList<Integer>();
152+
// request 0
153+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
154+
Observable.create(new OnSubscribe<Integer>() {
155+
156+
@Override
157+
public void call(final Subscriber<? super Integer> sub) {
158+
sub.setProducer(new Producer() {
159+
160+
@Override
161+
public void request(long n) {
162+
if (n > 1) {
163+
sub.onNext(1);
164+
sub.onNext(2);
165+
sub.onCompleted();
166+
}
167+
}
168+
});
169+
}
170+
}).onBackpressureDrop(new Action1<Integer>() {
171+
@Override
172+
public void call(Integer t) {
173+
list.add(t);
174+
}
175+
}).subscribe(ts);
176+
assertEquals(Arrays.asList(1, 2), list);
177+
}
178+
179+
@Test
180+
public void testUpstreamEmitsOnCompletedAfterFailureWithoutCheckingSubscription() {
181+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
182+
final RuntimeException e = new RuntimeException();
183+
Observable.create(new OnSubscribe<Integer>() {
184+
185+
@Override
186+
public void call(final Subscriber<? super Integer> sub) {
187+
sub.setProducer(new Producer() {
188+
189+
@Override
190+
public void request(long n) {
191+
if (n > 1) {
192+
sub.onNext(1);
193+
sub.onCompleted();
194+
}
195+
}
196+
});
197+
}
198+
})
199+
.onBackpressureDrop(new Action1<Integer>() {
200+
@Override
201+
public void call(Integer t) {
202+
throw e;
203+
}})
204+
.unsafeSubscribe(ts);
205+
ts.assertNoValues();
206+
ts.assertError(e);
207+
ts.assertNotCompleted();
208+
}
209+
210+
@Test
211+
public void testUpstreamEmitsErrorAfterFailureWithoutCheckingSubscriptionResultsInHooksOnErrorCalled() {
212+
try {
213+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
214+
RxJavaHooks.setOnError(new Action1<Throwable>() {
215+
216+
@Override
217+
public void call(Throwable t) {
218+
list.add(t);
219+
}
220+
});
221+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
222+
final RuntimeException e1 = new RuntimeException();
223+
final RuntimeException e2 = new RuntimeException();
224+
Observable.create(new OnSubscribe<Integer>() {
225+
226+
@Override
227+
public void call(final Subscriber<? super Integer> sub) {
228+
sub.setProducer(new Producer() {
229+
230+
@Override
231+
public void request(long n) {
232+
if (n > 1) {
233+
sub.onNext(1);
234+
sub.onError(e2);
235+
}
236+
}
237+
});
238+
}
239+
}).onBackpressureDrop(new Action1<Integer>() {
240+
@Override
241+
public void call(Integer t) {
242+
throw e1;
243+
}
244+
}).unsafeSubscribe(ts);
245+
ts.assertNoValues();
246+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
247+
ts.assertNotCompleted();
248+
assertEquals(Arrays.asList(e2), list);
249+
} finally {
250+
RxJavaHooks.setOnError(null);
251+
}
252+
}
253+
254+
@Test
255+
public void testUpstreamEmitsOnNextAfterFailureWithoutCheckingSubscription() {
256+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
257+
final RuntimeException e = new RuntimeException();
258+
Observable.create(new OnSubscribe<Integer>() {
259+
260+
@Override
261+
public void call(final Subscriber<? super Integer> sub) {
262+
sub.setProducer(new Producer() {
263+
264+
@Override
265+
public void request(long n) {
266+
if (n > 1) {
267+
sub.onNext(1);
268+
sub.onNext(2);
269+
}
270+
}
271+
});
272+
}
273+
})
274+
.onBackpressureDrop(new Action1<Integer>() {
275+
@Override
276+
public void call(Integer t) {
277+
throw e;
278+
}})
279+
.unsafeSubscribe(ts);
280+
ts.assertNoValues();
281+
ts.assertError(e);
282+
ts.assertNotCompleted();
283+
}
284+
285+
286+
144287
private static final Action1<Long> THROW_NON_FATAL = new Action1<Long>() {
145288
@Override
146289
public void call(Long n) {

0 commit comments

Comments
 (0)