Class AbstractOnSubscribe<T,S>

  • Type Parameters:
    T - the value type
    S - the per-subscriber user-defined state type
    All Implemented Interfaces:
    Action, Action1<Subscriber<? super T>>, Function, Observable.OnSubscribe<T>


    @Experimental
    public abstract class AbstractOnSubscribe<T,S>
    extends Object
    implements Observable.OnSubscribe<T>
    Abstract base class for the Observable.OnSubscribe interface that helps you build Observable sources one onNext at a time, and automatically supports unsubscription and backpressure.

    Usage rules

    When you implement the next() method, you The AbstractOnSubscribe.SubscriptionState object features counters that may help implement a state machine:
    • A call counter, accessible via state.calls() tells how many times the next() was run (zero based).
    • You can use a phase counter, accessible via state.phase, that helps track the current emission phase, in a switch() statement to implement the state machine. (It is named phase to avoid confusion with the per-subscriber state.)
    • You can arbitrarily change the current phase with state.advancePhase(), state.advancedPhaseBy(int) and state.phase(int).

    When you implement AbstractOnSubscribe, you may override onSubscribe(rx.Subscriber<? super T>) to perform special actions (such as registering Subscriptions with Subscriber.add()) and return additional state for each subscriber subscribing. You can access this custom state with the state.state() method. If you need to do some cleanup, you can override the onTerminated(S) method.

    For convenience, a lambda-accepting static factory method, create(rx.functions.Action1<rx.observables.AbstractOnSubscribe.SubscriptionState<T, S>>), is available. Another convenience is toObservable() which turns an AbstractOnSubscribe instance into an Observable fluently.

    Examples

    Note: these examples use the lambda-helper factories to avoid boilerplate.

    Implement: just

    
     AbstractOnSubscribe.create(s -> {
       s.onNext(1);
       s.onCompleted();
     }).toObservable().subscribe(System.out::println);
     

    Implement: from Iterable

    
     Iterable
                        
                           iterable = ...; AbstractOnSubscribe.create(s -> { Iterator
                          
                             it = s.state(); if (it.hasNext()) { s.onNext(it.next()); } if (!it.hasNext()) { s.onCompleted(); } }, u -> iterable.iterator()).subscribe(System.out::println); 
                          
                        

    Implement source that fails a number of times before succeeding

    
     AtomicInteger fails = new AtomicInteger();
     int numFails = 50;
     AbstractOnSubscribe.create(s -> {
       long c = s.calls();
       switch (s.phase()) {
       case 0:
         s.onNext("Beginning");
         s.onError(new RuntimeException("Oh, failure.");
         if (c == numFails.getAndIncrement()) {
           s.advancePhase();
         }
         break;
       case 1:
         s.onNext("Beginning");
         s.advancePhase();
       case 2:
         s.onNext("Finally working");
         s.onCompleted();
         s.advancePhase();
       default:
         throw new IllegalStateException("How did we get here?");
       }
     }).subscribe(System.out::println);
     

    Implement: never

    
     AbstractOnSubscribe.create(s -> {
       s.stop();
     }).toObservable()
     .timeout(1, TimeUnit.SECONDS)
     .subscribe(System.out::println, Throwable::printStacktrace, () -> System.out.println("Done"));
     
    Since:
    (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
    • Constructor Detail

      • AbstractOnSubscribe

        public AbstractOnSubscribe()
    • Method Detail

      • onSubscribe

        protected S onSubscribe(Subscriber<? super T> subscriber)
        Called when a Subscriber subscribes and lets the implementor create a per-subscriber custom state.

        Override this method to have custom state per-subscriber. The default implementation returns null.

        Parameters:
        subscriber - the subscriber who is subscribing
        Returns:
        the custom state
      • onTerminated

        protected void onTerminated(S state)
        Called after the terminal emission or when the downstream unsubscribes.

        This is called only once and no onNext call will run concurrently with it. The default implementation does nothing.

        Parameters:
        state - the user-provided state
      • next

        protected abstract void next(AbstractOnSubscribe.SubscriptionState<T,S> state)
        Override this method to create an emission state-machine.
        Parameters:
        state - the per-subscriber subscription state
      • call

        public final void call(Subscriber<? super T> subscriber)
      • toObservable

        public final Observable<T> toObservable()
        Convenience method to create an Observable from this implemented instance.
        Returns:
        the created observable
      • create

        public static <T,S> AbstractOnSubscribe<T,S> create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next)
        Creates an AbstractOnSubscribe instance which calls the provided next action.

        This is a convenience method to help create AbstractOnSubscribe instances with the help of lambdas.

        Type Parameters:
        T - the value type
        S - the per-subscriber user-defined state type
        Parameters:
        next - the next action to call
        Returns:
        an AbstractOnSubscribe instance
      • create

        public static <T,S> AbstractOnSubscribe<T,S> create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next,
                                                            Func1<? super Subscriber<? super T>,? extends S> onSubscribe)
        Creates an AbstractOnSubscribe instance which creates a custom state with the onSubscribe function and calls the provided next action.

        This is a convenience method to help create AbstractOnSubscribe instances with the help of lambdas.

        Type Parameters:
        T - the value type
        S - the per-subscriber user-defined state type
        Parameters:
        next - the next action to call
        onSubscribe - the function that returns a per-subscriber state to be used by next
        Returns:
        an AbstractOnSubscribe instance
      • create

        public static <T,S> AbstractOnSubscribe<T,S> create(Action1<AbstractOnSubscribe.SubscriptionState<T,S>> next,
                                                            Func1<? super Subscriber<? super T>,? extends S> onSubscribe,
                                                            Action1<? super S> onTerminated)
        Creates an AbstractOnSubscribe instance which creates a custom state with the onSubscribe function, calls the provided next action and calls the onTerminated action to release the state when its no longer needed.

        This is a convenience method to help create AbstractOnSubscribe instances with the help of lambdas.

        Type Parameters:
        T - the value type
        S - the per-subscriber user-defined state type
        Parameters:
        next - the next action to call
        onSubscribe - the function that returns a per-subscriber state to be used by next
        onTerminated - the action to call to release the state created by the onSubscribe function
        Returns:
        an AbstractOnSubscribe instance