Skip to content

Commit 31d0ab4

Browse files
committed
1.x: Added Single execution hooks
1 parent 9f49624 commit 31d0ab4

File tree

5 files changed

+207
-3
lines changed

5 files changed

+207
-3
lines changed

src/main/java/rx/Single.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import rx.internal.util.ScalarSynchronousSingle;
2424
import rx.internal.util.UtilityFunctions;
2525
import rx.observers.SafeSubscriber;
26-
import rx.plugins.RxJavaObservableExecutionHook;
2726
import rx.plugins.RxJavaPlugins;
27+
import rx.plugins.RxJavaSingleExecutionHook;
2828
import rx.schedulers.Schedulers;
2929
import rx.singles.BlockingSingle;
3030
import rx.subscriptions.Subscriptions;
@@ -100,7 +100,7 @@ private Single(final Observable.OnSubscribe<T> f) {
100100
this.onSubscribe = f;
101101
}
102102

103-
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
103+
static final RxJavaSingleExecutionHook hook = RxJavaPlugins.getInstance().getSingleExecutionHook();
104104

105105
/**
106106
* Returns a Single that will execute the specified function when a {@link SingleSubscriber} executes it or
@@ -129,7 +129,7 @@ private Single(final Observable.OnSubscribe<T> f) {
129129
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
130130
*/
131131
public static <T> Single<T> create(OnSubscribe<T> f) {
132-
return new Single<T>(f); // TODO need hook
132+
return new Single<T>(hook.onCreate(f));
133133
}
134134

