diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index d59e8c41a9..99dfd9e89c 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -15,49 +15,25 @@ */ package rx; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import rx.Observable.OnSubscribe; -import rx.Observable.Transformer; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.*; +import org.mockito.*; + +import rx.Observable.*; import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action1; -import rx.functions.Action2; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; +import rx.functions.*; import rx.observables.ConnectableObservable; import rx.observers.TestSubscriber; -import rx.schedulers.TestScheduler; -import rx.subjects.ReplaySubject; -import rx.subjects.Subject; +import rx.schedulers.*; +import rx.subjects.*; import rx.subscriptions.BooleanSubscription; public class ObservableTests { @@ -1101,19 +1077,45 @@ public String call(Integer t1) { } @Test - public void testErrorThrownIssue1685() { + public void testErrorThrownIssue1685() throws Exception { Subject subject = ReplaySubject.create(); - Observable.error(new RuntimeException("oops")) - .materialize() - .delay(1, TimeUnit.SECONDS) - .dematerialize() - .subscribe(subject); - - subject.subscribe(); - subject.materialize().toBlocking().first(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + + try { + + final AtomicReference err = new AtomicReference(); + + Scheduler s = Schedulers.from(exec); + exec.submit(new Runnable() { + @Override + public void run() { + Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + err.set(e); + } + }); + } + }).get(); + + Observable.error(new RuntimeException("oops")) + .materialize() + .delay(1, TimeUnit.SECONDS, s) + .dematerialize() + .subscribe(subject); + + subject.subscribe(); + subject.materialize().toBlocking().first(); - System.out.println("Done"); + Thread.sleep(1000); // the uncaught exception comes after the terminal event reaches toBlocking + + assertNotNull("UncaughtExceptionHandler didn't get anything.", err.get()); + + System.out.println("Done"); + } finally { + exec.shutdownNow(); + } } @Test diff --git a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java index c28606cae0..bf4af98057 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java @@ -818,6 +818,7 @@ public void testWithCombineLatestIssue1717() throws InterruptedException { final AtomicInteger count = new AtomicInteger(); final int SIZE = 2000; Observable timer = Observable.interval(0, 1, TimeUnit.MILLISECONDS) + .onBackpressureBuffer() .observeOn(Schedulers.newThread()) .doOnEach(new Action1>() { diff --git a/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java b/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java index fa38d2bdf1..ab076ce411 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java @@ -21,14 +21,14 @@ import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.*; import org.mockito.*; import rx.*; -import rx.Observable.OnSubscribe; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; import rx.functions.*; import rx.observers.*; @@ -528,6 +528,10 @@ public Integer call(Integer t1, Integer t2) { @Test(timeout = 10000) public void testUpstreamErrorAllowsRetry() throws InterruptedException { + + final AtomicReference err1 = new AtomicReference(); + final AtomicReference err2 = new AtomicReference(); + final AtomicInteger intervalSubscribed = new AtomicInteger(); Observable interval = Observable.interval(200,TimeUnit.MILLISECONDS) @@ -572,6 +576,11 @@ public void call(Throwable t1) { public void call(String t1) { System.out.println("Subscriber 1: " + t1); } + }, new Action1() { + @Override + public void call(Throwable t) { + err1.set(t); + } }); Thread.sleep(100); interval @@ -587,11 +596,19 @@ public void call(Throwable t1) { public void call(String t1) { System.out.println("Subscriber 2: " + t1); } + }, new Action1() { + @Override + public void call(Throwable t) { + err2.set(t); + } }); Thread.sleep(1300); System.out.println(intervalSubscribed.get()); assertEquals(6, intervalSubscribed.get()); + + assertNotNull("First subscriber didn't get the error", err1); + assertNotNull("Second subscriber didn't get the error", err2); } } diff --git a/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java b/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java index af20d14316..128af7cb8e 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java @@ -27,6 +27,7 @@ import rx.*; import rx.Observable; import rx.Observer; +import rx.internal.util.PlatformDependent; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -218,7 +219,11 @@ public void testSimpleAsync() { } @Test(timeout = 10000) public void testSimpleOneLessAsyncLoop() { - for (int i = 0; i < 200; i++) { + int max = 200; + if (PlatformDependent.isAndroid()) { + max = 50; + } + for (int i = 0; i < max; i++) { testSimpleOneLessAsync(); } } diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index c0ec384d84..408fbc71ba 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -46,6 +46,7 @@ import rx.internal.operators.OperatorReplay.BoundedReplayBuffer; import rx.internal.operators.OperatorReplay.Node; import rx.internal.operators.OperatorReplay.SizeAndTimeBoundReplayBuffer; +import rx.internal.util.PlatformDependent; import rx.observables.ConnectableObservable; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -1049,7 +1050,13 @@ public void testAsyncComeAndGo() { @Test public void testNoMissingBackpressureException() { - final int m = 4 * 1000 * 1000; + final int m; + if (PlatformDependent.isAndroid()) { + m = 500 * 1000; + } else { + m = 4 * 1000 * 1000; + } + Observable firehose = Observable.create(new OnSubscribe() { @Override public void call(Subscriber t) {