本文作为《Java编程方法论:响应式Rxjava与代码设计实战》一书第二章 Rxjava中的Subject一节的补充解读。
首先来看一个Demo:
@Test
void replay_PublishSubject_test() {
PublishSubject<Object> publishSubject = PublishSubject.create();
ConnectableObservable<Object> replay = publishSubject.replay();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> integers = new ArrayList<>();
for (int i=1;i<10;i++){
integers.add(i);
}
Disposable subscribe1 = replay.subscribe(x -> {
log("一郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe2 = replay.subscribe(x -> {
log("二郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe3 = replay.subscribe(x -> {
log("三郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
AtomicInteger atomicInteger = new AtomicInteger(integers.size());
try {
forkJoinPool.submit(() -> {
integers.forEach(id -> {
sleep(1,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
});
});
replay.connect();
sleep(2,TimeUnit.SECONDS);
subscribe1.dispose();
sleep(1,TimeUnit.SECONDS);
//replay.connect(consumer -> consumer.dispose());
publishSubject.onComplete();
System.out.println("test");
} finally {
try {
forkJoinPool.shutdown();
int shutdownDelaySec = 2;
System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
} finally {
System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
List<Runnable> l = forkJoinPool.shutdownNow();
System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
}
}
}
复制代码
得到的结果如下所示:
ForkJoinPool.commonPool-worker-3: 一郎神: 1 ForkJoinPool.commonPool-worker-3: 二郎神: 1 ForkJoinPool.commonPool-worker-3: 三郎神: 1 ForkJoinPool.commonPool-worker-3: 二郎神: 2 ForkJoinPool.commonPool-worker-3: 三郎神: 2 Emission completed Emission completed test ………………等待 2 秒后结束服务……………… 调用 forkJoinPool.shutdownNow()结束服务... 还剩 0 个任务等待被执行,服务已关闭 复制代码
在调用 subscribe1.dispose() 的时候,完成了订阅者自行解除订阅关系的约定,而假如后面调用的是 replay.connect(consumer -> consumer.dispose()) ,依然会在发送元素的过程中强行中断,不带任何通知。而在使用 publishSubject.onComplete() 后,则可以很优雅地通知后续订阅者优雅地结束。 如图 2-3 所示,我们按照图中文字操作,并在 System.out.println("test") 这行打断点查看状态,发现其他2个订阅者并没有被移除,为什么会出现这种情况?
通过 publishSubject.replay() ,我们得到了一个 ConnectableObservable 对象,具体如下:
//io.reactivex.Observable#replay
public final ConnectableObservable<T> replay() {
return ObservableReplay.createFrom(this);
}
复制代码
结合前面 ConnectableObservable 相关知识的学习,在调用 replay.subscribe(...) 时,会将下游的订阅者与 DEFAULT_UNBOUNDED_FACTORY 所得到的 UnboundedReplayBuffer 对象通过一个 ReplayObserver 对象建立起联系:
//ObservableReplay#createFrom
public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) {
return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
//ObservableReplay#create
static <T> ConnectableObservable<T> create(ObservableSource<T> source,
final BufferSupplier<T> bufferFactory) {
// the current connection to source needs to be shared between the operator and its onSubscribe call
final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<ReplayObserver<T>>();
//注意此处
ObservableSource<T> onSubscribe = new ReplaySource<T>(curr, bufferFactory);
//此处这个curr会作为ObservableReplay下current字段的值,记住,它是个引用类型对象
return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory));
}
//ObservableReplay#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
onSubscribe.subscribe(observer);
}
//ObservableReplay.ReplaySource#subscribe
public void subscribe(Observer<? super T> child) {
for (;;) {
ReplayObserver<T> r = curr.get();
if (r == null) {
ReplayBuffer<T> buf = bufferFactory.call();
ReplayObserver<T> u = new ReplayObserver<T>(buf);
//此时ObservableReplay中current字段的值所指对象也会发生改变
if (!curr.compareAndSet(null, u)) {
continue;
}
r = u;
}
InnerDisposable<T> inner = new InnerDisposable<T>(r, child);
child.onSubscribe(inner);
//通过ReplayObserver的observers字段将下游订阅者管理起来
r.add(inner);
if (inner.isDisposed()) {
r.remove(inner);
return;
}
//此处UnboundedReplayBuffer对象与下游订阅者建立联系
r.buffer.replay(inner);
break;
}
}
}
复制代码
当调用 replay.connect(consumer -> consumer.dispose()) 时,通过 current 获取上面得到的 ReplayObserver 对象,并调用该对象的 dispose() 方法(由 replay.connect(...) 中传入的 Consumer 实现可得),此时会将 ObservableReplay 中的 observers 字段设定为 TERMINATED ,同时将 ObservableReplay 自身身为 AtomicReference 角色所存储值设定为 DISPOSED ,即将 ObservableReplay 中 current 的值设定为了 DISPOSED 。
//ObservableReplay#connect
public void connect(Consumer<? super Disposable> connection) {
boolean doConnect;
ReplayObserver<T> ps;
for (;;) {
ps = current.get();
if (ps == null || ps.isDisposed()) {
ReplayBuffer<T> buf = bufferFactory.call();
ReplayObserver<T> u = new ReplayObserver<T>(buf);
if (!current.compareAndSet(ps, u)) {
continue;
}
ps = u;
}
doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
break;
}
try {
connection.accept(ps);
} catch (Throwable ex) {
if (doConnect) {
ps.shouldConnect.compareAndSet(true, false);
}
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
if (doConnect) {
source.subscribe(ps);
}
}
//ObservableReplay.ReplayObserver#dispose
public void dispose() {
observers.set(TERMINATED);
DisposableHelper.dispose(this);
}
//DisposableHelper#dispose
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
复制代码
可以看到, ReplayObserver 只是解除了与下游订阅者的关系,但并没有进一步对下游订阅者进行结束的操作,这样与 UnboundedReplayBuffer 对象建立联系的订阅者,如果buffer中的元素还未消费完毕,会持续消费直至所存元素下发完毕,但要注意的是,该buffer中并未存放结束事件(即通过调用 UnboundedReplayBuffer#complete 往该队列中存放 NotificationLite.complete() 元素)。同时下游订阅者也并未调用 dispose() 方法,所以下面所示源码中的 output.isDisposed() 结果为 false 。请注意下面所示源码中 <1> 处的代码:
public void replay(InnerDisposable<T> output) {
if (output.getAndIncrement() != 0) {
return;
}
final Observer<? super T> child = output.child;
int missed = 1;
for (;;) {
if (output.isDisposed()) {
return;
}
int sourceIndex = size;
Integer destinationIndexObject = output.index();
int destinationIndex = destinationIndexObject != null ? destinationIndexObject : 0;
while (destinationIndex < sourceIndex) {
Object o = get(destinationIndex);
//此处很关键
if (NotificationLite.accept(o, child)) {//<1>
return;
}
if (output.isDisposed()) {
return;
}
destinationIndex++;
}
output.index = destinationIndex;
missed = output.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
//io.reactivex.internal.util.NotificationLite#accept
public static <T> boolean accept(Object o, Observer<? super T> s) {
if (o == COMPLETE) {
s.onComplete();
return true;
} else
if (o instanceof ErrorNotification) {
s.onError(((ErrorNotification)o).e);
return true;
}
s.onNext((T)o);
return false;
}
复制代码
如果调用了 UnboundedReplayBuffer#complete ,那么在元素下发到最后时,就会出现 o == COMPLETE 为 true ,此时会调用下游订阅者的 onComplete() 方法。
//ObservableReplay.UnboundedReplayBuffer#complete
public void onComplete() {
if (!done) {
done = true;
buffer.complete();
replayFinal();
}
}
//ObservableReplay.UnboundedReplayBuffer#complete
public void complete() {
add(NotificationLite.complete());
size++;
}
//io.reactivex.internal.util.NotificationLite#complete
public static Object complete() {
return COMPLETE;
}
复制代码
至此,关于 replay_PublishSubject_test() 示例中所展现的疑点已经解读完毕。