AsyncOnSubscribe strange behavior #3341

Closed
@akarnokd

I've been asked to show test cases that prove the existence of bugs I mentioned in #3203.

First, I tried to show the race between inner termination and outer unsubscription, but the following simple code returns an error instead of any value:

@Test
public void bugRequestAlwaysMax() {
    Action2<Long, Observer<Observable<? extends Integer>>> action2 = new Action2<Long, Observer<Observable<? extends Integer>>>() {
        @Override
        public void call(Long t1, Observer<Observable<? extends Integer>> t2) {
            System.out.println(t1);
            int intValue = t1.intValue();
            t2.onNext(
                    Observable.range(1, intValue)
            );
        }
    };

    final TestSubscriber<Integer> ts = TestSubscriber.create(0);

    AsyncOnSubscribe.createStateless(action2).call(ts);

    ts.requestMore(1);

    ts.assertNoErrors();
    ts.assertValue(1);
}

java.lang.AssertionError: Unexpected onError events: 1
    at rx.observers.TestSubscriber.assertNoErrors(TestSubscriber.java:263)
    at rx.observables.AsyncOnSubscribeTest.bugUnsubscribeVsInnerComplete(AsyncOnSubscribeTest.java:443)
Caused by: java.lang.IllegalArgumentException: Count can not be negative
    at rx.Observable.range(Observable.java:2514)
    at rx.observables.AsyncOnSubscribeTest$27.call(AsyncOnSubscribeTest.java:425)
    at rx.observables.AsyncOnSubscribeTest$27.call(AsyncOnSubscribeTest.java:1)
    at rx.observables.AsyncOnSubscribe$3.call(AsyncOnSubscribe.java:223)
    at rx.observables.AsyncOnSubscribe$3.call(AsyncOnSubscribe.java:1)
    at rx.observables.AsyncOnSubscribe$AsyncOnSubscribeImpl.next(AsyncOnSubscribe.java:303)
    at rx.observables.AsyncOnSubscribe$AsyncOuterSubscriber.nextIteration(AsyncOnSubscribe.java:370)
    at rx.observables.AsyncOnSubscribe$AsyncOuterSubscriber.request(AsyncOnSubscribe.java:392)
    at rx.Subscriber.setProducer(Subscriber.java:209)
    at rx.observables.AsyncOnSubscribe.call(AsyncOnSubscribe.java:320)
    at rx.observables.AsyncOnSubscribe.call(AsyncOnSubscribe.java:1)
    at rx.observables.AsyncOnSubscribeTest.bugUnsubscribeVsInnerComplete(AsyncOnSubscribeTest.java:439)
    ... 26 more

It seems that AsyncOuterSubscriber always requests Long.MAX_VALUE even if the child requests nothing. In the test, t1.intValue() narrows this value to -1, which Observable.range rejects as a negative count, hence the error.
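To make the -1 concrete, here is a minimal standalone sketch of the narrowing conversion. It does not touch RxJava; the class and method names (NarrowingDemo, narrow) are made up for illustration only.

```java
final class NarrowingDemo {
    // Mirrors the t1.intValue() call in the test above: a narrowing
    // long-to-int conversion keeps only the low 32 bits of the value.
    static int narrow(long requested) {
        return Long.valueOf(requested).intValue();
    }

    public static void main(String[] args) {
        // Long.MAX_VALUE is 0x7FFFFFFFFFFFFFFF; its low 32 bits are 0xFFFFFFFF.
        System.out.println(narrow(Long.MAX_VALUE)); // prints -1
        System.out.println(narrow(1L));             // prints 1
    }
}
```

So a request of 1 would give range(1, 1) as intended, while an unbounded request gives range(1, -1) and the IllegalArgumentException shown in the stack trace.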

I've debugged testOnUnsubscribeHasCorrectState and it seems there is an initial request of MAX_VALUE which triggers state 1. The value is also put into the queue, which is polled on the next iteration, yielding another MAX_VALUE and state 2. At this point, the main thread reaches requestMore(2), which triggers state 3, and requestMore(3) does nothing since the inner has already called onCompleted().

The following code hangs after a few dozen iterations. It tries to overlap the termination of the inner range with a concurrent cancellation.

ExecutorService exec = Executors.newSingleThreadExecutor();
try {
    for (int i = 0; i < 1000; i++) {
        if (i % 50 == 0) {
            System.out.println("bugUnsubscribeVsInnerComplete >> " + i);
        }
        final CyclicBarrier cb = new CyclicBarrier(2);
        Action2<Long, Observer<Observable<? extends Integer>>> action2 = new Action2<Long, Observer<Observable<? extends Integer>>>() {
            @Override
            public void call(Long t1, Observer<Observable<? extends Integer>> t2) {
                System.out.println(t1);
                t2.onNext(
                        Observable.range(1, 1)
                        .doOnSubscribe(new Action0() {
                            @Override
                            public void call() {
                                System.out.println("Subscribed to inner");
                            }
                        })
                        .doOnCompleted(new Action0() {
                            @Override
                            public void call() {
                                System.out.println("Entering completion handler");
                                await(cb);
                            }
                        })
                );
            }
        };

        final TestSubscriber<Integer> ts = TestSubscriber.create(0);

        Future<?> f = exec.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Entering unsubscribe");
                await(cb);
                ts.unsubscribe();
            }
        });

        AsyncOnSubscribe.createStateless(action2).call(ts);


        f.get();

        ts.assertNoErrors();
    }
} finally {
    exec.shutdownNow();
}
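
The snippet above calls await(cb), which is not shown. A minimal sketch of what such a helper might look like, assuming it only wraps CyclicBarrier.await and rethrows checked exceptions; the class name BarrierAwait and the 5-second timeout are my assumptions, not the helper from the actual test class:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BarrierAwait {
    // Hypothetical helper: waits until both parties have reached the barrier.
    // The timeout is an assumption so that a missed rendezvous surfaces as
    // an exception rather than blocking the calling thread forever.
    static void await(CyclicBarrier cb) {
        try {
            cb.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        } catch (BrokenBarrierException | TimeoutException ex) {
            throw new RuntimeException(ex);
        }
    }
}
```

With a helper like this, the completion handler and the unsubscribing task are released at roughly the same moment, which is what lets the inner onCompleted overlap with ts.unsubscribe().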

I couldn't get further with the other bugs because the behavior of these two tests prevents any further progress.

Since I understand neither the original intent nor how the request-juggling is supposed to work, I can't offer any suggestion on what to fix in the operator or how. Therefore, I suggest rolling back the merge and extending the unit tests with more checks, including real concurrent ones (not just TestScheduler-based).
