转载

rxjava2——线程切换

简介

对rxjava 有一个简单的学习之后,笔者还是很难 理解rxjava 在服务端的使用,感觉学习了hystrix 之后,这块的理解会更深刻一些。

首先对于同步调用,rxjava的作用有限,而对于异步调用,对于类似于netty这种 方法直接返回future的,rxjava也套不上。其所谓异步调用,通常是另起 线程执行一个同步调用(从驱动线程的角度看,这就是一个异步调用了),由此成为一个多线程代码,解决多线程环境下的 数据流控制问题。

观察者模式的推和拉

推模型

class Subject {
	private List<Observer> observers = new ArrayList<Observer>();
	public void notify(){
		for(Observer observer : observers){
			observer.update(data);	// data 是Subject 全部或部分信息
		}
	}
}

拉模型

class Subject {
	private List<Observer> observers = new ArrayList<Observer>();
	public void notify(){
		for(Observer observer : observers){
			observer.update(this); // observer 看情况通过 Subject 引用获取数据
		}
	}
}

线程切换

线程控制绝对是RxJava的重点之一。在不指定线程的情况下,RxJava遵循的是线程不变的原则,在哪个线程调用subscribe(),就在哪个线程生产、消费事件。

线程控制的 本质 还是 将 当前 Observable 转换为 另一个Observable,具体的说是转换Observable的onSubscribe 方法,跟filter 等普通的数据转换一样一样的。明面上是线程切换,其实是函数 包装。

public Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(this, scheduler);
}
public Observable<T> subscribeOn(Scheduler scheduler) {
    return subscribeOn(this, scheduler);
}
public Observable<T> filter(Func1<T, Boolean> predicate) {
    return filter(this, predicate);
}

谜之RxJava (三)update 2 —— subscribeOn 和 observeOn 的区别

笔者最早找到 支持observeOn 的版本 0.10.0

从0.10.0 可以看到,无论是observeOn 还是subscribeOn,参数都是Scheduler,都会导致 代码切换到 另一个线程(由Scheduler 实现类决定)执行。只是observeOn 只是 表示 其之后的操作,由observeOn 指定的Scheduler执行。subscribeOn 则是 之前及之后的操作 都由subscribeOn 指定的Scheduler 执行,直到遇到observeOn。

subscribeOn

Func1<Observer<T>, Subscription> 叫 onSubscribe,Subscribe 是 Subscribe ,别弄混onSubscribe和Subscribe。

public static <T> Func1<Observer<T>, Subscription> subscribeOn(Observable<T> source, Scheduler scheduler) {
    return new SubscribeOn<T>(source, scheduler);
}

private static class SubscribeOn<T> implements Func1<Observer<T>, Subscription> {
    private final Observable<T> source;
    private final Scheduler scheduler;

    public SubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    public Subscription call(final Observer<T> observer) {
        return scheduler.schedule(new Func0<Subscription>() {
            @Override
            public Subscription call() {
                return new ScheduledSubscription(source.subscribe(observer), scheduler);
            }
        });
    }
}

Observable.subscribeOn 的逻辑链条,根据 当前Observable 和 scheduler 创建一个新的 Func1<Observer<T>, Subscription> onSubscribe (学名叫subscribeOn )并基于此创建新的 Observable。 转换 onSubscribe 过程涉及到 几个Subscription 的转换

  1. 当前 Observable.subscribe(observer) 返回 Subscribe
  2. 将 Subscribe 封装为 ScheduledSubscription
  3. 将 ScheduledSubscription 封装为 SafeObservableSubscription

以NewThreadScheduler 为例

Observable.filter()				
			.map1()		
			.subscribeOn(NewThreadScheduler)
			.map2()
			.subscribe(xx)

以filter操作为例

