Skip to content

Commit 3af1afe

Browse files
committed
Merge pull request #3504 from akarnokd/AndroidTestFixes
1.x: Test adjustments to reduce problems when tests run on Android
2 parents d3ebd70 + 2649b68 commit 3af1afe

File tree

5 files changed

+86
-54
lines changed

5 files changed

+86
-54
lines changed

src/test/java/rx/ObservableTests.java

Lines changed: 52 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,49 +15,25 @@
1515
*/
1616
package rx;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertNotNull;
20-
import static org.junit.Assert.assertTrue;
21-
import static org.junit.Assert.fail;
22-
import static org.mockito.Matchers.any;
23-
import static org.mockito.Matchers.anyInt;
24-
import static org.mockito.Matchers.anyString;
25-
import static org.mockito.Matchers.isA;
26-
import static org.mockito.Mockito.inOrder;
27-
import static org.mockito.Mockito.mock;
28-
import static org.mockito.Mockito.never;
29-
import static org.mockito.Mockito.times;
30-
import static org.mockito.Mockito.verify;
31-
32-
import java.util.ArrayList;
33-
import java.util.Arrays;
34-
import java.util.LinkedList;
35-
import java.util.List;
36-
import java.util.NoSuchElementException;
37-
import java.util.concurrent.CountDownLatch;
38-
import java.util.concurrent.TimeUnit;
39-
import java.util.concurrent.atomic.AtomicInteger;
40-
import java.util.concurrent.atomic.AtomicReference;
41-
42-
import org.junit.Before;
43-
import org.junit.Test;
44-
import org.mockito.InOrder;
45-
import org.mockito.Mock;
46-
import org.mockito.MockitoAnnotations;
47-
48-
import rx.Observable.OnSubscribe;
49-
import rx.Observable.Transformer;
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
21+
22+
import java.lang.Thread.UncaughtExceptionHandler;
23+
import java.util.*;
24+
import java.util.concurrent.*;
25+
import java.util.concurrent.atomic.*;
26+
27+
import org.junit.*;
28+
import org.mockito.*;
29+
30+
import rx.Observable.*;
5031
import rx.exceptions.OnErrorNotImplementedException;
51-
import rx.functions.Action1;
52-
import rx.functions.Action2;
53-
import rx.functions.Func0;
54-
import rx.functions.Func1;
55-
import rx.functions.Func2;
32+
import rx.functions.*;
5633
import rx.observables.ConnectableObservable;
5734
import rx.observers.TestSubscriber;
58-
import rx.schedulers.TestScheduler;
59-
import rx.subjects.ReplaySubject;
60-
import rx.subjects.Subject;
35+
import rx.schedulers.*;
36+
import rx.subjects.*;
6137
import rx.subscriptions.BooleanSubscription;
6238

6339
public class ObservableTests {
@@ -1101,19 +1077,45 @@ public String call(Integer t1) {
11011077
}
11021078

11031079
@Test
1104-
public void testErrorThrownIssue1685() {
1080+
public void testErrorThrownIssue1685() throws Exception {
11051081
Subject<Object, Object> subject = ReplaySubject.create();
11061082

1107-
Observable.error(new RuntimeException("oops"))
1108-
.materialize()
1109-
.delay(1, TimeUnit.SECONDS)
1110-
.dematerialize()
1111-
.subscribe(subject);
1112-
1113-
subject.subscribe();
1114-
subject.materialize().toBlocking().first();
1083+
ExecutorService exec = Executors.newSingleThreadExecutor();
1084+
1085+
try {
1086+
1087+
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
1088+
1089+
Scheduler s = Schedulers.from(exec);
1090+
exec.submit(new Runnable() {
1091+
@Override
1092+
public void run() {
1093+
Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
1094+
@Override
1095+
public void uncaughtException(Thread t, Throwable e) {
1096+
err.set(e);
1097+
}
1098+
});
1099+
}
1100+
}).get();
1101+
1102+
Observable.error(new RuntimeException("oops"))
1103+
.materialize()
1104+
.delay(1, TimeUnit.SECONDS, s)
1105+
.dematerialize()
1106+
.subscribe(subject);
1107+
1108+
subject.subscribe();
1109+
subject.materialize().toBlocking().first();
11151110

1116-
System.out.println("Done");
1111+
Thread.sleep(1000); // the uncaught exception comes after the terminal event reaches toBlocking
1112+
1113+
assertNotNull("UncaughtExceptionHandler didn't get anything.", err.get());
1114+
1115+
System.out.println("Done");
1116+
} finally {
1117+
exec.shutdownNow();
1118+
}
11171119
}
11181120

