RxJava 的线程调度

RxJava 是一个为异步编程而实现的库,异步是其重要的特色,合理地利用异步编程能够提高系统的处理速度。但是异步也会带来线程安全问题,而且异步并不等于并发
在默认情况下,RxJava 只在当前线程中运行,是单线程的,此时 Observable 用于发射数据流,Observer 用于接收和响应数据流,各种操作符(Operators) 用于加工数据流,它们都在同一个线程中运行,实现出来的是一个同步的函数响应式。然而函数响应式的实际应用大部分操作是在后台处理,前台响应的一个过程。所以需要对刚才的流程做一下修改,改成 Observable 生成发射数据流,Operators 加工数据流在后台线程中运行,Observer 在前台线程中接收并响应数据。此时会涉及使用多线程来操作 RxJava,可以使用 RxJava 的调度器(Scheduler)来实现。

2. Scheduler

Scheduler 是 RxJava 对线程控制器的一个抽象,RxJava 内置了多了 Scheduler 的实现。

Scheduler 作用
single 使用定长为 1 的线程池(new Scheduler Thread Pool(1)),重复利用这个线程
newThread 每次都启用新线程,并在新线程中执行操作
computation 使用固定的线程池(Fixed Scheduler Pool),大小为 CPU 核数,适用于 CPU 密集型计算
io 适合 I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io() 比 newThread() 更有效率
trampoline 直接在当前线程运行,如果当前线程有其他任务正在执行,则会先暂停其他任务
Scheduler.from java.util.concurrent.Executor 转化成一个调度器实例,既可以自定义一个 Executor 来作为调度器

如果内置的 Scheduler 不能满足业务需求,那么可以使用自定义的 Executor 作为调度器,以满足个性化需求。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
    }
}).observeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
复制代码

这里的 Observable 发射完数据之后,切换到 newThread。后面的两次打印都是在 newThread 中进行的。

2. RxJava 线程模型

RxJava 的被观察者们在使用操作符时可以利用线程调度器——Scheduler 来切换线程。

1. 线程调度器

Scheduler 是一个静态工厂类。Scheduler 是 RxJava 的线程任务调度器,Worker 是线程任务的具体执行者。从 Scheduler 源码中可以看出,Scheduler 在 schedulerDirect()、schedulerPeriodicllyDirect() 方法中创建了 Worker,然后会分别调用 worker 的 scheduler()、schedulerPeriodically() 来执行任务。
Worker 也是一个抽象类,每种 Scheduler 都会对应一种具体的 Worker。

  1. SingleScheduler
    SingleScheduler 是 RxJava 2 新增的 Scheduler。SingleScheduler 中有一个属性叫做 executor,是使用 AtomicReference 包装的 ScheduledExecutorService。在 SingleScheduler 构造函数中,executor 会调用 lazySet()。

原文 

https://juejin.im/post/5d6616a9f265da03bf0f537b

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » RxJava 的线程调度

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址