Description
The Problem:
Creating back-pressure-enabled Observables that produce data from a non-blocking source is hard, and our current APIs for creating arbitrary Observables do not lend themselves to creating Observables that respect the back-pressure rules. Users can either call one of the Observable static methods to generate standard-case Observables using well-known patterns, or they can attempt to implement a responsible Observable out of an OnSubscribe<T>. I will define a "responsible Observable" to be an Observable that does the following:
- calls subscriber.onNext(t) only when subscriberCapacity > 0, where subscriberCapacity is the sum of the integer arguments of all invocations of producer.request(n) minus the number of calls made to subscriber.onNext(t).
- calls subscriber.onCompleted() or subscriber.onError(e) only once.
- does not call subscriber.onNext(t) concurrently.
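The capacity rule in the first bullet amounts to a counter: each request(n) adds n, each onNext(t) consumes one, and emission is allowed only while the counter is positive. A minimal sketch, assuming a hypothetical CapacityTracker class (not an RxJava type) that uses java.util.concurrent.atomic.AtomicLong so request(n) may arrive from any thread. Saturating at Long.MAX_VALUE instead of overflowing is an added assumption, not something stated above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of RxJava) that tracks
// subscriberCapacity = (sum of all request(n) arguments) - (number of onNext calls).
final class CapacityTracker {
    private final AtomicLong capacity = new AtomicLong();

    // Called for every producer.request(n): adds n to the outstanding capacity.
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive: " + n);
        }
        // Assumption: saturate at Long.MAX_VALUE rather than overflow.
        capacity.getAndUpdate(c -> (Long.MAX_VALUE - c < n) ? Long.MAX_VALUE : c + n);
    }

    // Returns true (and consumes one unit) only if capacity > 0,
    // i.e. only if the caller may invoke subscriber.onNext(t) right now.
    boolean tryEmit() {
        while (true) {
            long c = capacity.get();
            if (c == 0) {
                return false;
            }
            if (capacity.compareAndSet(c, c - 1)) {
                return true;
            }
        }
    }

    long remaining() {
        return capacity.get();
    }
}
```

This only enforces the first rule; delivering at most one terminal event and never calling onNext concurrently still have to be handled separately.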
The onSubscribe lifecycle of a responsible Observable should fit within the following steps:
1. The onSubscribe.call(Subscriber<T>) method is invoked by the Observable.subscribe(Subscriber<T>) method.
2. The OnSubscribe func creates a Producer and hands it to the subscriber via the subscriber.setProducer(Producer<T>) method.
3. At some point in the (potentially distant) future, the subscriber may call producer.request(n). This signals to the Observable that capacity has been allocated in the subscriber.
3a. Values are produced via calls to subscriber.onNext(t).
3b. At some point, a terminal event is emitted via onCompleted or onError.
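The steps above can be traced with a toy cold source. This is a sketch under stated assumptions: Producer, Subscriber, and OnSubscribe below are minimal stand-in interfaces (not the RxJava classes), and RangeOnSubscribe and RecordingSubscriber are hypothetical names. It also assumes request(n) is never called concurrently or re-entrantly, which a real implementation would have to handle.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the types in the lifecycle (NOT the RxJava classes).
interface Producer {
    void request(long n);
}

interface Subscriber<T> {
    void setProducer(Producer p);
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

interface OnSubscribe<T> {
    void call(Subscriber<T> s);
}

// Hypothetical cold source emitting the integers [start, end).
final class RangeOnSubscribe implements OnSubscribe<Integer> {
    private final int start;
    private final int end;

    RangeOnSubscribe(int start, int end) {
        this.start = start;
        this.end = end;
    }

    // Step 1: invoked when a subscriber subscribes.
    @Override
    public void call(Subscriber<Integer> s) {
        // Step 2: hand the subscriber a Producer; nothing is emitted yet.
        s.setProducer(new Producer() {
            private int next = start;
            private boolean done;

            // Step 3: the subscriber allocates capacity for n more values.
            // Assumes no concurrent or re-entrant calls to request(n).
            @Override
            public void request(long n) {
                // 3a: emit at most n values.
                for (long i = 0; i < n && next < end; i++) {
                    s.onNext(next++);
                }
                // 3b: emit the terminal event exactly once.
                if (next >= end && !done) {
                    done = true;
                    s.onCompleted();
                }
            }
        });
    }
}

// Hypothetical test subscriber: records events and requests an initial batch.
final class RecordingSubscriber implements Subscriber<Integer> {
    final List<Integer> values = new ArrayList<>();
    boolean completed;
    Throwable error;
    Producer producer;
    private final long initialRequest;

    RecordingSubscriber(long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    public void setProducer(Producer p) {
        producer = p;
        p.request(initialRequest); // allocate capacity in the subscriber
    }

    @Override
    public void onNext(Integer t) { values.add(t); }

    @Override
    public void onCompleted() { completed = true; }

    @Override
    public void onError(Throwable e) { error = e; }
}
```

Because emission happens only inside request(n), the source never runs ahead of the subscriber; the (possibly distant) future request in step 3 is simply a later call on the stored producer.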
This allows for many variations of control flow, given that onNext calls could be scheduled, data could be prefetched, excess data could be batched or dropped, and so on.
Proposal
With this groundwork in mind, I would like to start a discussion about a general framework that correctly models the various use cases, particularly with respect to the following requirements:
- capable of paginating data across multiple requests
- batching requests
- hot and cold data sources
- async and blocking onNexts
Naive Implementation
The user provides callbacks that the framework calls between each of the steps above. That is:
- The Observable calls onSubscribe.call(s).
- The framework's onSubscribe calls S dataState = onSubscribeGenerator.call().
- Next, the onSubscribe calls subscriber.setProducer(p), setting a framework Producer.
- At some point, the request is received by our producer: p.request(n).
- The framework then calls the user-provided request callback and stores the resultant dataState for future requests: dataState = onRequestCallback.call(dataState, n, s).
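The naive framework in the steps above can be sketched in plain Java. Assumptions: the interfaces are minimal stand-ins (not RxJava types), java.util.function.Supplier.get() plays the role of onSubscribeGenerator.call(), and RequestCallback and NaiveOnSubscribe are hypothetical names. The Producer deliberately keeps dataState unsynchronized, mirroring the naive design.

```java
import java.util.function.Supplier;

// Minimal stand-ins for the types in the steps above (NOT the RxJava classes).
interface Producer {
    void request(long n);
}

interface Subscriber<T> {
    void setProducer(Producer p);
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

interface OnSubscribe<T> {
    void call(Subscriber<T> s);
}

// Hypothetical shape of onRequestCallback: (dataState, n, subscriber) -> next dataState.
interface RequestCallback<S, T> {
    S call(S dataState, long n, Subscriber<T> s);
}

// Naive framework: the user supplies two callbacks; the framework owns the
// Producer and threads dataState from one request to the next.
final class NaiveOnSubscribe<S, T> implements OnSubscribe<T> {
    private final Supplier<S> onSubscribeGenerator;
    private final RequestCallback<S, T> onRequestCallback;

    NaiveOnSubscribe(Supplier<S> onSubscribeGenerator, RequestCallback<S, T> onRequestCallback) {
        this.onSubscribeGenerator = onSubscribeGenerator;
        this.onRequestCallback = onRequestCallback;
    }

    @Override
    public void call(Subscriber<T> s) {
        // S dataState = onSubscribeGenerator.call()
        S initial = onSubscribeGenerator.get();
        // subscriber.setProducer(p), where p is a framework Producer
        s.setProducer(new Producer() {
            private S dataState = initial;

            // p.request(n): delegate to the user callback and keep the resultant state.
            // Not thread-safe: concurrent request(n) calls would race on dataState.
            @Override
            public void request(long n) {
                dataState = onRequestCallback.call(dataState, n, s);
            }
        });
    }
}
```

For pagination, S could hold a cursor or page token, so that each request(n) resumes where the previous one stopped.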
Obvious shortcomings of this approach: order may not be preserved across multiple requests if the onRequestCallback schedules the onNext calls. Also, onNext calls may be concurrent, so the subscriber must be wrapped in a serializing subscriber. Finally, a case could be made that many of the Observables created this way should specify a back-pressure strategy. Any thoughts are welcome.
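Wrapping the subscriber, as suggested for the concurrency problem, might look like the following minimal lock-based sketch. SerializingSubscriber and the trimmed Subscriber interface (setProducer omitted) are hypothetical stand-ins, not RxJava types. A production version would more likely avoid holding a lock while calling into the delegate.

```java
// Trimmed stand-in for a subscriber (NOT the RxJava type; setProducer omitted).
interface Subscriber<T> {
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

// Hypothetical lock-based serializing wrapper: at most one thread is inside the
// delegate at a time, at most one terminal event is forwarded, and anything that
// arrives after the terminal event is dropped.
final class SerializingSubscriber<T> implements Subscriber<T> {
    private final Subscriber<T> delegate;
    private boolean terminated; // guarded by "this"

    SerializingSubscriber(Subscriber<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void onNext(T t) {
        if (!terminated) {
            delegate.onNext(t);
        }
    }

    @Override
    public synchronized void onCompleted() {
        if (!terminated) {
            terminated = true;
            delegate.onCompleted();
        }
    }

    @Override
    public synchronized void onError(Throwable e) {
        if (!terminated) {
            terminated = true;
            delegate.onError(e);
        }
    }
}
```

This addresses only overlapping onNext calls and duplicate terminal events. It does nothing for the ordering problem, since values scheduled by different requests can still reach the lock in any order.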