From cf4246ba868c4ec6f0a3eff1629008a20eab5e6b Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 11 Nov 2016 10:01:02 +0100 Subject: [PATCH] 2.x: add doAfterNext & doAfterSuccess to the other types --- src/main/java/io/reactivex/Maybe.java | 19 ++ src/main/java/io/reactivex/Observable.java | 21 ++ src/main/java/io/reactivex/Single.java | 19 ++ .../operators/maybe/MaybeDoAfterSuccess.java | 99 +++++++ .../observable/ObservableDoAfterNext.java | 77 +++++ .../single/SingleDoAfterSuccess.java | 96 ++++++ .../maybe/MaybeDoAfterSuccessTest.java | 151 ++++++++++ .../observable/ObservableDoAfterNextTest.java | 275 ++++++++++++++++++ .../single/SingleDoAfterSuccessTest.java | 130 +++++++++ 9 files changed, 887 insertions(+) create mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccess.java create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDoAfterSuccess.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccessTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableDoAfterNextTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleDoAfterSuccessTest.java diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index fd92edd8e8..5ad99eea94 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2273,6 +2273,25 @@ public final Maybe delaySubscription(long delay, TimeUnit unit, Scheduler sch return delaySubscription(Flowable.timer(delay, unit, scheduler)); } + /** + * Calls the specified consumer with the success item after this item has been emitted to the downstream. + *

Note that the {@code onAfterNext} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Scheduler:
+ *
{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream + * @return the new Maybe instance + * @since 2.0.1 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Maybe doAfterSuccess(Consumer onAfterSuccess) { + ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null"); + return RxJavaPlugins.onAssembly(new MaybeDoAfterSuccess(this, onAfterSuccess)); + } + /** * Registers an {@link Action} to be called when this Maybe invokes either * {@link MaybeObserver#onComplete onSuccess}, diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 2db7d91319..974b9498c1 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -6416,6 +6416,27 @@ public final Observable distinctUntilChanged(BiPredicate(this, Functions.identity(), comparer)); } + /** + * Calls the specified consumer with the current item after this item has been emitted to the downstream. + *

Note that the {@code onAfterNext} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Scheduler:
+ *
{@code doAfterNext} does not operate by default on a particular {@link Scheduler}.
+ * Operator-fusion: + *
This operator supports boundary-limited synchronous or asynchronous queue-fusion.
+ *
+ * @param onAfterNext the Consumer that will be called after emitting an item from upstream to the downstream + * @return the new Observable instance + * @since 2.0.1 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable doAfterNext(Consumer onAfterNext) { + ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); + return RxJavaPlugins.onAssembly(new ObservableDoAfterNext(this, onAfterNext)); + } + /** * Registers an {@link Action} to be called when this ObservableSource invokes either * {@link Observer#onComplete onComplete} or {@link Observer#onError onError}. diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index e9b371a179..e54dddaf1e 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1716,6 +1716,25 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler return delaySubscription(Observable.timer(time, unit, scheduler)); } + /** + * Calls the specified consumer with the success item after this item has been emitted to the downstream. + *

Note that the {@code doAfterSuccess} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Scheduler:
+ *
{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream + * @return the new Single instance + * @since 2.0.1 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Single doAfterSuccess(Consumer onAfterSuccess) { + ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null"); + return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess(this, onAfterSuccess)); + } + /** * Calls the specified action after this Single signals onSuccess or onError or gets disposed by * the downstream. diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccess.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccess.java new file mode 100644 index 0000000000..a693c36cca --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccess.java @@ -0,0 +1,99 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Calls a consumer after pushing the current item to the downstream. + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class MaybeDoAfterSuccess extends AbstractMaybeWithUpstream { + + final Consumer onAfterSuccess; + + public MaybeDoAfterSuccess(MaybeSource source, Consumer onAfterSuccess) { + super(source); + this.onAfterSuccess = onAfterSuccess; + } + + @Override + protected void subscribeActual(MaybeObserver s) { + source.subscribe(new DoAfterObserver(s, onAfterSuccess)); + } + + static final class DoAfterObserver implements MaybeObserver, Disposable { + + final MaybeObserver actual; + + final Consumer onAfterSuccess; + + Disposable d; + + DoAfterObserver(MaybeObserver actual, Consumer onAfterSuccess) { + this.actual = actual; + this.onAfterSuccess = onAfterSuccess; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T t) { + actual.onSuccess(t); + + try { + onAfterSuccess.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + // remember, onSuccess is a terminal event and we can't call onError + RxJavaPlugins.onError(ex); + } + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java new file mode 100644 index 0000000000..a14452a826 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java @@ -0,0 +1,77 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.observers.BasicFuseableObserver; + +/** + * Calls a consumer after pushing the current item to the downstream. + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class ObservableDoAfterNext extends AbstractObservableWithUpstream { + + final Consumer onAfterNext; + + public ObservableDoAfterNext(ObservableSource source, Consumer onAfterNext) { + super(source); + this.onAfterNext = onAfterNext; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new DoAfterObserver(s, onAfterNext)); + } + + static final class DoAfterObserver extends BasicFuseableObserver { + + final Consumer onAfterNext; + + DoAfterObserver(Observer actual, Consumer onAfterNext) { + super(actual); + this.onAfterNext = onAfterNext; + } + + @Override + public void onNext(T t) { + actual.onNext(t); + + if (sourceMode == NONE) { + try { + onAfterNext.accept(t); + } catch (Throwable ex) { + fail(ex); + } + } + } + + @Override + public int requestFusion(int mode) { + return transitiveBoundaryFusion(mode); + } + + @Override + public T poll() throws Exception { + T v = qs.poll(); + if (v != null) { + onAfterNext.accept(v); + } + return v; + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoAfterSuccess.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoAfterSuccess.java new file mode 100644 index 0000000000..e000a8f0c8 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoAfterSuccess.java @@ -0,0 +1,96 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Calls a consumer after pushing the current item to the downstream. + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class SingleDoAfterSuccess extends Single { + + final SingleSource source; + + final Consumer onAfterSuccess; + + public SingleDoAfterSuccess(SingleSource source, Consumer onAfterSuccess) { + this.source = source; + this.onAfterSuccess = onAfterSuccess; + } + + @Override + protected void subscribeActual(SingleObserver s) { + source.subscribe(new DoAfterObserver(s, onAfterSuccess)); + } + + static final class DoAfterObserver implements SingleObserver, Disposable { + + final SingleObserver actual; + + final Consumer onAfterSuccess; + + Disposable d; + + DoAfterObserver(SingleObserver actual, Consumer onAfterSuccess) { + this.actual = actual; + this.onAfterSuccess = onAfterSuccess; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T t) { + actual.onSuccess(t); + + try { + onAfterSuccess.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + // remember, onSuccess is a terminal event and we can't call onError + RxJavaPlugins.onError(ex); + } + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccessTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccessTest.java new file mode 100644 index 0000000000..62ec635594 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoAfterSuccessTest.java @@ -0,0 +1,151 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; + +public class MaybeDoAfterSuccessTest { + + final List values = new ArrayList(); + + final Consumer afterSuccess = new Consumer() { + @Override + public void accept(Integer e) throws Exception { + values.add(-e); + } + }; + + final TestObserver ts = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + MaybeDoAfterSuccessTest.this.values.add(t); + } + }; + + @Test + public void just() { + Maybe.just(1) + .doAfterSuccess(afterSuccess) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void error() { + Maybe.error(new TestException()) + .doAfterSuccess(afterSuccess) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test + public void empty() { + Maybe.empty() + .doAfterSuccess(afterSuccess) + .subscribeWith(ts) + .assertResult(); + + assertTrue(values.isEmpty()); + } + + @Test(expected = NullPointerException.class) + public void consumerNull() { + Maybe.just(1).doAfterSuccess(null); + } + + @Test + public void justConditional() { + Maybe.just(1) + .doAfterSuccess(afterSuccess) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void errorConditional() { + Maybe.error(new TestException()) + .doAfterSuccess(afterSuccess) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test + public void emptyConditional() { + Maybe.empty() + .doAfterSuccess(afterSuccess) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(); + + assertTrue(values.isEmpty()); + } + + @Test + public void consumerThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Maybe.just(1) + .doAfterSuccess(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleElement().doAfterSuccess(afterSuccess)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe m) throws Exception { + return m.doAfterSuccess(afterSuccess); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoAfterNextTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoAfterNextTest.java new file mode 100644 index 0000000000..37140ecd8e --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoAfterNextTest.java @@ -0,0 +1,275 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.Observable; +import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.observers.*; +import io.reactivex.processors.UnicastProcessor; +import io.reactivex.subscribers.*; + +public class ObservableDoAfterNextTest { + + final List values = new ArrayList(); + + final Consumer afterNext = new Consumer() { + @Override + public void accept(Integer e) throws Exception { + values.add(-e); + } + }; + + final TestObserver ts = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + ObservableDoAfterNextTest.this.values.add(t); + } + }; + + @Test + public void just() { + Observable.just(1) + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void range() { + Observable.range(1, 5) + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values); + } + + @Test + public void error() { + Observable.error(new TestException()) + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test + public void empty() { + Observable.empty() + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertResult(); + + assertTrue(values.isEmpty()); + } + + @Test + public void syncFused() { + TestObserver ts0 = ObserverFusion.newTest(QueueSubscription.SYNC); + + Observable.range(1, 5) + .doAfterNext(afterNext) + .subscribe(ts0); + + ObserverFusion.assertFusion(ts0, QueueSubscription.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFusedRejected() { + TestObserver ts0 = ObserverFusion.newTest(QueueSubscription.ASYNC); + + Observable.range(1, 5) + .doAfterNext(afterNext) + .subscribe(ts0); + + ObserverFusion.assertFusion(ts0, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFused() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + UnicastProcessor up = UnicastProcessor.create(); + + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doAfterNext(afterNext) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test(expected = NullPointerException.class) + public void consumerNull() { + Observable.just(1).doAfterNext(null); + } + + @Test + public void justConditional() { + Observable.just(1) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void rangeConditional() { + Observable.range(1, 5) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values); + } + + @Test + public void errorConditional() { + Observable.error(new TestException()) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test + public void emptyConditional() { + Observable.empty() + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(); + + assertTrue(values.isEmpty()); + } + + @Test + public void syncFusedConditional() { + TestObserver ts0 = ObserverFusion.newTest(QueueSubscription.SYNC); + + Observable.range(1, 5) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribe(ts0); + + ObserverFusion.assertFusion(ts0, QueueSubscription.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFusedRejectedConditional() { + TestObserver ts0 = ObserverFusion.newTest(QueueSubscription.ASYNC); + + Observable.range(1, 5) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribe(ts0); + + ObserverFusion.assertFusion(ts0, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFusedConditional() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + UnicastProcessor up = UnicastProcessor.create(); + + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void consumerThrows() { + Observable.just(1, 2) + .doAfterNext(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void consumerThrowsConditional() { + Observable.just(1, 2) + .doAfterNext(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void consumerThrowsConditional2() { + Observable.just(1, 2).hide() + .doAfterNext(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class, 1); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDoAfterSuccessTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDoAfterSuccessTest.java new file mode 100644 index 0000000000..897191ca9f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDoAfterSuccessTest.java @@ -0,0 +1,130 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; + +public class SingleDoAfterSuccessTest { + + final List values = new ArrayList(); + + final Consumer afterSuccess = new Consumer() { + @Override + public void accept(Integer e) throws Exception { + values.add(-e); + } + }; + + final TestObserver ts = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + SingleDoAfterSuccessTest.this.values.add(t); + } + }; + + @Test + public void just() { + Single.just(1) + .doAfterSuccess(afterSuccess) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void error() { + Single.error(new TestException()) + .doAfterSuccess(afterSuccess) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test(expected = NullPointerException.class) + public void consumerNull() { + Single.just(1).doAfterSuccess(null); + } + + @Test + public void justConditional() { + Single.just(1) + .doAfterSuccess(afterSuccess) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void errorConditional() { + Single.error(new TestException()) + .doAfterSuccess(afterSuccess) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test + public void consumerThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Single.just(1) + .doAfterSuccess(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().doAfterSuccess(afterSuccess)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single m) throws Exception { + return m.doAfterSuccess(afterSuccess); + } + }); + } +}