Skip to content

concatMap + observeOn throws a NullPointerException sometimes #2874

@ashish-tyagi

Description

@ashish-tyagi

It is difficult to produce this one. It occurs in our production service, which has a code almost like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/**
 * @author Ashish.Tyagi
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Subject<Integer, Integer> subject = PublishSubject.create();
        ExecutorService executor = Executors.newFixedThreadPool(1);

        subject.concatMap(new Func1<Integer, Observable<? extends Integer>>() {
            @Override
            public Observable<? extends Integer> call(Integer t) {
                return Observable.just(t);
            }
        }).observeOn(Schedulers.from(executor)).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer t) {
                if (t % 1000 == 0) {
                    System.out.println(t);
                }
            }
        });

        for (int i = 0; i < 10000000; i++) {
            subject.onNext(i);
        }
        subject.onCompleted();

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.MINUTES);
    }
}
Foolowing is the exception thrown:
Exception in thread "serviceExecutor-1" java.lang.NullPointerException
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.requestFromChild(OperatorConcat.java:129)
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.access$100(OperatorConcat.java:78)
    at rx.internal.operators.OperatorConcat$ConcatProducer.request(OperatorConcat.java:73)
    at rx.Subscriber.request(Subscriber.java:141)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:207)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.access$000(OperatorObserveOn.java:65)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:155)
    at rx.schedulers.ExecutorScheduler$ExecutorAction.run(ExecutorScheduler.java:173)
    at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:99)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions