Skip to content

Exceptions not being propagated via onError when using toMap with observeOn #3555

Closed
@vyadh

Description

@vyadh

Hi,

When I try to use toMap() where my value selector may throw an exception, the error only seems to be propagated via the Observable error stream if the observable is with running single threaded or subscribeOn is being used, but not when observeOn is being used.

Here is a unit test to illustrate the problem, where the test "onErrorCalledWhenObserveOnUsed" fails:

public class RxJavaToMapWithErrorBehaviourTest {

  @Test
  public void onErrorCalledForSingleThread() {
    Observable<Map<Integer, Integer>> observable = Observable.just(1, 2, 3, 4, 5)
          .toMap(n -> n, n -> {
            throw new IllegalStateException();
          });

    assertOnErrorCalled(observable);
  }

  @Test
  public void onErrorCalledWhenSubscribeOnUsed() {
    Observable<Map<Integer, Integer>> observable = Observable.just(1, 2, 3, 4, 5)
          .subscribeOn(Schedulers.computation())
          .toMap(n -> n, n -> {
            throw new IllegalStateException();
          });

    assertOnErrorCalled(observable);
  }

  @Test
  public void onErrorCalledWhenObserveOnUsed() {
    Observable<Map<Integer, Integer>> observable = Observable.just(1, 2, 3, 4, 5)
          .observeOn(Schedulers.computation())
          .toMap(n -> n, n -> {
              throw new IllegalStateException();
          });

    assertOnErrorCalled(observable);
  }

  private <T> void assertOnErrorCalled(Observable<T> observable) {
    TestSubscriber<T> subscriber = new TestSubscriber<>();
    observable.subscribe(subscriber);

    subscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
    subscriber.assertError(IllegalStateException.class);
  }

}

The error thrown is:

Exception in thread "RxComputationThreadPool-3" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException
    at RxJavaToMapWithErrorBehaviourTest.lambda$onErrorCalledWhenObserveOnUsed$5(RxJavaToMapWithErrorBehaviourTest.java:47)
    at RxJavaToMapWithErrorBehaviourTest$$Lambda$2/431687835.call(Unknown Source)
    at rx.internal.operators.OperatorToMap$1.onNext(OperatorToMap.java:90)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:208)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    ... 7 more

The observeOn() case seems to make make sense for my use-case at least (where I used subscribeOn to do a database call, followed by observeOn for subsequent calculations).

I think I can probably workaround this problem by first doing a map() to do the calculation and then the toMap(), but I wondered whether this was a bug or expected behaviour?

Thanks.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions