转载

RxJava源码解析

本文中的源码基于 RxJava1

RxJava : io.reactivex:rxjava:1.3.4

以下是各个 Part 主要分析源码的方向

  • Part 1: Observable, Observable.OnSubscribe, Subscriber
  • Part 2: map
  • Part 3: subscribeOn
  • Part 4: observeOn

Part 1

先来看一个最简单的 RxJava Demo

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hi rxjava");
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

那么,我们就从 Observable.create 的角度开始分析 RxJava 内部源码的实现。

public class Observable<T> {

    @Deprecated
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    ...

}

可以看到,我们上面传入 OnSubscribe 对象 f 被 RxJavaHooks 包装了一下。但是默认情况下的 RxJavaHooks.onCreate 返回的就是 f 本身。

public final class RxJavaHooks {

    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

}

接着,根据上面 demo 的代码可以看出,创建出来的 Observable 对象又调用了 subscribe 方法。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

在 subscribe 内部调用了 subscribe(Subscriber<? super T> subscriber, Observable<T> observable) 方法

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 // 一开始是对参数的校验
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }

    // 调用 subscriber.onStart 方法,默认是空实现
    subscriber.onStart();

    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    // 把 subscriber 包装成 SafeSubscriber
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would
    // add a significant depth to already huge call stacks.
    try {
        // 开始执行 onSubscribe 的 onCall 方法
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // in case the subscriber can't listen to exceptions anymore
        if (subscriber.isUnsubscribed()) {
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
        } else {
            // 如果发生了异常,就调用 subscriber 的 onError 方法
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                RxJavaHooks.onObservableError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r; // NOPMD
            }
        }
        return Subscriptions.unsubscribed();
    }	
}

我们详细的来看下 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber) 这句代码。

public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
    Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
    if (f != null) {
        return f.call(instance, onSubscribe);
    }
    return onSubscribe;
}

和上面的 RxJavaHooks 一样,默认的 hook 只是返回了 onSubscribe 对象。所以这句代码就可以“简化”为 onSubscribe.call(subscriber)

也就是执行了 demo 中的 call 方法。

@Override
public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("hi rxjava");
}

而在 call 方法中又调用了 subscriber 的 onNext 方法。

还记得上面 subscriber 被包装成 SafeSubscriber 了吗?

所以这里就会调用 SafeSubscriber.onNext 方法。

@Override
public void onNext(T t) {
    try {
        if (!done) {
            // actual就是我们自己定义的subscriber
            actual.onNext(t);
        }
    } catch (Throwable e) {
        // we handle here instead of another method so we don't add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwOrReport(e, this);
    }
}

在 SafeSubscriber.onNext 方法内,会调用真正的 subscriber.onNext 方法。SafeSubscriber 的作用就是是为了防止被调用 onCompleted 之后再重新调用 onNext 。换句话说,SafeSubscriber 就是为了防止重用。

因此,subscriber.onNext 也就被执行了。

@Override
public void onNext(String s) {
    System.out.println(s);
}

Part 2

在这里,我们把上面简单的 demo 稍微增加一点难度,中间加一个转换:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hi rxjava");
    }
})
.map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + "hahaha";
    }
})
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

可以看到,中间加了一层 map 操作符。

所以我们来分析一下 map 中到底干了什么。

public class Observable<T> {

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
    }

}

map 中调用了 unsafeCreate 方法。我们来看看 unsafeCreate 方法内部

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

这代码多么似曾相识啊,和上面的 Observable.create 比较一下,发现 Observable.unsafeCreate 和 Observable.create 的逻辑是一样的。

所以我们可以知道, map 操作符内部会重新创建一个 Observable ,而这个 Observable 的 OnSubscribe 是一个 OnSubscribeMap 对象。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source; // 原来我们自己创建的 Observable
        this.transformer = transformer; // Fun1 转换
    }

}

所以我们在这里可以小结一下,map 操作符会创建一个新的 Observable 对象,并且它的 OnSubscribe 是一个 OnSubscribeMap 对象,而我们自己的 Observable 会保存在 OnSubscribeMap 里。

