T - the value type
S - the per-subscriber user-defined state type
@Experimental public abstract class AbstractOnSubscribe<T,S> extends Object implements Observable.OnSubscribe<T>
Abstract implementation of the Observable.OnSubscribe interface that helps you build Observable sources one
onNext at a time, and automatically supports unsubscription and backpressure.
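To make "one onNext at a time" with backpressure and unsubscription more concrete, here is a minimal, self-contained sketch of a request-driven emission loop. It deliberately does not use RxJava: OneAtATimeSource, request(long) and unsubscribe() are hypothetical names invented for this illustration, and the library's actual implementation may well differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: a hypothetical stand-in, not an RxJava class.
final class RequestLoopSketch {

    /** Hands values downstream one at a time, and only as many as were requested. */
    static final class OneAtATimeSource<T> {
        private final Iterator<T> values;
        private final Consumer<T> downstream;
        private boolean unsubscribed;

        OneAtATimeSource(Iterator<T> values, Consumer<T> downstream) {
            this.values = values;
            this.downstream = downstream;
        }

        /** Emits up to n values; returns how many were actually emitted. */
        long request(long n) {
            long emitted = 0;
            while (emitted < n && !unsubscribed && values.hasNext()) {
                downstream.accept(values.next()); // exactly one value per loop turn
                emitted++;
            }
            return emitted;
        }

        /** After this call, further requests emit nothing. */
        void unsubscribe() {
            unsubscribed = true;
        }
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        OneAtATimeSource<Integer> source =
                new OneAtATimeSource<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), received::add);
        source.request(2);    // emits 1 and 2
        source.request(1);    // emits 3
        source.unsubscribe();
        source.request(10);   // emits nothing
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]
    }
}
```

The point of the sketch is that the producer never emits more than the consumer asked for, and that the unsubscription check sits inside the same loop, so a source written as single steps gets both behaviours without extra code.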
When you implement the next() method, you

- should call state.onNext(), state.onError(), or state.onCompleted(), or call state.stop() to indicate that no further values will be sent;
- may call state.onNext() and either state.onError() or state.onCompleted() together;
- should not return without calling any of these methods: in that case an IllegalStateException is forwarded to the Subscriber and the Observable is terminated;
- should not call the state.onfoo() methods more than once (yields IllegalStateException).

The AbstractOnSubscribe.SubscriptionState object features counters that may help implement a state machine:

- A call counter: state.calls() tells how many times next() has been run (zero based).
- A phase counter, accessible via state.phase(), that tracks the current emission phase; use it in a switch statement to implement the state machine. (It is named phase to avoid confusion with the per-subscriber state.)
- You can change the current phase arbitrarily with state.advancePhase(), state.advancePhaseBy(int) and state.phase(int).

When you implement AbstractOnSubscribe, you may override onSubscribe(rx.Subscriber<? super T>) to perform special actions (such as registering Subscriptions with Subscriber.add()) and return additional state for each subscriber subscribing. You can access this custom state with the state.state() method. If you need to do some cleanup, you can override the onTerminated(S) method.
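The call and phase counters described above can be pictured with a small, self-contained model. This is only a sketch: the State class below is a hypothetical stand-in written for this illustration (it is not AbstractOnSubscribe.SubscriptionState), and the driver loop in run() is an assumption about how such a step function might be invoked repeatedly.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a hypothetical stand-in, not the RxJava SubscriptionState.
final class PhaseSketch {

    /** Minimal model of a per-subscriber state with a call counter and a phase counter. */
    static final class State {
        private int phase;      // current emission phase
        private long calls;     // number of completed next() runs (zero based)
        private boolean done;   // true once a terminal event was signalled
        final List<String> out = new ArrayList<>();

        int phase() { return phase; }
        long calls() { return calls; }
        void advancePhase() { phase++; }
        void onNext(String value) { out.add(value); }
        void onCompleted() { out.add("<completed>"); done = true; }
    }

    /** One step of the state machine: signals at most one value or one terminal event. */
    static void next(State s) {
        switch (s.phase()) {
        case 0:
            s.onNext("header");
            s.advancePhase();
            break;
        case 1:
            s.onNext("row " + s.calls());
            if (s.calls() == 3) {
                s.advancePhase(); // the third row was the last one
            }
            break;
        case 2:
            s.onCompleted();
            break;
        default:
            throw new IllegalStateException("unexpected phase " + s.phase());
        }
    }

    /** Runs next() until a terminal event, incrementing the call counter after each run. */
    static List<String> run() {
        State s = new State();
        while (!s.done) {
            next(s);
            s.calls++;
        }
        return s.out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [header, row 1, row 2, row 3, <completed>]
    }
}
```

Each case in the switch ends with break, so a single run of next() signals only once; falling through into the following case would signal twice in one run, which the usage rules above forbid.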
For convenience, a lambda-accepting static factory method, create(rx.functions.Action1<rx.observables.AbstractOnSubscribe.SubscriptionState<T, S>>), is available. Another convenience is toObservable() which turns an AbstractOnSubscribe instance into an Observable fluently.
// Emits a single value, then completes.
AbstractOnSubscribe.create(s -> {
  s.onNext(1);
  s.onCompleted();
}).toObservable().subscribe(System.out::println);
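// Emits the elements of an Iterable, one per next() call; T stands for the element type.
Iterable<T> iterable = ...;
AbstractOnSubscribe.<T, Iterator<T>>create(s -> {
  Iterator<T> it = s.state();  // per-subscriber state returned by the onSubscribe function
  if (it.hasNext()) {
    s.onNext(it.next());
  }
  if (!it.hasNext()) {
    s.onCompleted();
  }
}, u -> iterable.iterator()).toObservable().subscribe(System.out::println);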
// A source that fails a number of times before succeeding. The failure counter is shared
// across subscriptions; retry() resubscribes after each error.
AtomicInteger fails = new AtomicInteger();
int numFails = 50;
AbstractOnSubscribe.create(s -> {
  switch (s.phase()) {
  case 0:
    s.onNext("Beginning");
    if (fails.getAndIncrement() < numFails) {
      s.onError(new RuntimeException("Oh, failure."));
    } else {
      s.advancePhase();
    }
    break;
  case 1:
    s.onNext("Finally working");
    s.onCompleted();
    break;
  default:
    throw new IllegalStateException("How did we get here?");
  }
}).toObservable().retry().subscribe(System.out::println);
// A source that never emits a value: every next() call just requests a stop.
AbstractOnSubscribe.create(s -> {
  s.stop();
}).toObservable()
  .timeout(1, TimeUnit.SECONDS)
  .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
| Modifier and Type | Class and Description |
|---|---|
| static class | AbstractOnSubscribe.SubscriptionState<T,S> - Represents a per-subscription state for the AbstractOnSubscribe operation. |
| Constructor and Description |
|---|
| AbstractOnSubscribe() |
| Modifier and Type | Method and Description |
|---|---|
| void | call(Subscriber<? super T> subscriber) |
| static <T,S> AbstractOnSubscribe<T,S> | create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next) - Creates an AbstractOnSubscribe instance which calls the provided next action. |
| static <T,S> AbstractOnSubscribe<T,S> | create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next, Func1<? super Subscriber<? super T>,? extends S> onSubscribe) - Creates an AbstractOnSubscribe instance which creates a custom state with the onSubscribe function and calls the provided next action. |
| static <T,S> AbstractOnSubscribe<T,S> | create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next, Func1<? super Subscriber<? super T>,? extends S> onSubscribe, Action1<? super S> onTerminated) - Creates an AbstractOnSubscribe instance which creates a custom state with the onSubscribe function, calls the provided next action and calls the onTerminated action to release the state when it's no longer needed. |
| protected abstract void | next(AbstractOnSubscribe.SubscriptionState<T,S> state) - Override this method to create an emission state-machine. |
| protected S | onSubscribe(Subscriber<? super T> subscriber) - Called when a Subscriber subscribes and lets the implementor create a per-subscriber custom state. |
| protected void | onTerminated(S state) - Called after the terminal emission or when the downstream unsubscribes. |
| Observable<T> | toObservable() - Convenience method to create an Observable from this implemented instance. |
protected S onSubscribe(Subscriber<? super T> subscriber)
Override this method to have custom state per-subscriber. The default implementation returns null.
subscriber - the subscriber who is subscribing
protected void onTerminated(S state)
This is called only once and no onNext call will run concurrently with it. The default implementation does nothing.
state - the user-provided state
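The "called only once" guarantee matters because termination can be triggered from two directions, a terminal emission or a downstream unsubscription. One common way to get an at-most-once cleanup is a compare-and-set guard, sketched below. Resource and terminate() are hypothetical names for this illustration; the source does not state how the library itself enforces the guarantee.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: a hypothetical stand-in for per-subscriber state that needs cleanup.
final class TerminateOnceSketch {

    static final class Resource {
        private final AtomicBoolean terminated = new AtomicBoolean();
        final AtomicInteger cleanups = new AtomicInteger(); // counts how often cleanup ran

        /** Runs the cleanup on the first call only; later calls are no-ops. */
        void terminate() {
            if (terminated.compareAndSet(false, true)) {
                cleanups.incrementAndGet();
            }
        }
    }

    static int demo() {
        Resource resource = new Resource();
        resource.terminate(); // for example, triggered by the terminal emission
        resource.terminate(); // for example, triggered by a later unsubscribe
        return resource.cleanups.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```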
protected abstract void next(AbstractOnSubscribe.SubscriptionState<T,S> state)
Override this method to create an emission state-machine.
state - the per-subscriber subscription state
public final void call(Subscriber<? super T> subscriber)
public final Observable<T> toObservable()
public static <T,S> AbstractOnSubscribe<T,S> create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next)
Creates an AbstractOnSubscribe instance which calls the provided next action.
This is a convenience method to help create AbstractOnSubscribe instances with the help of lambdas.
T - the value type
S - the per-subscriber user-defined state type
next - the next action to call
Returns an AbstractOnSubscribe instance.
public static <T,S> AbstractOnSubscribe<T,S> create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next, Func1<? super Subscriber<? super T>,? extends S> onSubscribe)
Creates an AbstractOnSubscribe instance which creates a custom state with the onSubscribe function and calls the provided next action.
This is a convenience method to help create AbstractOnSubscribe instances with the help of lambdas.
T - the value type
S - the per-subscriber user-defined state type
next - the next action to call
onSubscribe - the function that returns a per-subscriber state to be used by next
Returns an AbstractOnSubscribe instance.
public static <T,S> AbstractOnSubscribe<T,S> create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next, Func1<? super Subscriber<? super T>,? extends S> onSubscribe, Action1<? super S> onTerminated)
Creates an AbstractOnSubscribe instance which creates a custom state with the onSubscribe function, calls the provided next action and calls the onTerminated action to release the state when it's no longer needed.
This is a convenience method to help create AbstractOnSubscribe instances with the help of lambdas.
T - the value type
S - the per-subscriber user-defined state type
next - the next action to call
onSubscribe - the function that returns a per-subscriber state to be used by next
onTerminated - the action to call to release the state created by the onSubscribe function
Returns an AbstractOnSubscribe instance.