RxJava之Scheduer(调度器)

RxJava是一种为异步编程而实现的库,异步是其重要特色,合理地利用异步编程能够提高系统的处理速度。但是异步也会带来线程安全问题,而且异步并不等于并发,与异步概念相对应的是同步

在默认情况下,RxJava只在当前线程中运行,它是单线程的。此时Observable用于发射数据流,Observer用于接收和响应数据流,各种操作符(Operators)用于加工数据流,它们都在同一个线程中运行,实现出来的是一个同步的函数响应式。然而,函数响应式的实际应用是大部分操作都在后台处理,前台响应的一个过程。所以需要对刚才的流程做一下修改,改成Observable生成发射数据流,Operators加工数据流在后台线程中进行,Observer在前台线程中接受并响应数据。 此时会涉及使用多线程来操作RxJava,可以使用RxJava的调度器(Scheduler)来实现。

二.Scheduler

Scheduler是RxJava对线程控制其的一个抽象,RxJava内置了多个Scheduler的实现,它们基本满足绝大多数使用场景,如下表:

Scheduler 作用
single 使用定长为1的线程池(new Sheduled Thread Pool(1)),重复利用这个线程
newThread 每次都启用新线程,并在新线程中执行操作
computation 使用的固定的线程池(Fixed Scheduler Pool),大小为CPU核数,适用于CPU密集型计算
io 适用I/O操作(读写文件,读写数据库,网络信息交互等)所使用的Scheduler。行为模式和 newThread() 差不多,区别 在于 : io() 的内部实现是用一个 无数量上限的线程池 ,可 以重用空闲的线程 ,因而多数情况下, io()比newThread()更有效率
tranpoline 直接在当前线程运行,如果当前线程有其他任务正在执行,则会先暂停其他任务
Schedulers.from java.util.concurrent.Executor转换成一个调度器实例,即可以自定义一个Executor来作为调度器

三.RxJava线程模型

RxJava的被观察者们在使用操作符时可以利用线程调度器——Scheduler来切换线程,例如:

public void SchedulersTest() {
        Observable.just("aaa", "bbb")
                .observeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println("Map对应的线程:" + Thread.currentThread().getName() + "/t" + Thread.currentThread().getId());
                        return s.toUpperCase();
                    }
                })
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("subscribe对应的线程:" + Thread.currentThread().getName() + "/t" + Thread.currentThread().getId());
                        System.out.println(s);
                    }
                });
    }
复制代码

下图不同的箭头颜色表示不同的线程:

RxJava之Scheduer(调度器)

图片来源于:
RxJava 线程模型分析

其中,蓝色表示主线程、橙色表示newThread、粉色表示I/O线程。

(一)线程调度器

Schedulers是一个静态工厂类,通过分析Schedulers的源码可以看出他有多个不同类型的Scheduler。下面是Schedulers的各个工厂方法:

  • computation()

computation()用于CPU密集型的计算任务,但并不适合I/O操作。

/**
     * Creates and returns a {@link Scheduler} intended for computational work.
     *
     * @return a {@link Scheduler} meant for computation-bound work
     */
    @NonNull
    public static Scheduler computation() {
        return RxJavaPlugins.onComputationScheduler(COMPUTATION);
    }
复制代码
  • io()

io()用于I/O密集型任务,支持异步阻塞I/O操作,这个调度器的线程池会根据需要增长。对于 普通的计算任务 ,请使 用Schedulers.computation()

/**
     * Creates and returns a {@link Scheduler} intended for IO-bound work.
     *
     * @return a {@link Scheduler} meant for IO-bound work
     */
    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
复制代码
  • tranpoline()

在RxJava2中与在RxJava1中的作用不同。在RxJava2中表示立即执行,如果当前线程有任务在执行,则会将其暂停,等插入进来的新任务执行完成之后,再接着执行原先未完成的任务。在RxJava1中,表示在当前线程中等待其他任务完成之后,再执行新的任务。

