Skip to content

Commit e0fb980

Browse files
committed
Drop volatile in favor of failing fast if not subscribed from UI thread
1 parent 06c6229 commit e0fb980

File tree

1 file changed

+29
-2
lines changed

1 file changed

+29
-2
lines changed

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@
2424

2525
import android.app.Activity;
2626
import android.app.Fragment;
27+
import android.os.Looper;
2728
import android.util.Log;
2829

2930
import java.lang.reflect.Field;
31+
import java.util.concurrent.Callable;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.Future;
34+
import java.util.concurrent.TimeUnit;
3035

3136
public class OperationObserveFromAndroidComponent {
3237

@@ -47,8 +52,8 @@ private static abstract class OnSubscribeBase<T, AndroidComponent> implements Ob
4752
private static final String LOG_TAG = OperationObserveFromAndroidComponent.class.getSimpleName();
4853

4954
private final Observable<T> source;
50-
private volatile AndroidComponent componentRef;
51-
private volatile Observer<? super T> observerRef;
55+
private AndroidComponent componentRef;
56+
private Observer<? super T> observerRef;
5257

5358
private OnSubscribeBase(Observable<T> source, AndroidComponent component) {
5459
this.source = source;
@@ -65,6 +70,7 @@ private void log(String message) {
6570

6671
@Override
6772
public Subscription onSubscribe(Observer<? super T> observer) {
73+
assertUiThread();
6874
observerRef = observer;
6975
final Subscription sourceSub = source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<T>() {
7076
@Override
@@ -111,6 +117,12 @@ private void releaseReferences() {
111117
observerRef = null;
112118
componentRef = null;
113119
}
120+
121+
private void assertUiThread() {
122+
if (Looper.getMainLooper() != Looper.myLooper()) {
123+
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
124+
}
125+
}
114126
}
115127

116128
private static final class OnSubscribeFragment<T> extends OnSubscribeBase<T, android.app.Fragment> {
@@ -171,6 +183,21 @@ public void setupMocks() {
171183
when(mockFragment.isAdded()).thenReturn(true);
172184
}
173185

186+
@Test
187+
public void itThrowsIfObserverSubscribesFromBackgroundThread() throws Exception {
188+
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
189+
@Override
190+
public Object call() throws Exception {
191+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(
192+
mockObservable, mockFragment).subscribe(mockObserver);
193+
return null;
194+
}
195+
});
196+
future.get(1, TimeUnit.SECONDS);
197+
verify(mockObserver).onError(any(IllegalStateException.class));
198+
verifyNoMoreInteractions(mockObserver);
199+
}
200+
174201
@Test
175202
public void itObservesTheSourceSequenceOnTheMainUIThread() {
176203
OperationObserveFromAndroidComponent.observeFromAndroidComponent(mockObservable, mockFragment).subscribe(mockObserver);

0 commit comments

Comments
 (0)