diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index bacdd90667..d81bd3bd9f 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -29,6 +29,7 @@ import rx.observers.SerializedSubscriber; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaSingleExecutionHook; import rx.schedulers.Schedulers; import rx.singles.BlockingSingle; import rx.subscriptions.Subscriptions; @@ -101,7 +102,7 @@ private Single(final Observable.OnSubscribe f) { this.onSubscribe = f; } - static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); + static RxJavaSingleExecutionHook hook = RxJavaPlugins.getInstance().getSingleExecutionHook(); /** * Returns a Single that will execute the specified function when a {@link SingleSubscriber} executes it or @@ -130,7 +131,7 @@ private Single(final Observable.OnSubscribe f) { * @see ReactiveX operators documentation: Create */ public static Single create(OnSubscribe f) { - return new Single(f); // TODO need hook + return new Single(hook.onCreate(f)); } /** @@ -1570,14 +1571,12 @@ public final void onNext(T args) { * @param subscriber * the Subscriber that will handle the emission or notification from the Single */ - public final void unsafeSubscribe(Subscriber subscriber) { + public final Subscription unsafeSubscribe(Subscriber subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); - // TODO add back the hook - // hook.onSubscribeStart(this, onSubscribe).call(subscriber); - onSubscribe.call(subscriber); - hook.onSubscribeReturn(subscriber); + hook.onSubscribeStart(this, onSubscribe).call(subscriber); + return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); @@ -1594,6 +1593,7 @@ public final void unsafeSubscribe(Subscriber subscriber) { // TODO why aren't we throwing the hook's return value. throw r; } + return Subscriptions.unsubscribed(); } } @@ -1685,9 +1685,7 @@ public final Subscription subscribe(Subscriber subscriber) { // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. try { // allow the hook to intercept and/or decorate - // TODO add back the hook - // hook.onSubscribeStart(this, onSubscribe).call(subscriber); - onSubscribe.call(subscriber); + hook.onSubscribeStart(this, onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types diff --git a/src/main/java/rx/plugins/RxJavaPlugins.java b/src/main/java/rx/plugins/RxJavaPlugins.java index 09e542779d..9678a32e15 100644 --- a/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/src/main/java/rx/plugins/RxJavaPlugins.java @@ -50,6 +50,7 @@ public class RxJavaPlugins { private final AtomicReference errorHandler = new AtomicReference(); private final AtomicReference observableExecutionHook = new AtomicReference(); + private final AtomicReference singleExecutionHook = new AtomicReference(); private final AtomicReference schedulersHook = new AtomicReference(); /** @@ -68,6 +69,7 @@ public static RxJavaPlugins getInstance() { /* package accessible for unit tests */void reset() { INSTANCE.errorHandler.set(null); INSTANCE.observableExecutionHook.set(null); + INSTANCE.singleExecutionHook.set(null); INSTANCE.schedulersHook.set(null); } @@ -156,6 +158,48 @@ public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) } } + /** + * Retrieves the instance of {@link RxJavaSingleExecutionHook} to use based on order of precedence as + * defined in {@link RxJavaPlugins} class header. + *

+ * Override the default by calling {@link #registerSingleExecutionHook(RxJavaSingleExecutionHook)} + * or by setting the property {@code rxjava.plugin.RxJavaSingleExecutionHook.implementation} with the + * full classname to load. + * + * @return {@link RxJavaSingleExecutionHook} implementation to use + */ + public RxJavaSingleExecutionHook getSingleExecutionHook() { + if (singleExecutionHook.get() == null) { + // check for an implementation from System.getProperty first + Object impl = getPluginImplementationViaProperty(RxJavaSingleExecutionHook.class, System.getProperties()); + if (impl == null) { + // nothing set via properties so initialize with default + singleExecutionHook.compareAndSet(null, RxJavaSingleExecutionHookDefault.getInstance()); + // we don't return from here but call get() again in case of thread-race so the winner will always get returned + } else { + // we received an implementation from the system property so use it + singleExecutionHook.compareAndSet(null, (RxJavaSingleExecutionHook) impl); + } + } + return singleExecutionHook.get(); + } + + /** + * Register an {@link RxJavaSingleExecutionHook} implementation as a global override of any injected or + * default implementations. + * + * @param impl + * {@link RxJavaSingleExecutionHook} implementation + * @throws IllegalStateException + * if called more than once or after the default was initialized (if usage occurs before trying + * to register) + */ + public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) { + if (!singleExecutionHook.compareAndSet(null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get()); + } + } + /* test */ static Object getPluginImplementationViaProperty(Class pluginClass, Properties props) { final String classSimpleName = pluginClass.getSimpleName(); /* diff --git a/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java b/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java new file mode 100644 index 0000000000..9fce6531f3 --- /dev/null +++ b/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java @@ -0,0 +1,120 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import rx.Observable; +import rx.Single; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Func1; + +/** + * Abstract ExecutionHook with invocations at different lifecycle points of {@link Single} execution with a + * default no-op implementation. + *

+ * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: + * https://github.com/ReactiveX/RxJava/wiki/Plugins. + *

+ * Note on thread-safety and performance: + *

+ * A single implementation of this class will be used globally so methods on this class will be invoked + * concurrently from multiple threads so all functionality must be thread-safe. + *

+ * Methods are also invoked synchronously and will add to execution time of the single so all behavior + * should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate + * worker threads. + * + */ +public abstract class RxJavaSingleExecutionHook { + /** + * Invoked during the construction by {@link Single#create(Single.OnSubscribe)} + *

+ * This can be used to decorate or replace the onSubscribe function or just perform extra + * logging, metrics and other such things and pass-thru the function. + * + * @param f + * original {@link Single.OnSubscribe}<{@code T}> to be executed + * @return {@link Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just + * returned as a pass-thru + */ + public Single.OnSubscribe onCreate(Single.OnSubscribe f) { + return f; + } + + /** + * Invoked before {@link Single#subscribe(Subscriber)} is about to be executed. + *

+ * This can be used to decorate or replace the onSubscribe function or just perform extra + * logging, metrics and other such things and pass-thru the function. + * + * @param onSubscribe + * original {@link Observable.OnSubscribe}<{@code T}> to be executed + * @return {@link Observable.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just + * returned as a pass-thru + */ + public Observable.OnSubscribe onSubscribeStart(Single singleInstance, final Observable.OnSubscribe onSubscribe) { + // pass-thru by default + return onSubscribe; + } + + /** + * Invoked after successful execution of {@link Single#subscribe(Subscriber)} with returned + * {@link Subscription}. + *

+ * This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging, + * metrics and other such things and pass-thru the subscription. + * + * @param subscription + * original {@link Subscription} + * @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a + * pass-thru + */ + public Subscription onSubscribeReturn(Subscription subscription) { + // pass-thru by default + return subscription; + } + + /** + * Invoked after failed execution of {@link Single#subscribe(Subscriber)} with thrown Throwable. + *

+ * This is not errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when + * attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code }, {@link Subscription}>. + * + * @param e + * Throwable thrown by {@link Single#subscribe(Subscriber)} + * @return Throwable that can be decorated, replaced or just returned as a pass-thru + */ + public Throwable onSubscribeError(Throwable e) { + // pass-thru by default + return e; + } + + /** + * Invoked just as the operator functions is called to bind two operations together into a new + * {@link Single} and the return value is used as the lifted function + *

+ * This can be used to decorate or replace the {@link Observable.Operator} instance or just perform extra + * logging, metrics and other such things and pass-thru the onSubscribe. + * + * @param lift + * original {@link Observable.Operator}{@code } + * @return {@link Observable.Operator}{@code } function that can be modified, decorated, replaced or just + * returned as a pass-thru + */ + public Observable.Operator onLift(final Observable.Operator lift) { + return lift; + } +} diff --git a/src/main/java/rx/plugins/RxJavaSingleExecutionHookDefault.java b/src/main/java/rx/plugins/RxJavaSingleExecutionHookDefault.java new file mode 100644 index 0000000000..60a382589f --- /dev/null +++ b/src/main/java/rx/plugins/RxJavaSingleExecutionHookDefault.java @@ -0,0 +1,28 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +/** + * Default no-op implementation of {@link RxJavaSingleExecutionHook} + */ +class RxJavaSingleExecutionHookDefault extends RxJavaSingleExecutionHook { + + private static final RxJavaSingleExecutionHookDefault INSTANCE = new RxJavaSingleExecutionHookDefault(); + + public static RxJavaSingleExecutionHook getInstance() { + return INSTANCE; + } +} diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index d2457da4e9..ce34b6a2fe 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -28,14 +29,28 @@ import rx.Single.OnSubscribe; import rx.exceptions.*; import rx.functions.*; +import rx.observers.SafeSubscriber; import rx.observers.TestSubscriber; import rx.schedulers.*; +import rx.plugins.RxJavaPluginsTest; +import rx.plugins.RxJavaSingleExecutionHook; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.singles.BlockingSingle; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; public class SingleTest { + private static RxJavaSingleExecutionHook hookSpy; + + @Before + public void setUp() throws Exception { + hookSpy = spy( + new RxJavaPluginsTest.RxJavaSingleExecutionHookTestImpl()); + Single.hook = hookSpy; + } + @Test public void testHelloWorld() { TestSubscriber ts = new TestSubscriber(); @@ -359,6 +374,83 @@ public void testMergeWith() { ts.assertReceivedOnNext(Arrays.asList("A", "B")); } + @Test + public void testHookCreate() { + OnSubscribe subscriber = mock(OnSubscribe.class); + Single.create(subscriber); + + verify(hookSpy, times(1)).onCreate(subscriber); + } + + @Test + public void testHookSubscribeStart() { + TestSubscriber ts = new TestSubscriber(); + + Single single = Single.create(new OnSubscribe() { + @Override public void call(SingleSubscriber s) { + s.onSuccess("Hello"); + } + }); + single.subscribe(ts); + + verify(hookSpy, times(1)).onSubscribeStart(eq(single), any(Observable.OnSubscribe.class)); + } + + @Test + public void testHookUnsafeSubscribeStart() { + TestSubscriber ts = new TestSubscriber(); + Single single = Single.create(new OnSubscribe() { + @Override public void call(SingleSubscriber s) { + s.onSuccess("Hello"); + } + }); + single.unsafeSubscribe(ts); + + verify(hookSpy, times(1)).onSubscribeStart(eq(single), any(Observable.OnSubscribe.class)); + } + + @Test + public void testHookSubscribeReturn() { + TestSubscriber ts = new TestSubscriber(); + + Single single = Single.create(new OnSubscribe() { + @Override public void call(SingleSubscriber s) { + s.onSuccess("Hello"); + } + }); + single.subscribe(ts); + + verify(hookSpy, times(1)).onSubscribeReturn(any(SafeSubscriber.class)); + } + + @Test + public void testHookUnsafeSubscribeReturn() { + TestSubscriber ts = new TestSubscriber(); + + Single single = Single.create(new OnSubscribe() { + @Override public void call(SingleSubscriber s) { + s.onSuccess("Hello"); + } + }); + single.unsafeSubscribe(ts); + + verify(hookSpy, times(1)).onSubscribeReturn(ts); + } + + @Test + public void testReturnUnsubscribedWhenHookThrowsError() { + TestSubscriber ts = new TestSubscriber(); + + Single single = Single.create(new OnSubscribe() { + @Override public void call(SingleSubscriber s) { + throw new RuntimeException("Exception"); + } + }); + Subscription subscription = single.unsafeSubscribe(ts); + + assertTrue(subscription.isUnsubscribed()); + } + @Test public void testCreateSuccess() { TestSubscriber ts = new TestSubscriber(); @@ -1680,14 +1772,14 @@ public void takeUntilError_withSingle_shouldMatch() { assertFalse(until.hasObservers()); assertFalse(ts.isUnsubscribed()); } - + @Test public void subscribeWithObserver() { @SuppressWarnings("unchecked") Observer o = mock(Observer.class); - + Single.just(1).subscribe(o); - + verify(o).onNext(1); verify(o).onCompleted(); verify(o, never()).onError(any(Throwable.class)); @@ -1697,14 +1789,14 @@ public void subscribeWithObserver() { public void subscribeWithObserverAndGetError() { @SuppressWarnings("unchecked") Observer o = mock(Observer.class); - + Single.error(new TestException()).subscribe(o); - + verify(o, never()).onNext(anyInt()); verify(o, never()).onCompleted(); verify(o).onError(any(TestException.class)); } - + @Test public void subscribeWithNullObserver() { try { diff --git a/src/test/java/rx/plugins/RxJavaPluginsTest.java b/src/test/java/rx/plugins/RxJavaPluginsTest.java index e4cd9f69ae..64a1ba1d1a 100644 --- a/src/test/java/rx/plugins/RxJavaPluginsTest.java +++ b/src/test/java/rx/plugins/RxJavaPluginsTest.java @@ -16,6 +16,7 @@ package rx.plugins; import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; import java.util.*; import java.util.concurrent.TimeUnit; @@ -107,6 +108,15 @@ public void testObservableExecutionHookViaRegisterMethod() { assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl); } + @Test + public void testSingleExecutionHookViaRegisterMethod() { + RxJavaPlugins p = new RxJavaPlugins(); + RxJavaSingleExecutionHook customHook = mock(RxJavaSingleExecutionHook.class); + p.registerSingleExecutionHook(customHook); + RxJavaSingleExecutionHook impl = p.getSingleExecutionHook(); + assertSame(impl, customHook); + } + @Test public void testObservableExecutionHookViaProperty() { try { @@ -238,6 +248,11 @@ public static class RxJavaObservableExecutionHookTestImpl extends RxJavaObservab // just use defaults } + // inside test so it is stripped from Javadocs + public static class RxJavaSingleExecutionHookTestImpl extends RxJavaSingleExecutionHook { + // just use defaults + } + private static String getFullClassNameForTestClass(Class cls) { return RxJavaPlugins.class.getPackage() .getName() + "." + RxJavaPluginsTest.class.getSimpleName() + "$" + cls.getSimpleName();