/**
     * Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
     * current work completes.
     *
     * @return a {@link Scheduler} that queues work on the current thread
     */
    @NonNull
    public static Scheduler trampoline() {
        return TRAMPOLINE;
    }
复制代码
  • newThread()

newThread()为每个任务创建一个新线程

/**
     * Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
     *
     * @return a {@link Scheduler} that creates new threads
     */
    @NonNull
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
复制代码
  • single()

single()拥有一个线程单例,所有的任务都在这一个线程中执行。当此线程中有任务执行时,它的任务将会按照先进先出的顺序依次执行。

/**
     * Returns the common, single-thread backed Scheduler instance.
     * 
     * @return a {@link Scheduler} that shares a single backing thread.
     * @since 2.0
     */
    @NonNull
    public static Scheduler single() {
        return RxJavaPlugins.onSingleScheduler(SINGLE);
    }
复制代码

除此之外,还支持自定义的Executor来作为调度器。

/**
     * Converts an {@link Executor} into a new Scheduler instance.
     *
     * @param executor
     *          the executor to wrap
     * @return the new Scheduler wrapping the Executor
     */
    @NonNull
    public static Scheduler from(@NonNull Executor executor) {
        return new ExecutorScheduler(executor);
    }
复制代码

如下图,Scheduler是RxJava的线程任务调度器,Worker是线程任务的具体执行者。从Scheduler源码可以看出, SchedulerschedulerDirect()schedulerPeriodicallyDirect() 方法中 创建了Worker ,然后会分别调用worker的 schedule()schedulePeriodically() 来执行任务。

RxJava之Scheduer(调度器)

图2

图片来源于: RxJava 线程模型分析

(二)具体的实现方法

1.Schedulers类

  • scheduleDirect()
/**
     * Schedules the execution of the given task with the given delay amount.
     *
     * <p>
     * This method is safe to be called from multiple threads but there are no
     * ordering guarantees between tasks.
     *
     * @param run the task to schedule
     * @param delay the delay amount, non-positive values indicate non-delayed scheduling
     * @param unit the unit of measure of the delay amount
     * @return the Disposable that let us one cancel this particular delayed task.
     * @since 2.0
     */
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }
复制代码
  • schedulePeriodicallyDirect()
/**
     * Schedules a periodic execution of the given task with the given initial delay and period.
     *
     * <p>
     * This method is safe to be called from multiple threads but there are no
     * ordering guarantees between tasks.
     *
     * <p>
     * The periodic execution is at a fixed rate, that is, the first execution will be after the initial
     * delay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
     *
     * @param run the task to schedule
     * @param initialDelay the initial delay amount, non-positive values indicate non-delayed scheduling
     * @param period the period at which the task should be re-executed
     * @param unit the unit of measure of the delay amount
     * @return the Disposable that let us one cancel this particular delayed task.
     * @since 2.0
     */
    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }

        return periodicTask;
    }
复制代码

2.Worker类

Worker也是一个抽象类,从图2中可以看到,每种Scheduler会对应一种具体的Worker。

/**
         * Schedules a cancelable action to be executed periodically. This default implementation schedules
         * recursively and waits for actions to complete (instead of potentially executing long-running actions
         * concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
         * <p>
         * Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
         * non-delayed scheduling of the first and any subsequent executions.
         *
         * @param run
         *            the Runnable to execute periodically
         * @param initialDelay
         *            time to wait before executing the action for the first time; non-positive values indicate
         *            an non-delayed schedule
         * @param period
         *            the time interval to wait each time in between executing the action; non-positive values
         *            indicate no delay between repeated schedules
         * @param unit
         *            the time unit of {@code period}
         * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
         */
        @NonNull
        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            final SequentialDisposable first = new SequentialDisposable();

            final SequentialDisposable sd = new SequentialDisposable(first);

            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            final long periodInNanoseconds = unit.toNanos(period);
            final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
            final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

            Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                    periodInNanoseconds), initialDelay, unit);

            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
            first.replace(d);

            return sd;
        }
复制代码

原文 

https://juejin.im/post/5eed99c8f265da02b032b166

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » RxJava之Scheduer(调度器)

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址