再回头看看上面的 demo ,发现 map 创建出来的 Observable 对象调用了 subscribe 方法。在 Part 1 中我们分析过,调用 subscribe 方法内部其实就是会去调用 Observable 中 OnSubscribe 的 call 方法。

所以,我们直接来看 OnSubscribeMap 的 call 方法。

@Override
public void call(final Subscriber<? super R> o) { // o 就是我们自定义的 Subscriber
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    // source 就是我们自己创建的 Observable
    // 和 subscribe 相比,unsafeSubscribe 内部不会对参数校验,subscriber 不会包装成 SafeSubscriber
    source.unsafeSubscribe(parent);
}

call 方法内部创建了一个新的 MapSubscriber 对象,

static final class MapSubscriber<T, R> extends Subscriber<T> {

    public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
        this.actual = actual; // actual 就是我们自定义的 Subscriber
        this.mapper = mapper; // mapper 就是 Func1 转换
    }
}

然后让我们自己的 Observable 去 subscribe 这个 MapSubscriber 对象。

那么接着代码就会执行到 MapSubscriber.call 方法。

@Override
public void onNext(T t) {
    R result;

    try {
        // 执行 Func1 转换器 实现从 T 到 R 的转换
        // 在 demo 中就是在 hi rxjava 后面追加 hahaha
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }
    // 执行我们自定义的 Subscriber 的 onNext 方法
    actual.onNext(result);
}

到这里,整个 map 操作符的流程就讲完了。不明白的同学可以对照着源码多读几遍,相信你会明白的。

献上官方对 map 操作符的示意图

RxJava源码解析

Part 3

这一小节来看看 subscribeOn 操作,先来看 demo

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hi rxjava");
    }
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

先来看看 Schedulers.io() 中到底干了什么。

public static Scheduler io() {
    return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

其实就是获取 Schedulers 单例中的 ioScheduler 。

private static Schedulers getInstance() {
    for (;;) {
        Schedulers current = INSTANCE.get();
        if (current != null) {
            return current;
        }
        current = new Schedulers();
        if (INSTANCE.compareAndSet(null, current)) {
            return current;
        } else {
            current.shutdownInstance();
        }
    }
}

private Schedulers() {
    @SuppressWarnings("deprecation")
    RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

    Scheduler c = hook.getComputationScheduler();
    if (c != null) {
        computationScheduler = c;
    } else {
        computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
    }

    Scheduler io = hook.getIOScheduler();
    if (io != null) {
        ioScheduler = io;
    } else {
        ioScheduler = RxJavaSchedulersHook.createIoScheduler();
    }

    Scheduler nt = hook.getNewThreadScheduler();
    if (nt != null) {
        newThreadScheduler = nt;
    } else {
        newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
    }
}

可以看到,ioScheduler 默认实现是 RxJavaSchedulersHook.createIoScheduler()

public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}

public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory == null");
    }
    return new CachedThreadScheduler(threadFactory);
}

默认创建一个新的 CachedThreadScheduler 。

public CachedThreadScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}

@Override
public void start() {
    CachedWorkerPool update =
        new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

在 CachedWorkerPool 构造方法内部会去创建线程池。

然后回过头来看 subscribeOn 方法的内部代码:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}

public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

发现逻辑都是类似的,也是创建了一个新的 Observable , 而对应的 OnSubscribe 是一个 OperatorSubscribeOn 对象。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
        this.scheduler = scheduler;
        this.source = source; // 真正的 Observable
        this.requestOn = requestOn;
    }

}

按照以往的惯例,最后 subscribe 的时候肯定会调用 OperatorSubscribeOn 的 call 方法,所以我们直接去看 call 方法。

@Override
public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    // 创建时传入真正的 subscriber
    SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
    subscriber.add(parent);
    subscriber.add(inner);

    inner.schedule(parent);
}

scheduler.createWorker() 是可以理解为在新的工作线程中去做某一个动作(Action0)。前面说过,这里的 scheduler 是 CachedThreadScheduler 类型,所以 createWorker 就是创建了一个 EventLoopWorker 对象。

@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

