From ccd24b5c3ce471428531e12737591b94c91db8bf Mon Sep 17 00:00:00 2001 From: Aaron He Date: Wed, 2 Mar 2016 15:38:01 -0800 Subject: [PATCH] Add doOnSubscribe for Single --- src/main/java/rx/Single.java | 22 +++++++++++++++++++ src/test/java/rx/SingleTest.java | 37 ++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 20b983c063..a8a10bafb9 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -2250,6 +2250,28 @@ public void onNext(T t) { return lift(new OperatorDoOnEach(observer)); } + /** + * Modifies the source {@code Single} so that it invokes the given action when it is subscribed from + * its subscribers. Each subscription will result in an invocation of the given action except when the + * source {@code Single} is reference counted, in which case the source {@code Single} will invoke + * the given action for the first subscription. + *

+ * + *

+ *
Scheduler:
+ *
{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param subscribe + * the action that gets called when an observer subscribes to this {@code Single} + * @return the source {@code Single} modified so as to call this Action when appropriate + * @see ReactiveX operators documentation: Do + */ + @Experimental + public final Single doOnSubscribe(final Action0 subscribe) { + return lift(new OperatorDoOnSubscribe(subscribe)); + } + /** * Returns an Single that emits the items emitted by the source Single shifted forward in time by a * specified delay. Error notifications from the source Single are not delayed. diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 3ce86e9772..17794e4dbb 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -878,6 +878,43 @@ public void doOnSuccessShouldNotSwallowExceptionThrownByAction() { verify(action).call(eq("value")); } + @Test + public void doOnSubscribeShouldInvokeAction() { + Action0 action = mock(Action0.class); + Single single = Single.just(1).doOnSubscribe(action); + + verifyZeroInteractions(action); + + single.subscribe(); + single.subscribe(); + + verify(action, times(2)).call(); + } + + @Test + public void doOnSubscribeShouldInvokeActionBeforeSubscriberSubscribes() { + final List callSequence = new ArrayList(2); + + Single single = Single.create(new OnSubscribe() { + @Override + public void call(SingleSubscriber singleSubscriber) { + callSequence.add("onSubscribe"); + singleSubscriber.onSuccess(1); + } + }).doOnSubscribe(new Action0() { + @Override + public void call() { + callSequence.add("doOnSubscribe"); + } + }); + + single.subscribe(); + + assertEquals(2, callSequence.size()); + assertEquals("doOnSubscribe", callSequence.get(0)); + assertEquals("onSubscribe", callSequence.get(1)); + } + @Test public void delayWithSchedulerShouldDelayCompletion() { TestScheduler scheduler = new TestScheduler();