diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 919548fd32..a813373950 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -1225,7 +1225,7 @@ public final static Observable from(T[] array) { * @see ReactiveX operators documentation: Interval */ public final static Observable interval(long interval, TimeUnit unit) { - return interval(interval, unit, Schedulers.computation()); + return interval(interval, interval, unit, Schedulers.computation()); } /** @@ -1248,7 +1248,65 @@ public final static Observable interval(long interval, TimeUnit unit) { * @see ReactiveX operators documentation: Interval */ public final static Observable interval(long interval, TimeUnit unit, Scheduler scheduler) { - return create(new OnSubscribeTimerPeriodically(interval, interval, unit, scheduler)); + return interval(interval, interval, unit, scheduler); + } + + /** + * Returns an Observable that emits a {@code 0L} after the {@code initialDelay} and ever increasing numbers + * after each {@code period} of time thereafter. + *

+ * + *

+ *
Backpressure Support:
+ *
This operator does not support backpressure as it uses time. If the downstream needs a slower rate + * it should slow the timer or use something like {@link #onBackpressureDrop}.
+ *
Scheduler:
+ *
{@code timer} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param initialDelay + * the initial delay time to wait before emitting the first value of 0L + * @param period + * the period of time between emissions of the subsequent numbers + * @param unit + * the time unit for both {@code initialDelay} and {@code period} + * @return an Observable that emits a 0L after the {@code initialDelay} and ever increasing numbers after + * each {@code period} of time thereafter + * @see ReactiveX operators documentation: Interval + * @since 1.0.12 + */ + public final static Observable interval(long initialDelay, long period, TimeUnit unit) { + return interval(initialDelay, period, unit, Schedulers.computation()); + } + + /** + * Returns an Observable that emits a {@code 0L} after the {@code initialDelay} and ever increasing numbers + * after each {@code period} of time thereafter, on a specified {@link Scheduler}. + *

+ * + *

+ *
Backpressure Support:
+ *
This operator does not support backpressure as it uses time. If the downstream needs a slower rate + * it should slow the timer or use something like {@link #onBackpressureDrop}.
+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param initialDelay + * the initial delay time to wait before emitting the first value of 0L + * @param period + * the period of time between emissions of the subsequent numbers + * @param unit + * the time unit for both {@code initialDelay} and {@code period} + * @param scheduler + * the Scheduler on which the waiting happens and items are emitted + * @return an Observable that emits a 0L after the {@code initialDelay} and ever increasing numbers after + * each {@code period} of time thereafter, while running on the given Scheduler + * @see ReactiveX operators documentation: Interval + * @since 1.0.12 + */ + public final static Observable interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { + return create(new OnSubscribeTimerPeriodically(initialDelay, period, unit, scheduler)); } /** @@ -2462,9 +2520,11 @@ public final static Observable switchOnNext(ObservableReactiveX operators documentation: Timer + * @deprecated use {@link #interval(long, long, TimeUnit)} instead */ + @Deprecated public final static Observable timer(long initialDelay, long period, TimeUnit unit) { - return timer(initialDelay, period, unit, Schedulers.computation()); + return interval(initialDelay, period, unit, Schedulers.computation()); } /** @@ -2491,9 +2551,11 @@ public final static Observable timer(long initialDelay, long period, TimeU * @return an Observable that emits a 0L after the {@code initialDelay} and ever increasing numbers after * each {@code period} of time thereafter, while running on the given Scheduler * @see ReactiveX operators documentation: Timer + * @deprecated use {@link #interval(long, long, TimeUnit, Scheduler)} instead */ + @Deprecated public final static Observable timer(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { - return create(new OnSubscribeTimerPeriodically(initialDelay, period, unit, scheduler)); + return interval(initialDelay, period, unit, scheduler); } /** diff --git a/src/perf/java/rx/operators/OperatorSerializePerf.java b/src/perf/java/rx/operators/OperatorSerializePerf.java index 49c32eca5b..cae310b72c 100644 --- a/src/perf/java/rx/operators/OperatorSerializePerf.java +++ b/src/perf/java/rx/operators/OperatorSerializePerf.java @@ -90,7 +90,7 @@ public int getSize() { public void setup(Blackhole bh) { super.setup(bh); - interval = Observable.timer(0, 1, TimeUnit.MILLISECONDS).take(size).map(this); + interval = Observable.interval(0, 1, TimeUnit.MILLISECONDS).take(size).map(this); } @Override public Integer call(Long t1) { diff --git a/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java b/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java index 3303106d93..0d74cd878b 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java @@ -114,7 +114,7 @@ public Integer call(Long t1) { Observable source2 = source1 .repeat(4) - .zipWith(Observable.timer(0, 10, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func2() { + .zipWith(Observable.interval(0, 10, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func2() { @Override public Integer call(Integer t1, Long t2) { return t1; diff --git a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java index c2a9f15dce..e593d30465 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java @@ -820,7 +820,7 @@ public void testWithCombineLatestIssue1717() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(); final int SIZE = 2000; - Observable timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS) + Observable timer = Observable.interval(0, 1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.newThread()) .doOnEach(new Action1>() { diff --git a/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java b/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java index 59d1ee7de4..fa38d2bdf1 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java @@ -47,7 +47,7 @@ public void setUp() { public void testRefCountAsync() { final AtomicInteger subscribeCount = new AtomicInteger(); final AtomicInteger nextCount = new AtomicInteger(); - Observable r = Observable.timer(0, 5, TimeUnit.MILLISECONDS) + Observable r = Observable.interval(0, 5, TimeUnit.MILLISECONDS) .doOnSubscribe(new Action0() { @Override @@ -183,7 +183,7 @@ public void call(Integer l) { public void testRepeat() { final AtomicInteger subscribeCount = new AtomicInteger(); final AtomicInteger unsubscribeCount = new AtomicInteger(); - Observable r = Observable.timer(0, 1, TimeUnit.MILLISECONDS) + Observable r = Observable.interval(0, 1, TimeUnit.MILLISECONDS) .doOnSubscribe(new Action0() { @Override diff --git a/src/test/java/rx/internal/operators/OnSubscribeTimerTest.java b/src/test/java/rx/internal/operators/OnSubscribeTimerTest.java index 1dda1fa4a9..99c677cedb 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeTimerTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeTimerTest.java @@ -64,7 +64,7 @@ public void testTimerOnce() { @Test public void testTimerPeriodically() { - Subscription c = Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler).subscribe(observer); + Subscription c = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler).subscribe(observer); scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); InOrder inOrder = inOrder(observer); @@ -260,7 +260,7 @@ public void onCompleted() { } @Test public void testPeriodicObserverThrows() { - Observable source = Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler); + Observable source = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler); InOrder inOrder = inOrder(observer); diff --git a/src/test/java/rx/internal/operators/OperatorBufferTest.java b/src/test/java/rx/internal/operators/OperatorBufferTest.java index 8e0a9b614e..d05c151ee2 100644 --- a/src/test/java/rx/internal/operators/OperatorBufferTest.java +++ b/src/test/java/rx/internal/operators/OperatorBufferTest.java @@ -557,7 +557,7 @@ public void bufferWithSizeSkipTake1() { } @Test(timeout = 2000) public void bufferWithTimeTake1() { - Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + Observable source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler); Observable> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler).take(1); @@ -574,7 +574,7 @@ public void bufferWithTimeTake1() { } @Test(timeout = 2000) public void bufferWithTimeSkipTake2() { - Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + Observable source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler); Observable> result = source.buffer(100, 60, TimeUnit.MILLISECONDS, scheduler).take(2); @@ -593,8 +593,8 @@ public void bufferWithTimeSkipTake2() { } @Test(timeout = 2000) public void bufferWithBoundaryTake2() { - Observable boundary = Observable.timer(60, 60, TimeUnit.MILLISECONDS, scheduler); - Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + Observable boundary = Observable.interval(60, 60, TimeUnit.MILLISECONDS, scheduler); + Observable source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler); Observable> result = source.buffer(boundary).take(2); @@ -615,15 +615,15 @@ public void bufferWithBoundaryTake2() { @Test(timeout = 2000) public void bufferWithStartEndBoundaryTake2() { - Observable start = Observable.timer(61, 61, TimeUnit.MILLISECONDS, scheduler); + Observable start = Observable.interval(61, 61, TimeUnit.MILLISECONDS, scheduler); Func1> end = new Func1>() { @Override public Observable call(Long t1) { - return Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler); + return Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler); } }; - Observable source = Observable.timer(40, 40, TimeUnit.MILLISECONDS, scheduler); + Observable source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler); Observable> result = source.buffer(start, end).take(2); @@ -693,7 +693,7 @@ public void bufferWithTimeThrows() { } @Test public void bufferWithTimeAndSize() { - Observable source = Observable.timer(30, 30, TimeUnit.MILLISECONDS, scheduler); + Observable source = Observable.interval(30, 30, TimeUnit.MILLISECONDS, scheduler); Observable> result = source.buffer(100, TimeUnit.MILLISECONDS, 2, scheduler).take(3); diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index 1f0cc0a892..e505bf0672 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -663,7 +663,7 @@ public void onNext(Long t) { @Test public void testHotOperatorBackpressure() { TestSubscriber ts = new TestSubscriber(); - Observable.timer(0, 1, TimeUnit.MICROSECONDS) + Observable.interval(0, 1, TimeUnit.MICROSECONDS) .observeOn(Schedulers.computation()) .map(new Func1() { @@ -687,7 +687,7 @@ public String call(Long t1) { @Test public void testErrorPropagatesWhenNoOutstandingRequests() { - Observable timer = Observable.timer(0, 1, TimeUnit.MICROSECONDS) + Observable timer = Observable.interval(0, 1, TimeUnit.MICROSECONDS) .doOnEach(new Action1>() { @Override diff --git a/src/test/java/rx/internal/operators/OperatorPublishTest.java b/src/test/java/rx/internal/operators/OperatorPublishTest.java index f6bfaa7e21..a916a5e41c 100644 --- a/src/test/java/rx/internal/operators/OperatorPublishTest.java +++ b/src/test/java/rx/internal/operators/OperatorPublishTest.java @@ -247,7 +247,7 @@ public void call() { @Test public void testConnectWithNoSubscriber() { TestScheduler scheduler = new TestScheduler(); - ConnectableObservable co = Observable.timer(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish(); + ConnectableObservable co = Observable.interval(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish(); co.connect(); // Emit 0 scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS); diff --git a/src/test/java/rx/internal/producers/ProducersTest.java b/src/test/java/rx/internal/producers/ProducersTest.java index ee746335fd..0e5beacdfa 100644 --- a/src/test/java/rx/internal/producers/ProducersTest.java +++ b/src/test/java/rx/internal/producers/ProducersTest.java @@ -355,10 +355,10 @@ public void testObserverArbiterAsync() { TestScheduler test = Schedulers.test(); @SuppressWarnings("unchecked") List> timers = Arrays.asList( - Observable.timer(100, 100, TimeUnit.MILLISECONDS, test), - Observable.timer(100, 100, TimeUnit.MILLISECONDS, test) + Observable.interval(100, 100, TimeUnit.MILLISECONDS, test), + Observable.interval(100, 100, TimeUnit.MILLISECONDS, test) .map(plus(20)), - Observable.timer(100, 100, TimeUnit.MILLISECONDS, test) + Observable.interval(100, 100, TimeUnit.MILLISECONDS, test) .map(plus(40)) );