diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 2ca7ffddf0..7168467775 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3533,13 +3533,13 @@ public final Observable cache() { *
{@code cache} does not operate by default on a particular {@link Scheduler}.
* * - * @param capacity hint for number of items to cache (for optimizing underlying data structure) + * @param capacityHint hint for number of items to cache (for optimizing underlying data structure) * @return an Observable that, when first subscribed to, caches all of its items and notifications for the * benefit of subsequent subscribers * @see ReactiveX operators documentation: Replay */ - public final Observable cache(int capacity) { - return CachedObservable.from(this, capacity); + public final Observable cache(int capacityHint) { + return CachedObservable.from(this, capacityHint); } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java b/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java new file mode 100644 index 0000000000..c664717332 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java @@ -0,0 +1,55 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Observable.OnSubscribe; +import rx.*; +import rx.functions.Action1; +import rx.observables.ConnectableObservable; + +/** + * Wraps a ConnectableObservable and calls its connect() method once + * the specified number of Subscribers have subscribed. + * + * @param the value type of the chain + */ +public final class OnSubscribeAutoConnect implements OnSubscribe { + final ConnectableObservable source; + final int numberOfSubscribers; + final Action1 connection; + final AtomicInteger clients; + + public OnSubscribeAutoConnect(ConnectableObservable source, + int numberOfSubscribers, + Action1 connection) { + if (numberOfSubscribers <= 0) { + throw new IllegalArgumentException("numberOfSubscribers > 0 required"); + } + this.source = source; + this.numberOfSubscribers = numberOfSubscribers; + this.connection = connection; + this.clients = new AtomicInteger(); + } + @Override + public void call(Subscriber child) { + source.unsafeSubscribe(child); + if (clients.incrementAndGet() == numberOfSubscribers) { + source.connect(connection); + } + } +} diff --git a/src/main/java/rx/observables/ConnectableObservable.java b/src/main/java/rx/observables/ConnectableObservable.java index a3c80ef1b4..868c2d3071 100644 --- a/src/main/java/rx/observables/ConnectableObservable.java +++ b/src/main/java/rx/observables/ConnectableObservable.java @@ -15,11 +15,10 @@ */ package rx.observables; -import rx.Observable; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action1; -import rx.internal.operators.OnSubscribeRefCount; +import rx.*; +import rx.annotations.Experimental; +import rx.functions.*; +import rx.internal.operators.*; /** * A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin @@ -80,4 +79,56 @@ public void call(Subscription t1) { public Observable refCount() { return create(new OnSubscribeRefCount(this)); } + + /** + * Returns an Observable that automatically connects to this ConnectableObservable + * when the first Subscriber subscribes. + * + * @return an Observable that automatically connects to this ConnectableObservable + * when the first Subscriber subscribes + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public Observable autoConnect() { + return autoConnect(1); + } + /** + * Returns an Observable that automatically connects to this ConnectableObservable + * when the specified number of Subscribers subscribe to it. + * + * @param numberOfSubscribers the number of subscribers to await before calling connect + * on the ConnectableObservable. A non-positive value indicates + * an immediate connection. + * @return an Observable that automatically connects to this ConnectableObservable + * when the specified number of Subscribers subscribe to it + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public Observable autoConnect(int numberOfSubscribers) { + return autoConnect(numberOfSubscribers, Actions.empty()); + } + + /** + * Returns an Observable that automatically connects to this ConnectableObservable + * when the specified number of Subscribers subscribe to it and calls the + * specified callback with the Subscription associated with the established connection. + * + * @param numberOfSubscribers the number of subscribers to await before calling connect + * on the ConnectableObservable. A non-positive value indicates + * an immediate connection. + * @param connection the callback Action1 that will receive the Subscription representing the + * established connection + * @return an Observable that automatically connects to this ConnectableObservable + * when the specified number of Subscribers subscribe to it and calls the + * specified callback with the Subscription associated with the established connection + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public Observable autoConnect(int numberOfSubscribers, Action1 connection) { + if (numberOfSubscribers <= 0) { + this.connect(connection); + return this; + } + return create(new OnSubscribeAutoConnect(this, numberOfSubscribers, connection)); + } } diff --git a/src/test/java/rx/observables/ConnectableObservableTest.java b/src/test/java/rx/observables/ConnectableObservableTest.java new file mode 100644 index 0000000000..419c694bb0 --- /dev/null +++ b/src/test/java/rx/observables/ConnectableObservableTest.java @@ -0,0 +1,175 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + +import java.util.concurrent.atomic.*; + +import org.junit.*; + +import rx.*; +import rx.functions.*; +import rx.observers.TestSubscriber; + +public class ConnectableObservableTest { + @Test + public void testAutoConnect() { + final AtomicInteger run = new AtomicInteger(); + + ConnectableObservable co = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.just(run.incrementAndGet()); + } + }).publish(); + + Observable source = co.autoConnect(); + + Assert.assertEquals(0, run.get()); + + TestSubscriber ts1 = TestSubscriber.create(); + source.subscribe(ts1); + + ts1.assertCompleted(); + ts1.assertNoErrors(); + ts1.assertValue(1); + + Assert.assertEquals(1, run.get()); + + TestSubscriber ts2 = TestSubscriber.create(); + source.subscribe(ts2); + + ts2.assertNotCompleted(); + ts2.assertNoErrors(); + ts2.assertNoValues(); + + Assert.assertEquals(1, run.get()); + } + @Test + public void testAutoConnect0() { + final AtomicInteger run = new AtomicInteger(); + + ConnectableObservable co = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.just(run.incrementAndGet()); + } + }).publish(); + + Observable source = co.autoConnect(0); + + Assert.assertEquals(1, run.get()); + + TestSubscriber ts1 = TestSubscriber.create(); + source.subscribe(ts1); + + ts1.assertNotCompleted(); + ts1.assertNoErrors(); + ts1.assertNoValues(); + + Assert.assertEquals(1, run.get()); + + TestSubscriber ts2 = TestSubscriber.create(); + source.subscribe(ts2); + + ts2.assertNotCompleted(); + ts2.assertNoErrors(); + ts2.assertNoValues(); + + Assert.assertEquals(1, run.get()); + } + @Test + public void testAutoConnect2() { + final AtomicInteger run = new AtomicInteger(); + + ConnectableObservable co = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.just(run.incrementAndGet()); + } + }).publish(); + + Observable source = co.autoConnect(2); + + Assert.assertEquals(0, run.get()); + + TestSubscriber ts1 = TestSubscriber.create(); + source.subscribe(ts1); + + ts1.assertNotCompleted(); + ts1.assertNoErrors(); + ts1.assertNoValues(); + + Assert.assertEquals(0, run.get()); + + TestSubscriber ts2 = TestSubscriber.create(); + source.subscribe(ts2); + + Assert.assertEquals(1, run.get()); + + ts1.assertCompleted(); + ts1.assertNoErrors(); + ts1.assertValue(1); + + ts2.assertCompleted(); + ts2.assertNoErrors(); + ts2.assertValue(1); + + } + + @Test + public void testAutoConnectUnsubscribe() { + final AtomicInteger run = new AtomicInteger(); + + ConnectableObservable co = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.range(run.incrementAndGet(), 10); + } + }).publish(); + + final AtomicReference conn = new AtomicReference(); + + Observable source = co.autoConnect(1, new Action1() { + @Override + public void call(Subscription t) { + conn.set(t); + } + }); + + Assert.assertEquals(0, run.get()); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + Subscription s = conn.get(); + if (s != null) { + s.unsubscribe(); + } else { + onError(new NullPointerException("No connection reference")); + } + } + }; + + source.subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertValue(1); + + Assert.assertTrue("Connection not unsubscribed?", conn.get().isUnsubscribed()); + } +}