然后调用 inner.schedule 。

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }
    // threadWorker.scheduleActual 就是调用了线程池去执行这个 action0
    ScheduledAction s = threadWorker.scheduleActual(new Action0() {
        @Override
        public void call() {
            if (isUnsubscribed()) {
                return;
            }
            // 这里的 action 就是上面的 SubscribeOnSubscriber 对象
            // 因此 SubscribeOnSubscriber 的 call 方法就在工作线程中被调用了 
            action.call();
        }
    }, delayTime, unit);
    innerSubscription.add(s);
    s.addParent(innerSubscription);
    return s;
}

接下来就到 SubscribeOnSubscriber 的 call 方法中看看。

@Override
public void call() {
    Observable<T> src = source;
    source = null;
    t = Thread.currentThread();
    // 把真正的 Observable subscribe 到 SubscribeOnSubscriber 中
    src.unsafeSubscribe(this);
}

最后,真正的 Observable 调用 call 方法时,会调用 SubscribeOnSubscriber 的 onNext 方法。

@Override
public void onNext(T t) {
    // actual 是真正我们自定义的 Subscriber
    actual.onNext(t);
}

SubscribeOnSubscriber 中的 onNext 方法再把参数传给真正的 Subscriber 。

到这里,就把 subscribeOn 切换线程的原理讲完了。

Part 4

讲完了 subscribeOn ,再来看 observeOn 会简单很多。还是先来个 demo 吧

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hi rxjava");
    }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

先分析 AndroidSchedulers.mainThread() 。

public static Scheduler mainThread() {
    return getInstance().mainThreadScheduler;
}

和 Schedulers.io 类似,也是去获取 AndroidSchedulers.mainThreadScheduler

private static AndroidSchedulers getInstance() {
    for (;;) {
        AndroidSchedulers current = INSTANCE.get();
        if (current != null) {
            return current;
        }
        current = new AndroidSchedulers();
        if (INSTANCE.compareAndSet(null, current)) {
            return current;
        }
    }
}

private AndroidSchedulers() {
    RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

    Scheduler main = hook.getMainThreadScheduler();
    if (main != null) {
        mainThreadScheduler = main;
    } else {
        // 传入主线程的 looper 
        mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
    }
}

LooperScheduler 的构造方法

class LooperScheduler extends Scheduler {
    private final Handler handler;

    LooperScheduler(Looper looper) {
        handler = new Handler(looper);
    }

    LooperScheduler(Handler handler) {
        this.handler = handler;
    }

}

内部就是创建了主线程的 Handler 。然后利用 Handler 去发送消息就行了。

那么我们就来看看 observeOn 方法。

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
    return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

可以看到 observeOn 内部利用了 lift ,那么什么是 lift 呢?

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

原来 lift 内部也是去创建一个新的 Observable ,而且 Observable.OnSubscribe 是一个 OnSubscribeLift 对象。套路都是相似的,不一样的就是需要额外传入一个 Operator 对象。从上面可知, Operator 就是一个 OperatorObserveOn 对象。

接着就去 OnSubscribeLift 的 call 方法中看看。

@Override
public void call(Subscriber<? super R> o) {
    try {
        // 执行 operator.call 方法,返回一个 Subscriber 对象
        Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
        try {
            // new Subscriber created and being subscribed with so 'onStart' it
            st.onStart();
            // 这里的 parent 就是我们真正的 onSubscribe
            parent.call(st);
        } catch (Throwable e) {
            // localized capture of errors rather than it skipping all operators
            // and ending up in the try/catch of the subscribe method which then
            // prevents onErrorResumeNext and other similar approaches to error handling
            Exceptions.throwIfFatal(e);
            st.onError(e);
        }
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // if the lift function failed all we can do is pass the error to the final Subscriber
        // as we don't have the operator available to us
        o.onError(e);
    }
}

关键代码 RxJavaHooks.onObservableLift(operator).call(o); ,可以猜到默认hook就是返回 operator 本身。那么我们到 operator.call 中看看。

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }
}

