转载

试手 RxJava 2.x 及对线程的初步理解

在进行数据流处理过程中,需要一个高效苗条的流处理组件,比如对输入流能进行分组(窗口),能进行流量控制(Back Pressure - 背压),这也就涉及到响应式编程,流处理框架。这方面如果直接基于 Akka actor 来构建 Akka ActorSystem 也是比较复杂,依赖的组件也不少。还有构筑在 Akka actor 之上的 Akka Streams,再往上的 Flink Streaming,它们都有像滑动,滚动窗口的概念,但是依赖更不得了。一个基本的 Flink Streaming 的项目会依赖到 45 M 以上的第三方组件,如果用它来写一个数据流处理的共享组件,那真是要命。Spring 5 也开始带上了自己的 Reactive-Streams 实现 Spring Reactor, 想要把它从 Spring 中单独抽离出也非易事。

Flink Streaming 组件依赖:org.apache.fling:flink-streaming-java_2.12:1.80, 会依赖于其他诸如 akka-stream, akka-actor, flink-core, flink-clients, scala-library 等非常多的东西

而另一个著名的响应式框架 RxJava 2 就清爽多了,完全没有第三方依赖,要说有也就是定义了四个接口的 reactive-streams(2 KB 大小),就自身那个  rxjava-2.2.9.jar 包只有 2.3 M,这才叫轻量级。因为它设计来是能被应用于 Android 客户端应用的,Andriod 上的 rxandriod-1.2.1.aar 只有 9 K。所以 RxJava 2.x 太适合用来写一些小的共享组件了。

说了那么多,RxJava 是什么?直接网上抄一句定义:RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。它的并发不一定需要了解它的实现细节,RxJava  也支持像 Flink, Akka Streams 那样的窗口概念。RxJava 2.x 符合了标准的响应式流(Reactive-Streams) 规范,主要应用了观察者模式,编程中围绕着的概念有 Observable, Observer, Subscriber , Subject(既是 Observable 又是 Observer), Flowable, Publisher , Processor (既是 Subscriber 又是 Publisher) 等,其中标暗红的对象是来自于 Reactive-Streams 定义的概念。响应的编程还需了解类似于 Iterator 的 onNext, onError, onComplete 这三个基本事件。还有响应式编程通常会用的 Producer/Consumer, Source/Sink, Publisher/Subscriber,传递的数据元素通常称作 Subject。 

下面用几段基本代码来体验 RxJava 2 来做什么

Hello World

任何语言的 Hello World 代码都毫无意义的,RxJava 2 的也不例外,像下面这行

Observable.just("Hello World").subscribe(System.out::println);

效果和 System.out.println("Hello World") 一样,说明不了什么问题。再进一步观察一个集合

Observable.fromIterable(Arrays.asList("one", "two", "three"))
    .subscribe(s -> System.out.println("Hello " + s));

稍好一些,像是 forEach() 操作

onNext, onError, onComplete

来一个更多一点功能的代码,用 onNext, onError, onComplete 来控制数据

Observable.<String>create(emitter -> {
    emitter.onNext("Monday");
    emitter.onNext("Tuesday");
    // emitter.onError(new RuntimeException("Something wrong")); #1
    emitter.onNext("Wednesday");
    emitter.onComplete();
}).subscribe(
    day -> System.out.println("Day: " + day),  //onNext
    t -> System.err.println(t.getMessage()),   //onError
    () -> System.out.println("Done")           //onComplete
);
 
System.out.println("Program exit");

以上用 Java 8 Lambda 语法书写的 subscribe 中各个 onNext, onError, onComplete 函数参数,

如果没有 #1 行代码为被注释状态时的输出为

Day: Monday  Day: Tuesday  Day: Wednesday  Done  Program exit

如果 #1 代码启用的话,输出为

Day: Monday  Day: Tuesday  Something wrong  Program exit

上面整个代码也是一个同步执行的过程,因为到目前为止还未引用新的线程,所以全部的输出操作都是在主线程上完成的,这就是为什么数据遍历完之后才输出 Program exit

基本线程模型 - observeOn() 和 subscribeOn()

如果只是单线程操作,那要 RxJava 的优势在哪儿,RxJava 支持并发,而且还把并发操作透明化,也就是我们不需要太了解它的线程模型的实现。但作为一个希望通过阅读源代码来更准备理解一个框架的人来说,透明化的东西也要对它的线程模型看个究竟。

首先来看一下以上代码各部分是由什么线程来执行的

Observable.<String>create(emitter -> {
    log("Producer");
    emitter.onNext("Monday");
    emitter.onComplete();
}).subscribe(
    day -> {
        log("Consumer-" + day);
    }
);
 
log("Program exit");

log() 方法输出当前线程名与消息,实现代码为

private static void log(String msg) {
    System.out.println(Thread.currentThread() + ": " + msg);
}

执行后输出为

Thread[main,5,main]: Producer  Thread[main,5,main]: Consumer-Monday  Thread[main,5,main]: Program exit

加上 observeOn(Scheduler) , 指定一个观察者在哪个调度器上观察这个Observable

Observable.<String>create(emitter -> {
    log("Producer");
    emitter.onNext("Monday");
    emitter.onComplete();
}).observeOn(Schedulers.newThread())  //只有消息消费用这个线程(池), 生产者仍然是主线程
    .subscribe(
        day -> {
            Thread.sleep(200);
            log("Consumer-" + day);
        }
    );
 
