RxJava是ReactiveX在JVM上的一个实现,使用可观察序列来编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。
观察者模式也被称为发布-订阅(Publish/Subscribe)模式,它属于行为型模式的一种。观察者模式定义了一种一对多的依赖关系,一个主题对象可被多个观察者对象同时监听。当这个主题对象状态变化时,会通知所有观察者对象并作出相应处理逻辑。
1.创建抽象被观察者(Subject):
public interface Star {
/**
* 添加粉丝
*/
void addFan(Fan fan);
/**
* 取消粉丝
*/
void removeFan(Fan fan);
/**
* 分享动态
*/
void notifyFan(String message);
}
复制代码
2.创建抽象观察者(Observer)
public interface Fan {
/**
* 更新动态
*/
void update(String message);
}
复制代码
3.创建具体被观察者(Concrete Subject 具体明星)
public class AStar implements Star{
private List<Fan> fanList = null;
public AStar(){
fanList = new ArrayList<Fan>();
}
@Override
public void addFan(Fan fan){
fanList.add(fan);
}
@Override
public void removeFan(Fan fan){
fanList.remove(fan);
}
@Override
public void notifyFan(String message){
for(Fan fan : fanList){
fan.update("AStar 发布了 ** 信息");
}
}
}
复制代码
4.创建具体观察者(Concrere Observer 具体粉丝)
public class AFan implements Fan{
private String fanName;
public AFan(String fanName){
this.fanName = fanName;
}
@Override
public void update(String message){
Log.d("AFan 收到了 AStar 发布的消息");
}
}
复制代码
Observable(被观察者),Observer(观察者),subscribe(订阅)。
Observable 是一个抽象类,实现了ObservableSource抽象接口。
public abstract class Observable<T> implements ObservableSource<T> {
......
}
复制代码
ObservableSource中subscribe()用来订阅观察者,所以ObservableSource相当于抽象被观察者。
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
复制代码
通过ObservableSource的subscribe()方法可知抽象观察者为里面的参数对象Observer。
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
复制代码
/**
* 创建Observable
*/
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
}
});
/**
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
* @param <T> the element type
* @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
* @return the new Observable instance
* @see ObservableOnSubscribe
* @see ObservableEmitter
* @see Cancellable
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
通过create方法源码可知ObservableCreate为具体被观察者。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
复制代码
上述实现observer接口的observer为具体观察者。
Rxjava订阅实现
observable.subscribe(observer); 复制代码
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码