11191121
@Test

src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,7 @@ public void testWithCombineLatestIssue1717() throws InterruptedException {
798798
final AtomicInteger count = new AtomicInteger();
799799
final int SIZE = 2000;
800800
Observable<Long> timer = Observable.interval(0, 1, TimeUnit.MILLISECONDS)
801+
.onBackpressureBuffer()
801802
.observeOn(Schedulers.newThread())
802803
.doOnEach(new Action1<Notification<? super Long>>() {
803804

src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121

2222
import java.util.*;
2323
import java.util.concurrent.*;
24-
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.*;
2525

2626
import org.junit.*;
2727
import org.mockito.*;
2828

2929
import rx.*;
30-
import rx.Observable.OnSubscribe;
3130
import rx.Observable;
31+
import rx.Observable.OnSubscribe;
3232
import rx.Observer;
3333
import rx.functions.*;
3434
import rx.observers.*;
@@ -528,6 +528,10 @@ public Integer call(Integer t1, Integer t2) {
528528

529529
@Test(timeout = 10000)
530530
public void testUpstreamErrorAllowsRetry() throws InterruptedException {
531+
532+
final AtomicReference<Throwable> err1 = new AtomicReference<Throwable>();
533+
final AtomicReference<Throwable> err2 = new AtomicReference<Throwable>();
534+
531535
final AtomicInteger intervalSubscribed = new AtomicInteger();
532536
Observable<String> interval =
533537
Observable.interval(200,TimeUnit.MILLISECONDS)
@@ -572,6 +576,11 @@ public void call(Throwable t1) {
572576
public void call(String t1) {
573577
System.out.println("Subscriber 1: " + t1);
574578
}
579+
}, new Action1<Throwable>() {
580+
@Override
581+
public void call(Throwable t) {
582+
err1.set(t);
583+
}
575584
});
576585
Thread.sleep(100);
577586
interval
@@ -587,11 +596,19 @@ public void call(Throwable t1) {
587596
public void call(String t1) {
588597
System.out.println("Subscriber 2: " + t1);
589598
}
599+
}, new Action1<Throwable>() {
600+
@Override
601+
public void call(Throwable t) {
602+
err2.set(t);
603+
}
590604
});
591605

592606
Thread.sleep(1300);
593607

594608
System.out.println(intervalSubscribed.get());
595609
assertEquals(6, intervalSubscribed.get());
610+
611+
assertNotNull("First subscriber didn't get the error", err1);
612+
assertNotNull("Second subscriber didn't get the error", err2);
596613
}
597614
}

src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import rx.*;
2828
import rx.Observable;
2929
import rx.Observer;
30+
import rx.internal.util.PlatformDependent;
3031
import rx.observers.TestSubscriber;
3132
import rx.schedulers.Schedulers;
3233

@@ -218,7 +219,11 @@ public void testSimpleAsync() {
218219
}
219220
@Test(timeout = 10000)
220221
public void testSimpleOneLessAsyncLoop() {
221-
for (int i = 0; i < 200; i++) {
222+
int max = 200;
223+
if (PlatformDependent.isAndroid()) {
224+
max = 50;
225+
}
226+
for (int i = 0; i < max; i++) {
222227
testSimpleOneLessAsync();
223228
}
224229
}

src/test/java/rx/internal/operators/OperatorReplayTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import rx.internal.operators.OperatorReplay.BoundedReplayBuffer;
4747
import rx.internal.operators.OperatorReplay.Node;
4848
import rx.internal.operators.OperatorReplay.SizeAndTimeBoundReplayBuffer;
49+
import rx.internal.util.PlatformDependent;
4950
import rx.observables.ConnectableObservable;
5051
import rx.observers.TestSubscriber;
5152
import rx.schedulers.Schedulers;
@@ -1051,7 +1052,13 @@ public void testAsyncComeAndGo() {
10511052

10521053
@Test
10531054
public void testNoMissingBackpressureException() {
1054-
final int m = 4 * 1000 * 1000;
1055+
final int m;
1056+
if (PlatformDependent.isAndroid()) {
1057+
m = 500 * 1000;
1058+
} else {
1059+
m = 4 * 1000 * 1000;
1060+
}
1061+
10551062
Observable<Integer> firehose = Observable.create(new OnSubscribe<Integer>() {
10561063
@Override
10571064
public void call(Subscriber<? super Integer> t) {

0 commit comments

Comments
 (0)