2.x Exception unhandled after dispose() #4991

Closed
@loongee

Description

An exception goes unhandled if dispose() is called before the error is emitted.
Am I using it the wrong way?

error stack:

java.lang.IllegalStateException: example
	at com.example.MyClass$2.apply(MyClass.java:31)
	at com.example.MyClass$2.apply(MyClass.java:23)
	at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
	at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
	at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10179)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "RxCachedThreadScheduler-1" java.lang.IllegalStateException: example
	at com.example.MyClass$2.apply(MyClass.java:31)
	at com.example.MyClass$2.apply(MyClass.java:23)
	at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
	at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
	at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10179)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

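sample code:

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
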
public class MyClass {

    static Disposable disposable = null;

    public static void main(String[] args) {

        Observable.just(1)
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
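                        // Busy-wait for 500 ms so that dispose() runs while apply() is still executing.
                        long endTime = System.currentTimeMillis() + 500;
                        while (System.currentTimeMillis() < endTime) {
                        }
                        // By the time this error is emitted, the observer has already been disposed.
                        return Observable.error(new IllegalStateException("example"));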
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onError(Throwable e) {}

                    @Override
                    public void onComplete() {}

                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {}
                });


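        try {
            Thread.sleep(50);
            // Dispose while apply() is still busy-waiting on the io() thread.
            disposable.dispose();
            // Keep the JVM alive long enough to observe the error.
            Thread.sleep(500000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }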
    }
}
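
For anyone reading the trace: the second half ("Exception in thread ...") is what the JVM's uncaught exception handler prints. In RxJava 2, an error that can no longer be delivered because the observer was disposed is, as far as I understand, passed to RxJavaPlugins.onError, which by default ends up at that handler. Below is a plain-Java sketch of the same situation with no RxJava involved. The Downstream class, the worker thread name, and runScenario are hypothetical names made up for illustration; this is an analogy under those assumptions, not how RxJava is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class UndeliverableSketch {

    /** Hypothetical stand-in for a disposable observer; not an RxJava type. */
    static final class Downstream {
        private final AtomicBoolean disposed = new AtomicBoolean();
        final AtomicReference<Throwable> received = new AtomicReference<>();

        void dispose() {
            disposed.set(true);
        }

        /** Delivers the error unless already disposed; returns whether it was delivered. */
        boolean tryOnError(Throwable error) {
            if (disposed.get()) {
                return false;
            }
            received.set(error);
            return true;
        }
    }

    /** Runs the scenario and returns the error that reached the worker's uncaught handler, or null. */
    static Throwable runScenario() {
        Downstream downstream = new Downstream();
        CountDownLatch disposedSignal = new CountDownLatch(1);
        AtomicReference<Throwable> uncaught = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                // Stand-in for the slow work inside apply(): block until dispose() has happened.
                disposedSignal.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            IllegalStateException error = new IllegalStateException("example");
            if (!downstream.tryOnError(error)) {
                // Nobody is left to receive the error, so let it escape the thread.
                throw error;
            }
        }, "worker-1");
        // Capture the escaped error instead of letting the default handler print it.
        worker.setUncaughtExceptionHandler((thread, error) -> uncaught.set(error));

        worker.start();
        downstream.dispose();
        disposedSignal.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return uncaught.get();
    }

    public static void main(String[] args) {
        Throwable escaped = runScenario();
        System.out.println("Escaped to uncaught handler: " + escaped);
    }
}
```

In the sketch the disposed downstream never sees the error, so it surfaces on the worker thread instead. In RxJava 2 itself, the usual place to intercept such errors is a global handler installed with RxJavaPlugins.setErrorHandler.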
