本文基于 RxJava 2.1.2 。根据代码和输出日志会更容易理解。
observeOn
和 subscribeOn
,所有操作在调用 subscribe
的线程执行。 @Test
public void noThread() {
buildObservable().subscribe();
}
上面代码的输出为:
Thread[main] execute Action start emmit Thread[main] execute Operation-1, event: 1 Thread[main] execute Operation-2, event: 1
subscribeOn
不管调用多少次,只以第一次为准。如果只使用了 subscribeOn
、没有使用 observeOn
,则所有操作在第一次调用生成的线程里执行。 @Test
public void subscribeOn() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> observable = buildObservable();
observable
.subscribeOn(scheduler("subscribeOn-1"))
.subscribeOn(scheduler("subscribeOn-2"))
.subscribe(i -> {
showMessageWithThreadName("Action subscribe");
latch.countDown();
});
latch.await();
}
上面代码的输出为:
create scheduler subscribeOn-2 create scheduler subscribeOn-1 Thread[subscribeOn-1] execute Action start emmit Thread[subscribeOn-1] execute Operation-1, event: 1 Thread[subscribeOn-1] execute Operation-2, event: 1 Thread[subscribeOn-1] execute Action subscribe
observeOn
必须跟 subscribeOn
一起使用,单独使用会抛出空引用异常。 observeOn
应在 subscribeOn
的后面调用,否则会出现死锁的情况。 observeOn
操作会更改后续操作的执行线程,直至下一个 observeOn
调用之前的操作或 subscribe
操作。 @Test
public void observeOn() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> observable = buildObservable();
observable
.subscribeOn(scheduler("subscribeOn-1"))
.observeOn(scheduler("observeOn-1"))
.doOnNext(i -> {
showMessageWithThreadName("Operation-3, event: " + i);
})
.observeOn(scheduler("observeOn-2"))
.subscribe(i -> {
showMessageWithThreadName("subscribe " + i);
latch.countDown();
});
latch.await();
}
上面代码的输出为:
create scheduler subscribeOn-1 Thread[subscribeOn-1] execute Action start emmit Thread[subscribeOn-1] execute Operation-1, event: 1 Thread[subscribeOn-1] execute Operation-2, event: 1 create scheduler observeOn-1 Thread[observeOn-1] execute Operation-3, event: 1 create scheduler observeOn-2 Thread[observeOn-2] execute subscribe 1
// 返回用给定线程名 命名的Scheduler
private Scheduler scheduler(String name) {
return Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
System.out.println("create scheduler " + name);
Thread t = new Thread(r, name);
return t;
}
}));
}
// 输出当前线程名和给的消息
private void showMessageWithThreadName(String msg) {
Thread t = Thread.currentThread();
System.out.printf("%-10s execute %s/n", "Thread[" + t.getName() + "]", msg);
}
// 构建一个带有两个中间操作的 Observable
private Observable<Integer> buildObservable() {
return Observable.fromPublisher((Subscriber<? super Integer> s) -> {
showMessageWithThreadName("Action start emmit");
// 消息源
s.onNext(1);
s.onComplete();
})
.doOnNext(i -> {
showMessageWithThreadName("Operation-1, event: " + i);
})
.doOnNext(i -> {
showMessageWithThreadName("Operation-2, event: " + i);
});
}