从前面的代码可以知道,这里是 scheduler 是 AndroidScheduler 。所以这里会返回一个 ObserveOnSubscriber 对象。从上面的代码可知,返回了 ObserveOnSubscriber 对象之后,会调用 parent.call(st) 。这里的 parent 就是最原始,也就是我们自定义的 Observable 。所以最后代码就走到了 ObserveOnSubscriber 的 onNext 方法中。

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {


    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        if (!queue.offer(NotificationLite.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            // 这里的 recursiveScheduler 就是 LooperScheduler
            // 注意这里 schedule 的参数是 this !!!
            recursiveScheduler.schedule(this);
        }
    }

}

可以看到,这里会调用 LooperScheduler 来处理。

@Override
public Subscription schedule(final Action0 action) {
    // 这里的 action 参数就是 ObserveOnSubscriber 对象
    return schedule(action, 0, TimeUnit.MILLISECONDS);
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (unsubscribed) {
        return Subscriptions.unsubscribed();
    }

    action = hook.onSchedule(action);
    // ScheduledAction 实现了 Runnable 接口
    ScheduledAction scheduledAction = new ScheduledAction(action, handler);
    // 之后该 message 会在主线程中取出,然后执行 ScheduledAction
    Message message = Message.obtain(handler, scheduledAction);
    message.obj = this; // Used as token for unsubscription operation.

    handler.sendMessageDelayed(message, unit.toMillis(delayTime));

    if (unsubscribed) {
        handler.removeCallbacks(scheduledAction);
        return Subscriptions.unsubscribed();
    }

    return scheduledAction;
}

LooperScheduler.schedule 主要做的就是构造出 message ,然后利用 Handler 把 message 发送到主线程中去执行。所以接着代码就到了 ScheduledAction.run 中。

static final class ScheduledAction implements Runnable, Subscription {

    @Override public void run() {
        try {
            // 这里的 action 参数就是 ObserveOnSubscriber 对象
            // 所以会调用 ObserveOnSubscriber.call       
            action.call();
        } catch (Throwable e) {
            // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
            IllegalStateException ie;
            if (e instanceof OnErrorNotImplementedException) {
                ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
            } else {
                ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
            }
            RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
            Thread thread = Thread.currentThread();
            thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
        }
    }

}

ScheduledAction.run 是在主线程中运行的,而 run 方法中调用了 action.call(); 。action 其实就是原来那个 ObserveOnSubscriber 对象。

所以代码再次跳转到 ObserveOnSubscriber.call 方法中。

// only execute this from schedule()
@Override
public void call() {
    long missed = 1L;
    long currentEmission = emitted;

    // these are accessed in a tight loop around atomics so
    // loading them into local variables avoids the mandatory re-reading
    // of the constant fields
    final Queue<Object> q = this.queue;
    // 这里的 child 就是真正的我们自定义的 Subscriber
    final Subscriber<? super T> localChild = this.child; 
    
    // requested and counter are not included to avoid JIT issues with register spilling
    // and their access is is amortized because they are part of the outer loop which runs
    // less frequently (usually after each bufferSize elements)

    for (;;) {
        long requestAmount = requested.get();

        while (requestAmount != currentEmission) {
            boolean done = finished;
            Object v = q.poll();
            boolean empty = v == null;

            if (checkTerminated(done, empty, localChild, q)) {
                return;
            }

            if (empty) {
                break;
            }
            // 在主线程中调用了真正的 Subscriber 的 onNext 方法       
            localChild.onNext(NotificationLite.<T>getValue(v));

            currentEmission++;
            if (currentEmission == limit) {
                requestAmount = BackpressureUtils.produced(requested, currentEmission);
                request(currentEmission);
                currentEmission = 0L;
            }
        }

        if (requestAmount == currentEmission) {
            if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                return;
            }
        }

        emitted = currentEmission;
        missed = counter.addAndGet(-missed);
        if (missed == 0L) {
            break;
        }
    }
}

到这里,observeOn 实现切换线程的原理就讲完了。基本的 RxJava 操作中的源码也都讲了一遍。至于其他的操作符后面有空再讲吧。

原文  https://yuqirong.me/2019/11/16/RxJava源码解析/
正文到此结束
Loading...