首先创建一个 observer 和 observable
new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
复制代码
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
}
});
复制代码
先看 Observable 的创建过程, create() 只是把传进去 ObservableOnSubscribe 对象包装了一层返回,这里可以忽略
然后是 ObservableOnSubscribe 类的内部
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
// 1
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 2
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// 3
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
2. 这里new了一个Emitter,也就是我们用来onNext,onError的类,然后把Emitter传给observer中,由于CreateEmitter实现了Disposable接口,所以可以看到Observer里有一个 onSubscribe(Disposable d) ,我们可以用来控制流的结束等操作,实际上这个 Disposable 就是源Obserable创建的,
3. 这里把 Emitter 传给source,也就是传给我们new的 ObservableOnSubscribe 的 subscribe()
ObservableMap ,也是一个 Observable
Observable 都会有一个 subscribeActual() 方法,这个方法在 Observable 调用 subscrible() 之后会被调用,也就是说一个 Observable 在调用 subscrible() 之后,实际上的操作逻辑都是在 subscribeActual() 里面 ObservableMap 的 subscribeActual() 里,调用了 source.subscribe(new MapObserver<T, U>(t, function)); 这句代码, source 是上层的observable,参数为包装过后的 observer 即 MapObserver ,由图三可以看出, MapObserver 内部的 onNext 是往下游传递一个经过 apply() 变化过后的数据,也就达到了我们用 map 变化数据的功能了。