转载

Rxjava2 Observable源码浅析

  • 上游:一般来说是 ObservableFlowableSubject
  • 下游:一般来说是 ObserverSubscrption
  • cold Observable:只有通过 Observable#subscribe 才开始请求上游发送数据。当下游请求 dispose() 停止通知上游停止发送。
  • hot Observable:不管有无下游,都可以进行数据的发送
  • Rxjava1 开始就有人说Rxjava可以看作流水线,上游怎么加工对于下游来说是无感知的,下游只要负责接收响应对应数据事件就行。

对于rxajva的思考,可以参考一下:Rxjava沉思录系列和 Rxjava主要负责人系列博客

cold Observable

一般cold Observable创建都是通过 justcreatefromXXjust 创建的。最简单粗暴的创建方式:

Observable.create<String> { it.onNext("") }.subscribe()
复制代码
//[仅关注点相关代码]
//ObservableOnSubscribe仅一个subscribe方法
public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        //RxJavaPlugins这是一个全局Hook,#onAssembly不实现默认直接返回
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

    public final void subscribe(Observer<? super T> observer) {
        try {
            ........
            //真正调用subscribe的实现
            subscribeActual(observer);
        } 
        ......
    }
    //整个Observable唯一的抽象方法,由子Observable实现,通过这个方法将上游和下游关联起来
    protected abstract void subscribeActual(Observer<? super T> observer);
}
复制代码

Observable#create 真正返回的是 ObservableCreate ,当调用 Observable#subscribe 才真正通知上游 Observable 开始发送数据。其实质是通过 #subscribeActual 将上下游建立联系,并调用上游 #subscribe (在 ObservableCreate 中就是 ObservableOnSubscribe#subscribe )方法通知上游,下游已订阅可以开始发送数据。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //source(上游)即Observable#create传入的ObservableOnSubscribe
            //这里就将上下游真正的联系了起来。
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        .....
    }
}
复制代码

所以实质就是下游通知上游,下游已产生订阅触发上游下发数据/事件,上游再通过下发数据/事件,最终下游通过指定方法响应上游下发的数据/事件。所以一开始说的流水线方式就可以理解了。

因为每次下游产生一次订阅都会通知到上游的 #subscribe ,所以如果上游只在 #subscribe 中去创建初始数据源就可以每个做到不同下游的数据不关联

Observable.create<String> { it.onNext("") }.subscribe() 流程图如下:

Rxjava2 Observable源码浅析

map操作符

//Observable#map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //**ObservableMap将上游Observable和当前的转换Function建立联系
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

//ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //将下游包装成MapObserver,并将MapObserver和上游建立联系
        //这样上游下发时,先通过MapObserver处理才下发给真正的Observer
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            ....
            U v;
            try {
                //通过Function获取到map后的数据
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch ....
            //向下游下发数据
            actual.onNext(v);
        }
        ...
    }
}
复制代码

可以看到 map 操作符的作用就是通过将上游拦截返回 ObservableMap 提供给下游订阅,并在map上游返回数据前通过 mapper 将上游数据转化并下发给下游。

线程调度

subscribeOn

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    //emmmm,是不是点眼熟
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
//ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //onSubscribe()方法执行在 订阅处所在的线程
        s.onSubscribe(parent);
        //将上游放入scheduler中调用,且立即执行
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        //scheduler#scheduleDirect中执行完后
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //该方法调用已经在scheduler中调用
            source.subscribe(parent);
        }
    }
}
复制代码

由源码可以看出由 scheduler.scheduleDirect -> SubscribeTask#run -> SubscribeOnObserver#subscribe(observer) 将整个调度切换到指定线程中。

因为订阅是用下自上的,所以 subscribeOn 也总是离源最近的一个生效。因为触发源的 subscribe 是离源最近一个。

observeOn

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码

可以看出Rxjava的操作符套路基本是将源 Observable 通过装饰者模式封装一层再返回新的Observable

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;//默认false
    final int bufferSize;//一般128
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //创建主线程调度器
            Scheduler.Worker w = scheduler.createWorker();
            //关联上下游,触发上游订阅过程
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}
复制代码

这里可以看出 ObservableObserveOn 还是很简单的,上游订阅过程并不用关心,下游的触发则由 ObserveOnObserver 处理。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {

    private static final long serialVersionUID = 6576896619930983584L;
    final Observer<? super T> actual;
    final Scheduler.Worker worker;
    final boolean delayError;
    final int bufferSize;
    //上游数据的缓存队列
    SimpleQueue<T> queue;

    Disposable s;

    Throwable error;
    volatile boolean done;

    volatile boolean cancelled;

    int sourceMode;

    boolean outputFused;

    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.actual = actual;
        this.worker = worker;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            ......
            //创建对接缓存数据
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            //回调下游onSubscribe
            actual.onSubscribe(this);
        }
    }

    @Override
    public void onNext(T t) {
        if (done) {//执行过complete/error则done为true
            return;
        }
        
        if (sourceMode != QueueDisposable.ASYNC) {//非异步数据,默认同步数据
            queue.offer(t);//入队列
        }
        schedule();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;
        done = true;//标记已完成
        schedule();
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;//标记已完成
        schedule();
    }

    @Override
    public void dispose() {
        if (!cancelled) {
            cancelled = true;
            s.dispose();
            worker.dispose();
            if (getAndIncrement() == 0) {
                queue.clear();
            }
        }
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    void schedule() {
        //自旋+1,!=0则表示worker.schedule已在执行无需在调度
        if (getAndIncrement() == 0) {
            worker.schedule(this);//通过调度器处理,将数据取出下发到下游
        }
    }
    
    @Override
    public void run() {
        if (outputFused) {//默认false
            drainFused();
        } else {
            drainNormal();//取出数据下发
        }
    }

    void drainNormal() {
        int missed = 1;

        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;

        for (; ; ) {
            //检测是否不用再处理
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }

            for (; ; ) {
                boolean d = done;
                T v;

                try {
                    v = q.poll();//取出一个数据
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;

                if (checkTerminated(d, empty, a)) {//可能已经提前disposed了
                    return;
                }

                if (empty) {//数据为空队列无数据,退出下发循环
                    break;
                }
                //下发
                a.onNext(v);
            }
            
            //可能有错过的schedule,再次循环检测
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    
    //检测是否compelte/error/队列已空
    boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
        if (cancelled) {//已经disposed
            queue.clear();
            return true;
        }
        if (d) {//是否已结束
            Throwable e = error;
            if (delayError) {//延迟error,等待队列清空
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                if (e != null) {
                    queue.clear();
                    a.onError(e);//下发error
                    worker.dispose();
                    return true;
                } else if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        return false;
    }
}
复制代码

ObserveOnObserver 继承于 BasicIntQueueDisposable 继承于 AtomicInteger ,通过自身的原子性(自旋/CAS)来消除多线程对 #schedule 的调用。

可以看出 #observeOn 只对下游有影响。

原文  https://juejin.im/post/5dba56c35188252d1378b27c
正文到此结束
Loading...