ConnectableObservable's subscribers don't get emissions after second connection to source #4771

@Tagakov

I am reading the IntroToRx.com paragraph "Disposal of connections and subscriptions".

It says that if I reconnect to a previously disconnected ConnectableObservable, it will resume emitting to its subscribers.

I have written a test for it, and it fails because the subscriber receives only the values emitted before the first disconnection. Is this the correct behavior and a misunderstanding on my part, or is it a bug?

    @Test
    public void reconnectToConnectableObservable() {

        final TestScheduler testScheduler = Schedulers.test();
        final TestSubscriber<Long> subscriber = TestSubscriber.create();
        final TestSubscriber<Long> sideEffectSubscriber = TestSubscriber.create();

        final ConnectableObservable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS, testScheduler)
                .doOnNext(sideEffectSubscriber::onNext)
                .publish();

        final Subscription subscription = observable.subscribe(subscriber);

        final Subscription firstConnection = observable.connect();
        testScheduler.advanceTimeBy(2, TimeUnit.MILLISECONDS);

        firstConnection.unsubscribe();
        testScheduler.advanceTimeBy(16, TimeUnit.MILLISECONDS);

        final Subscription secondConnection = observable.connect();
        testScheduler.advanceTimeBy(2, TimeUnit.MILLISECONDS);

        secondConnection.unsubscribe();

        sideEffectSubscriber.assertValueCount(4); //pass
        assertThat(subscription.isUnsubscribed()).isFalse(); //pass
        subscriber.assertValueCount(4); //fails
    }
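
To make the expectation concrete, here is a minimal plain-Java sketch of the semantics I understood from the IntroToRx text: subscribers stay attached across a disconnect and start receiving values again after a reconnect. `ToyConnectable` and all of its methods are hypothetical names invented for illustration; this is not RxJava code and it makes no claim about how `publish()` is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy model, NOT RxJava code: it only illustrates the semantics
// I expected, where subscribers survive a disconnect/reconnect cycle.
public class ExpectedReconnectModel {

    /** A source that forwards values to its subscribers only while connected. */
    static final class ToyConnectable {
        private final List<Consumer<Long>> subscribers = new ArrayList<>();
        private boolean connected;

        void subscribe(Consumer<Long> subscriber) {
            subscribers.add(subscriber);
        }

        /** Connects and returns an action that disconnects again. */
        Runnable connect() {
            connected = true;
            return () -> connected = false;
        }

        /** Simulates the upstream producing a value. */
        void emit(long value) {
            if (!connected) {
                return; // values produced while disconnected are dropped
            }
            for (Consumer<Long> s : subscribers) {
                s.accept(value);
            }
        }
    }

    /** Runs the same connect / disconnect / reconnect sequence as the test above. */
    static List<Long> run() {
        ToyConnectable source = new ToyConnectable();
        List<Long> received = new ArrayList<>();
        source.subscribe(received::add);

        Runnable firstConnection = source.connect();
        source.emit(0L);
        source.emit(1L);
        firstConnection.run();   // disconnect

        source.emit(99L);        // dropped: nobody is connected

        Runnable secondConnection = source.connect();
        source.emit(0L);         // a fresh connection starts counting again
        source.emit(1L);
        secondConnection.run();  // disconnect

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the single subscriber ends up with four values, two from each connection, which is the count that `subscriber.assertValueCount(4)` expects in the test above.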

P.S. Please help me choose a more accurate title for this issue.
