From d51bb46182904e22decb7c361ce2fca8fee88728 Mon Sep 17 00:00:00 2001 From: David Moten Date: Fri, 29 Jul 2016 08:06:06 +1000 Subject: [PATCH] collect - prevent multiple terminal events --- .../DeferredScalarSubscriberSafe.java | 57 ++++ .../operators/OnSubscribeCollect.java | 7 +- src/test/java/rx/ObservableTests.java | 55 ---- .../operators/OnSubscribeCollectTest.java | 253 ++++++++++++++++++ .../operators/OnSubscribeReduceTest.java | 2 +- .../internal/operators/OperatorAllTest.java | 2 +- .../internal/operators/OperatorAnyTest.java | 2 +- 7 files changed, 318 insertions(+), 60 deletions(-) create mode 100644 src/main/java/rx/internal/operators/DeferredScalarSubscriberSafe.java create mode 100644 src/test/java/rx/internal/operators/OnSubscribeCollectTest.java diff --git a/src/main/java/rx/internal/operators/DeferredScalarSubscriberSafe.java b/src/main/java/rx/internal/operators/DeferredScalarSubscriberSafe.java new file mode 100644 index 0000000000..3ec25c404a --- /dev/null +++ b/src/main/java/rx/internal/operators/DeferredScalarSubscriberSafe.java @@ -0,0 +1,57 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package rx.internal.operators; + +import rx.Subscriber; +import rx.plugins.RxJavaHooks; + +/** + * Supplements {@code DeferredScalarSubscriber} with defensive behaviour that ensures no emissions + * occur after a terminal event. If {@code onError} is called more than once then errors after the first + * are reported to {@code RxJavaHooks.onError}. + * + * @param source value type + * @param result value type + */ +public abstract class DeferredScalarSubscriberSafe extends DeferredScalarSubscriber { + + protected boolean done; + + public DeferredScalarSubscriberSafe(Subscriber actual) { + super(actual); + } + + @Override + public void onError(Throwable ex) { + if (!done) { + done = true; + super.onError(ex); + } else { + RxJavaHooks.onError(ex); + } + } + + @Override + public void onCompleted() { + if (done) { + return; + } + done = true; + super.onCompleted(); + } + +} diff --git a/src/main/java/rx/internal/operators/OnSubscribeCollect.java b/src/main/java/rx/internal/operators/OnSubscribeCollect.java index 9e28ece0d7..94257e08c8 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCollect.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCollect.java @@ -50,7 +50,7 @@ public void call(Subscriber t) { new CollectSubscriber(t, initialValue, collector).subscribeTo(source); } - static final class CollectSubscriber extends DeferredScalarSubscriber { + static final class CollectSubscriber extends DeferredScalarSubscriberSafe { final Action2 collector; @@ -63,12 +63,15 @@ public CollectSubscriber(Subscriber actual, R initialValue, Action2> o = Observable.just(1, 2, 3).collect(new Func0>() { - - @Override - public List call() { - return new ArrayList(); - } - - }, new Action2, Integer>() { - - @Override - public void call(List list, Integer v) { - list.add(v); - } - }); - List list = o.toBlocking().last(); - - assertEquals(3, list.size()); - assertEquals(1, list.get(0).intValue()); - assertEquals(2, list.get(1).intValue()); - assertEquals(3, list.get(2).intValue()); - - // test multiple subscribe - List list2 = o.toBlocking().last(); - - assertEquals(3, list2.size()); - assertEquals(1, list2.get(0).intValue()); - assertEquals(2, list2.get(1).intValue()); - assertEquals(3, list2.get(2).intValue()); - } - - @Test - public void testCollectToString() { - String value = Observable.just(1, 2, 3).collect(new Func0() { - - @Override - public StringBuilder call() { - return new StringBuilder(); - } - - }, new Action2() { - - @Override - public void call(StringBuilder sb, Integer v) { - if (sb.length() > 0) { - sb.append("-"); - } - sb.append(v); - } - }).toBlocking().last().toString(); - - assertEquals("1-2-3", value); - } - @Test public void testMergeWith() { TestSubscriber ts = new TestSubscriber(); diff --git a/src/test/java/rx/internal/operators/OnSubscribeCollectTest.java b/src/test/java/rx/internal/operators/OnSubscribeCollectTest.java new file mode 100644 index 0000000000..c72be2cf7f --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeCollectTest.java @@ -0,0 +1,253 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Assert; +import org.junit.Test; + +import rx.Observable; +import rx.Producer; +import rx.Subscriber; +import rx.Observable.OnSubscribe; +import rx.functions.Action1; +import rx.functions.Action2; +import rx.functions.Func0; +import rx.observers.TestSubscriber; +import rx.plugins.RxJavaHooks; + +public class OnSubscribeCollectTest { + + @Test + public void testCollectToList() { + Observable> o = Observable.just(1, 2, 3).collect(new Func0>() { + + @Override + public List call() { + return new ArrayList(); + } + + }, new Action2, Integer>() { + + @Override + public void call(List list, Integer v) { + list.add(v); + } + }); + + List list = o.toBlocking().last(); + + assertEquals(3, list.size()); + assertEquals(1, list.get(0).intValue()); + assertEquals(2, list.get(1).intValue()); + assertEquals(3, list.get(2).intValue()); + + // test multiple subscribe + List list2 = o.toBlocking().last(); + + assertEquals(3, list2.size()); + assertEquals(1, list2.get(0).intValue()); + assertEquals(2, list2.get(1).intValue()); + assertEquals(3, list2.get(2).intValue()); + } + + @Test + public void testCollectToString() { + String value = Observable.just(1, 2, 3).collect(new Func0() { + + @Override + public StringBuilder call() { + return new StringBuilder(); + } + + }, new Action2() { + + @Override + public void call(StringBuilder sb, Integer v) { + if (sb.length() > 0) { + sb.append("-"); + } + sb.append(v); + } + }).toBlocking().last().toString(); + + assertEquals("1-2-3", value); + } + + @Test + public void testFactoryFailureResultsInErrorEmission() { + TestSubscriber ts = TestSubscriber.create(); + final RuntimeException e = new RuntimeException(); + Observable.just(1).collect(new Func0>() { + + @Override + public List call() { + throw e; + } + }, new Action2, Integer>() { + + @Override + public void call(List list, Integer t) { + list.add(t); + } + }).subscribe(ts); + ts.assertNoValues(); + ts.assertError(e); + ts.assertNotCompleted(); + } + + @Test + public void testCollectorFailureDoesNotResultInTwoErrorEmissions() { + try { + final List list = new CopyOnWriteArrayList(); + RxJavaHooks.setOnError(new Action1() { + + @Override + public void call(Throwable t) { + list.add(t); + } + }); + final RuntimeException e1 = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + TestSubscriber> ts = TestSubscriber.create(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 0) { + sub.onNext(1); + sub.onError(e2); + } + } + }); + } + }).collect(new Func0>() { + + @Override + public List call() { + return new ArrayList(); + } + }, // + new Action2, Integer>() { + + @Override + public void call(List t1, Integer t2) { + throw e1; + } + }).unsafeSubscribe(ts); + assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); + ts.assertNotCompleted(); + assertEquals(Arrays.asList(e2), list); + } finally { + RxJavaHooks.reset(); + } + } + + @Test + public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() { + final RuntimeException e1 = new RuntimeException(); + TestSubscriber> ts = TestSubscriber.create(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 0) { + sub.onNext(1); + sub.onCompleted(); + } + } + }); + } + }).collect(new Func0>() { + + @Override + public List call() { + return new ArrayList(); + } + }, // + new Action2, Integer>() { + + @Override + public void call(List t1, Integer t2) { + throw e1; + } + }).unsafeSubscribe(ts); + assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); + ts.assertNotCompleted(); + } + + @Test + public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() { + final RuntimeException e1 = new RuntimeException(); + TestSubscriber> ts = TestSubscriber.create(); + final AtomicBoolean added = new AtomicBoolean(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 0) { + sub.onNext(1); + sub.onNext(2); + } + } + }); + } + }).collect(new Func0>() { + + @Override + public List call() { + return new ArrayList(); + } + }, // + new Action2, Integer>() { + boolean once = true; + @Override + public void call(List list, Integer t) { + if (once) { + once = false; + throw e1; + } else { + added.set(true); + } + } + }).unsafeSubscribe(ts); + assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); + ts.assertNoValues(); + ts.assertNotCompleted(); + assertFalse(added.get()); + } + +} diff --git a/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java b/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java index b09f92c158..7b0f55f6f3 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java @@ -255,7 +255,7 @@ public Integer call(Integer a, Integer b) { assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); assertEquals(Arrays.asList(e2), list); } finally { - RxJavaHooks.setOnError(null); + RxJavaHooks.reset(); } } diff --git a/src/test/java/rx/internal/operators/OperatorAllTest.java b/src/test/java/rx/internal/operators/OperatorAllTest.java index 4bac172aae..0a22203d18 100644 --- a/src/test/java/rx/internal/operators/OperatorAllTest.java +++ b/src/test/java/rx/internal/operators/OperatorAllTest.java @@ -289,7 +289,7 @@ public Boolean call(Integer t) { assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); assertEquals(Arrays.asList(e2), list); } finally { - RxJavaHooks.setOnError(null); + RxJavaHooks.reset(); } } } diff --git a/src/test/java/rx/internal/operators/OperatorAnyTest.java b/src/test/java/rx/internal/operators/OperatorAnyTest.java index a9830c8a32..85f1cd3c93 100644 --- a/src/test/java/rx/internal/operators/OperatorAnyTest.java +++ b/src/test/java/rx/internal/operators/OperatorAnyTest.java @@ -382,7 +382,7 @@ public Boolean call(Integer t) { assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); assertEquals(Arrays.asList(e2), list); } finally { - RxJavaHooks.setOnError(null); + RxJavaHooks.reset(); } } }