Skip to content

Commit 8ff2bb0

Browse files
committed
OperatorAll - prevent multiple terminal events
1 parent 0bdff66 commit 8ff2bb0

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ public void onNext(T t) {
6161

6262
@Override
6363
public void onError(Throwable e) {
64-
child.onError(e);
64+
if (!done) {
65+
done = true;
66+
child.onError(e);
67+
}
6568
}
6669

6770
@Override

src/test/java/rx/internal/operators/OperatorAllTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.Test;
2626

2727
import rx.*;
28+
import rx.Observable.OnSubscribe;
2829
import rx.functions.Func1;
2930
import rx.observers.TestSubscriber;
3031

@@ -178,4 +179,34 @@ public Boolean call(Object object) {
178179
assertEquals(ex, errors.get(0));
179180
assertTrue(ex.getCause().getMessage().contains("Boo!"));
180181
}
182+
183+
@Test
184+
public void testDoesNotEmitMultipleTerminalEvents() {
185+
TestSubscriber<Boolean> ts = TestSubscriber.create();
186+
Observable.create(new OnSubscribe<Integer>() {
187+
188+
@Override
189+
public void call(final Subscriber<? super Integer> sub) {
190+
sub.setProducer(new Producer() {
191+
192+
@Override
193+
public void request(long n) {
194+
if (n > 0) {
195+
sub.onNext(1);
196+
sub.onCompleted();
197+
}
198+
}
199+
});
200+
}
201+
})
202+
.all(new Func1<Integer,Boolean>() {
203+
204+
@Override
205+
public Boolean call(Integer t) {
206+
throw new RuntimeException("boo");
207+
}})
208+
.unsafeSubscribe(ts);
209+
ts.assertError(RuntimeException.class);
210+
ts.assertNotCompleted();
211+
}
181212
}

0 commit comments

Comments
 (0)