log("Program exit");
Thread.sleep(1000);

第 5 行加了 .observeOn(Schedulers.newThread()) 让观察者在一个新的线程上去。Schedulers 中出来的各种 Scheduler 这里暂不展开,只要知道会在新的线程上去执行某件事情。

并且消费消息时加了一个延时,最后加上等待 1 秒钟,以确保主线程退出前,消息还被全部消费完成。看下现在输出为

Thread[main,5,main]: Producer  Thread[main,5,main]: Program exit  Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday

我们看到消息仍然是由主线程放进去的,消息放完后立即退到主线程上去了,消费消息在 observeOn() 指定的线程上。

再试一下 subscribeOn(Scheduler) , 指定Observable自身在哪个调度器上执行

Observable.<String>create(emitter -> {
    log("Producer");
    emitter.onNext("Monday");
    emitter.onComplete();
}).subscribeOn(Schedulers.newThread())   //消息生产与消费都会有这个线程(池)
    .subscribe(
        day -> {
            Thread.sleep(200);
            log("Consumer-" + day);
        }
    );
 
log("Program exit");
Thread.sleep(1000);

直接看输出来理解

Thread[main,5,main]: Program exit  Thread[RxNewThreadScheduler-1,5,main]: Producer  Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday

我们看到只用 subscribeOn(Scheduler) 时,消息的产生与消费都在它指定的线程池上。

observerOn() 和 subscribeOn() 双管齐下

Observable.<String>create(emitter -> {
    log("Producer");
    emitter.onNext("Monday");
    emitter.onComplete();
}).subscribeOn(Schedulers.newThread()) //消息生产者用这个线程(池)
    .observeOn(Schedulers.io())        //消息消费者用这个线程(池)  
    .subscribe(
        day -> {
            Thread.sleep(200);
            log("Consumer-" + day);
        }
    );
 
log("Program exit");
Thread.sleep(1000);

记住前面用 Schedulers.newThread() 的线程名称类似 RxNewThreadScheduler-1,5,main , 现在来看新的输出

Thread[main,5,main]: Program exit  Thread[RxNewThreadScheduler-1,5,main]: Producer  Thread[RxCachedThreadScheduler-1,5,main]: Consumer-Monday

新的线程名 RxCachedThreadScheduler-1,5,main 就是 Schedulers.io() 所对应的,所以由上面的输出表明

  1. subscribeOn(Scheduler) 指定消息生产者用的线程(池)
  2. observeOn(Scheduler) 指定消息消费者用的线程(池)

subscribeOn() 只有第一次调用有效,因为 Observable 就一个,observeOn() 可以调用多次,每次调用都会影响到后续的 map, filter, subscribe 等操作。

不同的Schduler 类型

Schedulers 中的工厂方法可以创建出不同类型的线程池,简单说明如下

  1. single():  单线程线程池,相当于 Executors.newSingleThreadExecutor(), 但多次调用  Schedulers.single() 总是同一个线程, observable.observeOn(Schedules.single()).map(#1).observeOn(Schedules.single()).map(#2), #1 和 #2 用相同的线程
  2. newThread(): 创建一个新的线程, 与 single()  的区别是每次不同的线程。observable.observeOn(Schedules.newThread()).map(#1).observeOn(Schedules.newThread()).map(#2), #1 和#2 会用两个不同的线程
  3. computation(): 以 CPU 内核数为固定大小的线程池,适于 CPU 密集型计算
  4. io():  相当于 Executors.newCachedThreadPool() 创建的线程池,适于大量 I/O 等待的操作
  5. trampoline(): 在当前线程上运行,好像是默认行为,很多时候好像是可以省略的
  6. from(Executor): 指定自定的线程池,如 Schedulers.from(Executor.newFixedThreadPool(10)), 有个好处是容易共享线程池和不用总是看到 RxXxx 那样的线程名

RxJava vs CompletableFuture

RxJava 在 Java 7 之前就有了,所以它不依赖于 JDK 7 的 ForkJoinPool,解决了 JDK 8 的 CompletableFuture 之前的任务间的依赖问题。可以大概对比一下 RxJava 与  CompletableFuture 两种写法,不要太纠缠于细节,以下代码不是完全对等

RxJava 代码

Observable.just(request1, request2)
    .observeOn(Schedulers.io())
    .map(Request::get)
    .subscribe(System.out::println);

CompletableFuture 代码

ExecutorService threadPool = Executors.newFixedThreadPool(10);
Stream.of(request1, request2)
    .map(request -> CompletableFuture.supplyAsync(request::get, threadPool))
    .forEach(completableFuture ->
    log(completableFuture.join())
);

以上两段代码并非用以说明哪种写法的好坏,只是纯粹的提供两种代码风格

一个更实用的生产消费的代码

前方的代码是加入消息消费前所有消息都准备好了,这对处理一个已经列表有用,更多的时候是先设置好消费者后,消息的产生是连续不断的。

PublishProcessor<String> restProcessor = PublishProcessor.create();
Observable.fromPublisher(restProcessor).subscribe(System.out::println);
 
restProcessor.offer("Monday");
restProcessor.offer("Tuesday");
restProcessor.offer("Wednesday");
restProcessor.onComplete();

本文的内容比较杂,其余更多的话题有 RxJava 的背压和窗口的支持等。

原文  https://yanbin.blog/rxjava-get-started/
正文到此结束
Loading...