S - the type of the user-defined state used in generateState(), next(S, long, Observer), and onUnsubscribe(S).
T - the type of Subscribers that will be compatible with this.
@Experimental public abstract class AsyncOnSubscribe<S,T> extends Object implements Observable.OnSubscribe<T>
A utility class to create OnSubscribe<T> functions that respond correctly to back pressure requests from subscribers. This is an improvement over Observable.create(OnSubscribe), which does not provide any means of managing back pressure requests out of the box. This variant of an OnSubscribe function allows for the asynchronous processing of requests.
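To make the idea of responding to back pressure requests concrete, here is a minimal sketch in plain Java. It does not use RxJava; `RequestDrivenSource` and its methods are hypothetical names introduced only for illustration. The point is that the source produces items only in response to a request, and never more than were asked for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not an RxJava class: a source that emits only on request.
class RequestDrivenSource {
    private int nextValue = 0;                          // next value to emit
    private final List<Integer> received = new ArrayList<>();

    // The consumer signals demand; the source produces exactly n items in response.
    void request(long n) {
        for (long i = 0; i < n; i++) {
            received.add(nextValue++);
        }
    }

    // Everything the consumer has been handed so far.
    List<Integer> received() {
        return received;
    }

    public static void main(String[] args) {
        RequestDrivenSource source = new RequestDrivenSource();
        source.request(2);   // consumer is ready for two items
        source.request(3);   // later it asks for three more
        System.out.println(source.received()); // prints [0, 1, 2, 3, 4]
    }
}
```

A source created without any such request handling would push all of its items regardless of how many the consumer can accept.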
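| Constructor and Description |
|---|
| AsyncOnSubscribe() |

| Modifier and Type | Method and Description |
|---|---|
| void | call(Subscriber<? super T> actualSubscriber) |
| static <S,T> Observable.OnSubscribe<T> | createSingleState(Func0<? extends S> generator, Action3<? super S,Long,? super Observer<Observable<? extends T>>> next): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
| static <S,T> Observable.OnSubscribe<T> | createSingleState(Func0<? extends S> generator, Action3<? super S,Long,? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
| static <S,T> Observable.OnSubscribe<T> | createStateful(Func0<? extends S> generator, Func3<? super S,Long,? super Observer<Observable<? extends T>>,? extends S> next): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
| static <S,T> Observable.OnSubscribe<T> | createStateful(Func0<? extends S> generator, Func3<? super S,Long,? super Observer<Observable<? extends T>>,? extends S> next, Action1<? super S> onUnsubscribe): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
| static <T> Observable.OnSubscribe<T> | createStateless(Action2<Long,? super Observer<Observable<? extends T>>> next): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
| static <T> Observable.OnSubscribe<T> | createStateless(Action2<Long,? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe): Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
| protected abstract S | generateState(): Executed once when subscribed to by a subscriber (via OnSubscribe#call(Subscriber)) to produce a state value. |
| protected abstract S | next(S state, long requested, Observer<Observable<? extends T>> observer): Called to produce data to the downstream subscribers. |
| protected void | onUnsubscribe(S state): Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. |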
protected abstract S generateState()
Executed once when subscribed to by a subscriber (via OnSubscribe#call(Subscriber)) to produce a state value. This value is passed into next(S state, long requested, Observer observer) on the first iteration. Subsequent iterations of next will receive the state returned by the previous invocation of next.
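The hand-off of state between iterations can be sketched in plain Java as follows. This is a simplified model, not the RxJava implementation: `StateThreading.run` is a hypothetical helper, the observer argument is left out, and the list of request sizes is supplied up front purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical model of the state hand-off; not the RxJava implementation.
class StateThreading {
    // Calls the generator once, then feeds each state returned by next into the following call.
    static <S> List<S> run(Supplier<S> generator, BiFunction<S, Long, S> next, long[] requests) {
        List<S> seen = new ArrayList<>();
        S state = generator.get();                 // plays the role of generateState()
        for (long requested : requests) {
            seen.add(state);                       // the state this iteration receives
            state = next.apply(state, requested);  // returned value becomes the next state
        }
        return seen;
    }

    public static void main(String[] args) {
        // The state is a running offset; each iteration advances it by the requested amount.
        List<Long> seen = run(() -> 0L, (offset, n) -> offset + n, new long[] {5, 3, 10});
        System.out.println(seen); // prints [0, 5, 8]
    }
}
```

The first iteration sees the generated value; every later iteration sees whatever the previous one returned.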
protected abstract S next(S state, long requested, Observer<Observable<? extends T>> observer)
Called to produce data to the downstream subscribers. To emit data to a downstream subscriber call observer.onNext(t). To signal an error condition call observer.onError(throwable) or throw an Exception. To signal the end of a data stream call observer.onCompleted(). Implementations of this method must follow these rules:
- Must not call observer.onNext(t) more than 1 time per invocation.
- Must not call observer.onNext(t) concurrently.
The value returned from an invocation of this method will be passed in as the state argument of the next invocation of this method.
state - the state value (from generateState() on the first invocation, or from the previous invocation of this method).
requested - the amount of data requested. An observable emitted to the observer should not exceed this amount.
observer - the observer of data emitted by
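The rule that onNext may be called at most once per invocation of next can be illustrated with a small guard. This is a plain-Java sketch, not part of RxJava: `SingleEmissionGuard`, `beginInvocation`, and `emitted` are hypothetical names, and throwing IllegalStateException is a choice made for this example only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical guard, not part of RxJava: rejects a second onNext within one invocation of next.
class SingleEmissionGuard<T> {
    private final List<T> emitted = new ArrayList<>();
    private boolean emittedThisInvocation;

    // Call at the start of each simulated invocation of next.
    void beginInvocation() {
        emittedThisInvocation = false;
    }

    // Accepts the first emission of an invocation and rejects any further one.
    void onNext(T value) {
        if (emittedThisInvocation) {
            throw new IllegalStateException("onNext called more than once in one invocation");
        }
        emittedThisInvocation = true;
        emitted.add(value);
    }

    List<T> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SingleEmissionGuard<String> guard = new SingleEmissionGuard<>();
        guard.beginInvocation();
        guard.onNext("first batch");
        guard.beginInvocation();
        guard.onNext("second batch");
        try {
            guard.onNext("extra");   // violates the rule
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(guard.emitted()); // prints [first batch, second batch]
    }
}
```

The guard is single-threaded, so it only models the first rule; it does nothing to detect concurrent calls.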
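protected void onUnsubscribe(S state)
Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed.
state - the last state value returned from next(S, long, Observer) or generateState() at the time when a terminal event is emitted from next(S, long, Observer) or when the downstream unsubscribes.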
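As a rough model of which state value reaches the clean-up step, consider the plain-Java sketch below. `CleanupTracker`, `advance`, and `finish` are hypothetical names and none of this is RxJava code. Running the clean-up at most once is an assumption of the sketch, not something stated above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model, not RxJava code: hands the latest state to a clean-up action.
class CleanupTracker<S> {
    private final Consumer<S> onUnsubscribe;
    private final AtomicBoolean finished = new AtomicBoolean();
    private S state;

    CleanupTracker(S initialState, Consumer<S> onUnsubscribe) {
        this.state = initialState;          // value a generateState() call would produce
        this.onUnsubscribe = onUnsubscribe;
    }

    // Records the state returned by the latest simulated call to next.
    void advance(S newState) {
        if (!finished.get()) {
            state = newState;
        }
    }

    // Simulates a terminal event or a downstream unsubscribe.
    // Assumption of this sketch: clean-up runs at most once.
    void finish() {
        if (finished.compareAndSet(false, true)) {
            onUnsubscribe.accept(state);
        }
    }

    public static void main(String[] args) {
        List<String> released = new ArrayList<>();
        CleanupTracker<String> tracker = new CleanupTracker<>("initial", released::add);
        tracker.advance("after first next");
        tracker.finish();   // clean-up sees the last state
        tracker.finish();   // ignored
        System.out.println(released); // prints [after first next]
    }
}
```

If `finish` runs before any call to `advance`, the clean-up action receives the initial state, which mirrors the "or generateState()" case in the parameter description.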
@Experimental public static <S,T> Observable.OnSubscribe<T> createSingleState(Func0<? extends S> generator, Action3<? super S,Long,? super Observer<Observable<? extends T>>> next)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
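The signatures suggest how this factory differs from createStateful: here next is an Action3 that returns nothing, while createStateful takes a Func3 that returns the next state. One plausible reading, sketched below in plain Java, is that a single state object is generated once and passed to every invocation. The interfaces `SingleStateNext` and `StatefulNext` are hypothetical stand-ins (with the observer argument omitted), and the adapter is an assumption about the relationship, not the library's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: adapts a void "single state" action to a state-returning function.
class SingleStateAdapter {
    // Stand-in for a Func3-style next that returns the state for the following call.
    interface StatefulNext<S> {
        S apply(S state, long requested);
    }

    // Stand-in for an Action3-style next that returns nothing.
    interface SingleStateNext<S> {
        void accept(S state, long requested);
    }

    // The adapted function runs the action, then hands back the same state object.
    static <S> StatefulNext<S> adapt(SingleStateNext<S> action) {
        return (state, requested) -> {
            action.accept(state, requested);
            return state;
        };
    }

    public static void main(String[] args) {
        List<Long> log = new ArrayList<>();   // the single, mutable state
        StatefulNext<List<Long>> next = adapt((state, requested) -> state.add(requested));
        List<Long> afterFirst = next.apply(log, 4);
        List<Long> afterSecond = next.apply(afterFirst, 2);
        System.out.println(afterSecond == log); // prints true
        System.out.println(log);                // prints [4, 2]
    }
}
```

Under this reading the single-state form suits mutable state such as a cursor or connection, whereas the stateful form also works with immutable state values.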
@Experimental public static <S,T> Observable.OnSubscribe<T> createSingleState(Func0<? extends S> generator, Action3<? super S,Long,? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates an AsyncOnSubscribe with an explicit clean-up step.
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
@Experimental public static <S,T> Observable.OnSubscribe<T> createStateful(Func0<? extends S> generator, Func3<? super S,Long,? super Observer<Observable<? extends T>>,? extends S> next, Action1<? super S> onUnsubscribe)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
@Experimental public static <S,T> Observable.OnSubscribe<T> createStateful(Func0<? extends S> generator, Func3<? super S,Long,? super Observer<Observable<? extends T>>,? extends S> next)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
@Experimental public static <T> Observable.OnSubscribe<T> createStateless(Action2<Long,? super Observer<Observable<? extends T>>> next)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
next - produces data to the downstream subscriber (see next(S, long, Observer))
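What it means for a next function to close over its state can be shown with a plain-Java closure. This sketch does not use RxJava: `ClosureState.counterNext` is a hypothetical helper, `BiConsumer<Long, Consumer<List<Long>>>` stands in for the Action2 parameter, and a list of values stands in for the emitted Observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch: the state lives in the closure instead of a state argument.
class ClosureState {
    // Returns a next-like function that remembers its position between calls.
    static BiConsumer<Long, Consumer<List<Long>>> counterNext() {
        AtomicLong counter = new AtomicLong();   // captured by the lambda below
        return (requested, observer) -> {
            List<Long> batch = new ArrayList<>();
            for (long i = 0; i < requested; i++) {
                batch.add(counter.getAndIncrement());
            }
            observer.accept(batch);              // one emission per invocation
        };
    }

    public static void main(String[] args) {
        List<List<Long>> batches = new ArrayList<>();
        BiConsumer<Long, Consumer<List<Long>>> next = counterNext();
        next.accept(2L, batches::add);
        next.accept(3L, batches::add);
        System.out.println(batches); // prints [[0, 1], [2, 3, 4]]
    }
}
```

Note that in this sketch each call to `counterNext` creates a fresh counter, while a function instance that is reused shares one counter across all of its invocations.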
@Experimental public static <T> Observable.OnSubscribe<T> createStateless(Action2<Long,? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe)
Generates an AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
public final void call(Subscriber<? super T> actualSubscriber)