135135
/**

src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class RxJavaPlugins {
5050

5151
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
5252
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
53+
private final AtomicReference<RxJavaSingleExecutionHook> singleExecutionHook = new AtomicReference<RxJavaSingleExecutionHook>();
5354
private final AtomicReference<RxJavaSchedulersHook> schedulersHook = new AtomicReference<RxJavaSchedulersHook>();
5455

5556
/**
@@ -156,6 +157,48 @@ public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl)
156157
}
157158
}
158159

160+
/**
161+
* Retrieves the instance of {@link RxJavaSingleExecutionHook} to use based on order of precedence as
162+
* defined in {@link RxJavaPlugins} class header.
163+
* <p>
164+
* Override the default by calling {@link #registerSingleExecutionHook(RxJavaSingleExecutionHook)}
165+
* or by setting the property {@code rxjava.plugin.RxJavaSingleExecutionHook.implementation} with the
166+
* full classname to load.
167+
*
168+
* @return {@link RxJavaSingleExecutionHook} implementation to use
169+
*/
170+
public RxJavaSingleExecutionHook getSingleExecutionHook() {
171+
if (singleExecutionHook.get() == null) {
172+
// check for an implementation from System.getProperty first
173+
Object impl = getPluginImplementationViaProperty(RxJavaSingleExecutionHook.class, System.getProperties());
174+
if (impl == null) {
175+
// nothing set via properties so initialize with default
176+
singleExecutionHook.compareAndSet(null, RxJavaSingleExecutionHookDefault.getInstance());
177+
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
178+
} else {
179+
// we received an implementation from the system property so use it
180+
singleExecutionHook.compareAndSet(null, (RxJavaSingleExecutionHook) impl);
181+
}
182+
}
183+
return singleExecutionHook.get();
184+
}
185+
186+
/**
187+
* Register an {@link RxJavaSingleExecutionHook} implementation as a global override of any injected or
188+
* default implementations.
189+
*
190+
* @param impl
191+
* {@link RxJavaSingleExecutionHook} implementation
192+
* @throws IllegalStateException
193+
* if called more than once or after the default was initialized (if usage occurs before trying
194+
* to register)
195+
*/
196+
public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) {
197+
if (!singleExecutionHook.compareAndSet(null, impl)) {
198+
throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
199+
}
200+
}
201+
159202
/* test */ static Object getPluginImplementationViaProperty(Class<?> pluginClass, Properties props) {
160203
final String classSimpleName = pluginClass.getSimpleName();
161204
/*
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.plugins;
17+
18+
import rx.Observable;
19+
import rx.Single;
20+
import rx.Subscriber;
21+
import rx.Subscription;
22+
import rx.functions.Func1;
23+
24+
/**
25+
* Abstract ExecutionHook with invocations at different lifecycle points of {@link Single} execution with a
26+
* default no-op implementation.
27+
* <p>
28+
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
29+
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
30+
* <p>
31+
* <b>Note on thread-safety and performance:</b>
32+
* <p>
33+
* A single implementation of this class will be used globally so methods on this class will be invoked
34+
* concurrently from multiple threads so all functionality must be thread-safe.
35+
* <p>
36+
* Methods are also invoked synchronously and will add to execution time of the single so all behavior
37+
* should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate
38+
* worker threads.
39+
*
40+
*/
41+
public abstract class RxJavaSingleExecutionHook {
42+
/**
43+
* Invoked during the construction by {@link Single#create(Single.OnSubscribe)}
44+
* <p>
45+
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
46+
* logging, metrics and other such things and pass-thru the function.
47+
*
48+
* @param f
49+
* original {@link Single.OnSubscribe}<{@code T}> to be executed
50+
* @return {@link Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
51+
* returned as a pass-thru
52+
*/
53+
public <T> Single.OnSubscribe<T> onCreate(Single.OnSubscribe<T> f) {
54+
return f;
55+
}
56+
57+
/**
58+
* Invoked before {@link Single#subscribe(Subscriber)} is about to be executed.
59+
* <p>
60+
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
61+
* logging, metrics and other such things and pass-thru the function.
62+
*
63+
* @param onSubscribe
64+
* original {@link Single.OnSubscribe}<{@code T}> to be executed
65+
* @return {@link Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
66+
* returned as a pass-thru
67+
*/
68+
public <T> Single.OnSubscribe<T> onSubscribeStart(Single<? extends T> singleInstance, final Single.OnSubscribe<T> onSubscribe) {
69+
// pass-thru by default
70+
return onSubscribe;
71+
}
72+
73+
/**
74+
* Invoked after successful execution of {@link Single#subscribe(Subscriber)} with returned
75+
* {@link Subscription}.
76+
* <p>
77+
* This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging,
78+
* metrics and other such things and pass-thru the subscription.
79+
*
80+
* @param subscription
81+
* original {@link Subscription}
82+
* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a
83+
* pass-thru
84+
*/
85+
public <T> Subscription onSubscribeReturn(Subscription subscription) {
86+
// pass-thru by default
87+
return subscription;
88+
}
89+
90+
/**
91+
* Invoked after failed execution of {@link Single#subscribe(Subscriber)} with thrown Throwable.
92+
* <p>
93+
* This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when
94+
* attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.
95+
*
96+
* @param e
97+
* Throwable thrown by {@link Single#subscribe(Subscriber)}
98+
* @return Throwable that can be decorated, replaced or just returned as a pass-thru
99+
*/
100+
public <T> Throwable onSubscribeError(Throwable e) {
101+
// pass-thru by default
102+
return e;
103+
}
104+
105+
/**
106+
* Invoked just as the operator functions is called to bind two operations together into a new
107+
* {@link Single} and the return value is used as the lifted function
108+
* <p>
109+
* This can be used to decorate or replace the {@link Observable.Operator} instance or just perform extra
110+
* logging, metrics and other such things and pass-thru the onSubscribe.
111+
*
112+
* @param lift
113+
* original {@link Observable.Operator}{@code <R, T>}
114+
* @return {@link Observable.Operator}{@code <R, T>} function that can be modified, decorated, replaced or just
115+
* returned as a pass-thru
116+
*/
117+
public <T, R> Observable.Operator<? extends R, ? super T> onLift(final Observable.Operator<? extends R, ? super T> lift) {
118+
return lift;
119+
}
120+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.plugins;
17+
18+
/**
19+
* Default no-op implementation of {@link RxJavaSingleExecutionHook}
20+
*/
21+
/* package */class RxJavaSingleExecutionHookDefault extends RxJavaSingleExecutionHook {
22+
23+
private static RxJavaSingleExecutionHookDefault INSTANCE = new RxJavaSingleExecutionHookDefault();
24+
25+
public static RxJavaSingleExecutionHook getInstance() {
26+
return INSTANCE;
27+
}
28+
}

src/test/java/rx/plugins/RxJavaPluginsTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ public void testObservableExecutionHookViaRegisterMethod() {
107107
assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl);
108108
}
109109

110+
@Test
111+
public void testSingleExecutionHookViaRegisterMethod() {
112+
RxJavaPlugins p = new RxJavaPlugins();
113+
p.registerSingleExecutionHook(new RxJavaSingleExecutionHookTestImpl());
114+
RxJavaSingleExecutionHook impl = p.getSingleExecutionHook();
115+
assertTrue(impl instanceof RxJavaSingleExecutionHookTestImpl);
116+
}
117+
110118
@Test
111119
public void testObservableExecutionHookViaProperty() {
112120
try {
@@ -238,6 +246,11 @@ public static class RxJavaObservableExecutionHookTestImpl extends RxJavaObservab
238246
// just use defaults
239247
}
240248

249+
// inside test so it is stripped from Javadocs
250+
public static class RxJavaSingleExecutionHookTestImpl extends RxJavaSingleExecutionHook {
251+
// just use defaults
252+
}
253+
241254
private static String getFullClassNameForTestClass(Class<?> cls) {
242255
return RxJavaPlugins.class.getPackage()
243256
.getName() + "." + RxJavaPluginsTest.class.getSimpleName() + "$" + cls.getSimpleName();

0 commit comments

Comments
 (0)