// class Observable
public Observable<T> filter(Func1<T, Boolean> predicate) {
    return filter(this, predicate);
}
public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> predicate) {
    return create(OperationFilter.filter(that, predicate));
}
// class OperationFilter
public static <T> Func1<Observer<T>, Subscription> filter(Observable<T> that, Func1<T, Boolean> predicate) {
    return new Filter<T>(that, predicate);
}
  	private static class Filter<T> implements Func1<Observer<T>, Subscription> {
    private final Observable<T> that;
    private final Func1<T, Boolean> predicate;
    public Filter(Observable<T> that, Func1<T, Boolean> predicate) {
        this.that = that;
        this.predicate = predicate;
    }
    public Subscription call(final Observer<T> observer) {
        ...
     	that.subscribe(new Observer<T>() {
            public void onNext(T value) {
                try {
                    if (predicate.call(value)) {
                        observer.onNext(value);
                    }
                } catch (Throwable ex) {
                    observer.onError(ex);
                    ...
                }
            }
            public void onError(Throwable ex) {
                observer.onError(ex);
            }
            public void onCompleted() {
                observer.onCompleted();
            }
        });
        ...
    }
}
  1. filter 时的 Observable 和 最后 subscribe 当时的 Observable 已经不是同一个了。filter 时的observer 是 new 出来的,跟最后subscribe 方法参数的 observer 也不是同一个。

    动作 源Observable 对应observer
    filter Observable observer3.onNext
    map1 Observable1 observer2.onNext
    subscribeOn Observable2 observer1.onNext 只是异步驱动了一下
    map2 Observable3 observer1.onNext
    subscribe Observable4 observer.onNext

    rxjava 通过封装,只将原始的Observable 和 observer 暴露给了用户。

  2. 下一个Observable 简介持有 上一个 Observable 的引用
  3. 最新的Observable4.subscribe 驱动整个逻辑 开始 执行,具体的说 是驱动 其对应的 Func1<Observer<T>, Subscription> 的执行。
  4. Observable4.subscribe 实现是 Observable4. onSubscribe.call ,方法执行链条为

    Observable4.subscribe ==>  
     Observable4.onSubscribe.call ==> 	
     Observable3.subscribe ==> 
     Observable3.subscribeOn.call ==> 驱动线程执行完毕,切换thread 
     Observable2.subscribe ==> 
     Observable2.onSubscribe.call ==> 
     Observable1.subscribe ==> 
     Observable1.onSubscribe.call ==> 
     observer3.onNext ==> 
     filter ==> 
     observer2.onNext ==> 
     ...
     observer.onNext

对于这个方法执行链

RxJava for 100% beginners (part3-switching threads) subscribeOn() change the thread for emitting the source Observable’s elements, no matter where you put it in your “chain”.

用一张图解释RxJava中的线程控制 则将这个方法链分为两个阶段

  1. 驱动阶段,从下游到上游,反向驱动
  2. 事件发射阶段。第一个Observable开始产生事件,然后事件流就开始正向传递

这也就解答了笔者的一个疑惑, 为什么subscribeOn 放在任何位置 对“副作用函数” 都有效? 因为线程的切换 在事件驱动阶段,而副作用函数的执行 在事件发射阶段。

observeOn

以下列代码为例

Observable.filter()				
			.map1()		
			.observerOn(NewThreadScheduler)
			.map2()
			.subscribe(xx)

分析下 ObserveOn 源码

// OperationObserveOn
   	public static <T> Func1<Observer<T>, Subscription> observeOn(Observable<T> source, Scheduler scheduler) {
    return new ObserveOn<T>(source, scheduler);
}

private static class ObserveOn<T> implements Func1<Observer<T>, Subscription> {
    private final Observable<T> source;
    private final Scheduler scheduler;

    public ObserveOn(Observable<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    public Subscription call(final Observer<T> observer) {
        if (scheduler instanceof ImmediateScheduler) {
            // do nothing if we request ImmediateScheduler so we don't invoke overhead
            return source.subscribe(observer);
        } else {
            return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
        }
    }
}

分析Observable 与 observer 的 变换

动作 源Observable 对应observer
filter Observable observer4.onNext
map1 Observable1 observer3.onNext
subscribeOn Observable2 observer2.onNext 变成了ScheduledObserver
map2 Observable3 observer1.onNext
subscribe Observable4 observer.onNext

分析方法执行链

Observable4.subscribe ==>  
Observable4.onSubscribe.call ==> 	
Observable3.subscribe ==> 
Observable3.subscribeOn.call ==> 
Observable2.subscribe ==> 
Observable2.onSubscribe.call ==> 
Observable1.subscribe ==> 
Observable1.onSubscribe.call ==> 
observer4.onNext ==> 
filter ==> 
observer3.onNext ==> 
map ==>
observer2.onNext ==> // 提交事件,驱动线程执行完毕,另一个线程执行下面的逻辑(接收事件并驱动后续执行)

...
observer.onNext

小结

形式上顺序执行filter、map 等,从上到下,实际上是subscribe 才真正触发执行,但最后还是按照filter、map 的顺序 执行业务逻辑——代码腾挪的艺术。

突然奇想对照下 builder 模式,示例代码可以类比为

Observable.setFilter(filterFunction)	
			.setMap1(map1Function)		
			.subscribeOn(NewThreadScheduler)
			.setMap2(map2Function)
			.setObserver(observer)
			.build()

类似于函数式编程,返回函数 或者 函数接口的,一定要小心, 代码写在哪里 跟 代码什么时候执行 没啥关系 , 经常违反直觉。

原文  http://qiankunli.github.io/2018/07/31/rxjava2.html
正文到此结束
Loading...