有可能很多人会问, LiveData
和 Rxjava
的区别是什么? 为何 Google 要在 Rxjava
很成熟的时候开发 LiveData
? 我想, LiveData
可以作为更好的 rxlifecycle
来使用。在使用 Rxjava
做数据流管理时,一个比较头疼的问题是,当数据回来时, Activty/Fragment
可能已经处于 onStop
的状态了,这个时候是不适合刷新 UI 的,很有可能触发 crash。 因此有人开发了 rxlifecycle
来解决这些问题,但是使用者必须继承于 RxActivity/RxFragment
或者实现接口 LifecycleProvider
。而 LiveData
则是官方给出的 lifecycle 友好型数据管理者。 它可以在 Rxjava 数据流 和 UI 刷新之间建立完美的沟通桥梁。当然 LiveData
还可以和 Room
配合使用,这个之后再说。
LiveData
本质也是一个观察者模式的应用,通过 setValue/postValue
来驱动 Observer
做出改变。 而 LiveData
又作为 lifecycle
的观察者,根据 lifecycle
的改变而表现出不同的行为。其核心方法就是 observe
了。
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
// ignore
return;
}
// LifecycleBoundObserver 作为 lifecycle 的观察者。
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
// 加入 map 中
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
owner.getLifecycle().addObserver(wrapper);
}
这里重点关注 LifecycleBoundObserver
, 它实现了 GenericLifecycleObserver
接口,因此在 lifecycle 改变时,会执行到 onStateChanged
方法
class LifecycleBoundObserver extends ObserverWrapper implements GenericLifecycleObserver {
@NonNull final LifecycleOwner mOwner;
LifecycleBoundObserver(@NonNull LifecycleOwner owner, Observer<T> observer) {
super(observer);
mOwner = owner;
}
@Override
boolean shouldBeActive() {
return mOwner.getLifecycle().getCurrentState().isAtLeast(STARTED);
}
@Override
public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
// 如果 Activity/Fragment/... 已经处于 DESTROYED 状态,则移除 observer。这里的移除是从 lifecycle 里移除 LifecycleBoundObserver,以及从 LiveData 里移除 mObserver
if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
removeObserver(mObserver);
return;
}
// 通知 activeStateChanged
activeStateChanged(shouldBeActive());
}
@Override
boolean isAttachedTo(LifecycleOwner owner) {
return mOwner == owner;
}
@Override
void detachObserver() {
mOwner.getLifecycle().removeObserver(this);
}
}
activeStateChanged
实现为:
void activeStateChanged(boolean newActive) {
if (newActive == mActive) {
return;
}
// immediately set active state, so we'd never dispatch anything to inactive
// owner
mActive = newActive;
boolean wasInactive = LiveData.this.mActiveCount == 0;
LiveData.this.mActiveCount += mActive ? 1 : -1;
// 如果 active 的 Observer 数量从 0 变为 1,则执行钩子函数 onActive
if (wasInactive && mActive) {
onActive();
}
// 如果 active 的 Observer 数量从 1 变为 0,则执行钩子函数 onInactive
if (LiveData.this.mActiveCount == 0 && !mActive) {
onInactive();
}
if (mActive) {
// 处于 active 状态, 则触发一次数据更新, 传参表示只处理这个 Observer 的派发
dispatchingValue(this);
}
}
我们可以通过 setValue
与 postValue
来更新数据, setValue
在主线程调用, postValue
在子线程调用,最终都会通过 dispatchingValue
来通知 Observer 更新数据。
private void dispatchingValue(@Nullable ObserverWrapper initiator) {
if (mDispatchingValue) {
// 重入问题,设置标志位,给上层处理
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
// 如果 initiator 不为 null, 则标示只更新特定的 observer.
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
经过上一篇文章,见到 mDispatchingValue
和 mDispatchInvalidated
我们就应该想到这是解决重入问题的方式。 这个方法会为每一个 observer 调用 considerNotify
方法:
private void considerNotify(ObserverWrapper observer) {
// 如果 Activity/Fragment 已经不是可见状态时,就直接返回,不更新 UI。这样很多因为更新 UI 时机不对的问题就不会发生了。
if (!observer.mActive) {
return;
}
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
// 每次 setValue 和 postValue 都会更新版本号,每个 observer 也记录上次更新的版本号,这样通过对比,可以避免布标要的更新
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//通知 observer 更新数据或 UI
observer.mObserver.onChanged((T) mData);
}
LiveData
的主要逻辑就是这些, 它也是学习运用 lifecycle
的一个很好的例子。除此之外, LiveData
提供了一些辅助类,帮助我们更好的开发。
MediatorLiveData
的作用和 Rxjava
中 merge
操作符有点像。 都是合并几个流到一个流中。使用方式如下:
val liveData1: LiveData<Integer> = ...;
val liveData2: LiveData<Integer> = ...;
val liveDataMerger = new MediatorLiveData<>();
liveDataMerger.addSource(liveData1){value -> liveDataMerger.setValue(value)};
liveDataMerger.addSource(liveData2){value -> liveDataMerger.setValue(value)};
liveDataMerger.observe(lifecycleOwner){
// onChange
}
简单看下它的源码:
public <S> void addSource(@NonNull LiveData<S> source, @NonNull Observer<S> onChanged) {
// Source 是将 onChanged 包裹了一下,决定何时开始 observe 与何时取消 observe。
// 因为 source 处于数据上游,在没有下游的情况下,是没必要触发 onChanged 的
Source<S> e = new Source<>(source, onChanged);
Source<?> existing = mSources.putIfAbsent(source, e);
if (existing != null && existing.mObserver != onChanged) {
throw new IllegalArgumentException(
"This source was already added with the different observer");
}
if (existing != null) {
return;
}
// 在有 active 的 Observer之后,才开始监听 source 的 changed。
// 有了下游,上游的数据才有往下传递的必要。
if (hasActiveObservers()) {
e.plug();
}
}
// removeSource 时,取消上游的订阅
public <S> void removeSource(@NonNull LiveData<S> toRemote) {
Source<?> source = mSources.remove(toRemote);
if (source != null) {
source.unplug();
}
}
@CallSuper
@Override
protected void onActive() {
// onActive 就代表有下游产生,这个时候才连接上游 source
for (Map.Entry<LiveData<?>, Source<?>> source : mSources) {
source.getValue().plug();
}
}
@CallSuper
@Override
protected void onInactive() {
// onInactive 就代表没有下游了,这个时候就不需要接收上游的数据改动了。
for (Map.Entry<LiveData<?>, Source<?>> source : mSources) {
source.getValue().unplug();
}
}
ComputableLiveData
是一个还没有对外开放的类,不过了解它还是有必要的,因为在 Room
里就会用到这个类。
ComputableLiveData
是个抽象类,需要子类实现 compute
方法。 而 compute
方法会在什么时机被执行呢?
ComputableLiveData
的 onActive
被调用时,即有观察者出现时。 invalidate()
方法时。
invalidate()
会触发 mInvalidationRunnable
在主线程里执行:
final Runnable mInvalidationRunnable = new Runnable() {
@MainThread
@Override
public void run() {
boolean isActive = mLiveData.hasActiveObservers();
// mInvalid 会被多线程访问与赋值,因此做了原子性访问控制
// 只有从 false 变为 true, 才需要触发 refresh
// 如果原本就是 true,代表原本处于 mInvalid 状态,要么是已经在 compute, 要么就是还没有 observer.
if (mInvalid.compareAndSet(false, true)) {
if (isActive) {
mExecutor.execute(mRefreshRunnable);
}
}
}
};
具体触发 compute 和 数据更新的操作是在 mRefreshRunnable
中实现的。 代码虽然不多,但其实挺不好读的,因为涉及到了多线程访问, 多次 invalidate 会触发多次 refresh, 如果处理不当,可能会出现数据乱序更新,这里强烈推荐 Advanced RxJava
的前两篇文章,对于并发下的思维模式有很大帮助。
final Runnable mRefreshRunnable = new Runnable() {
@WorkerThread
@Override
public void run() {
boolean computed;
do {
computed = false;
if (mComputing.compareAndSet(false, true)) { // (1)
try {
T value = null;
while (mInvalid.compareAndSet(true, false)) { // (2)
computed = true; // (3)
value = compute();
}
if (computed) { // (4)
mLiveData.postValue(value);
}
} finally {
mComputing.set(false); // (5)
}
}
} while (computed && mInvalid.get()); // (6)
}
};
compute()
LiveData
还默认提供了两个工具方法: map
与 switchMap
。 比较简单,但是很有用。可以算作是 MediatorLiveData 的应用吧
// 数据转换, 从 X 转换为 Y
public static <X, Y> LiveData<Y> map(@NonNull LiveData<X> source, @NonNull final Function<X, Y> func) {
final MediatorLiveData<Y> result = new MediatorLiveData<>();
result.addSource(source, new Observer<X>() {
@Override
public void onChanged(@Nullable X x) {
result.setValue(func.apply(x));
}
});
return result;
}
// 根据 trigger 和 func 生成新的 LiveData<Y>, 下游将监听 LiveData<Y>。类似于 rxjava 的 flatMap。
public static <X, Y> LiveData<Y> switchMap(@NonNull LiveData<X> trigger, @NonNull final Function<X, LiveData<Y>> func) {
final MediatorLiveData<Y> result = new MediatorLiveData<>();
result.addSource(trigger, new Observer<X>() {
LiveData<Y> mSource;
@Override
public void onChanged(@Nullable X x) {
LiveData<Y> newLiveData = func.apply(x);
if (mSource == newLiveData) {
return;
}
if (mSource != null) {
result.removeSource(mSource);
}
mSource = newLiveData;
if (mSource != null) {
result.addSource(mSource, new Observer<Y>() {
@Override
public void onChanged(@Nullable Y y) {
result.setValue(y);
}
});
}
}
});
return result;
}
LiveData
源码总体比较简单,使用也比较容易,但功能确实实用。