继续分析 Local Cache ,本次的文章会讲述 Segment 的结构,缓存读写及失效逻辑。
上一篇说过,LocalCache 本质上就是一个 Map ,Segment 组成的数组就是 LocalCache 的存储结果。这个和 ConcurrentHashMap 是比较类似的。下面分析一下 Segment ,主要从加载和失效两个模块考虑。
// LocalCache.java:3952
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
// 先调用reHash方法,扰动一下
int hash = hash(checkNotNull(key));
// 通过Hash值定位到segment,然后调用get方法
return segmentFor(hash).get(key, hash, loader);
}
复制代码
// Segment 加载
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// don't call getLiveEntry, which would ignore loading values
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
// 检查是否过期、被回收、或正在加载,则返回null
V value = getLiveValue(e, now);
if (value != null) {
// 记录访问信息,写入最近常用队列
recordRead(e, now);
statsCounter.recordHits(1);
// 如果命中,则考虑是否需要刷新,取得新值。Guava 自动刷新机制。
// 如果考虑二级缓存时,本地缓存可以用来做一级缓存,Redis做二级缓存。
// 一级缓存的失效时间可以更短一点,二级缓存失效时,Guava可以临时返回已给Null,并通知Redis重新加载,这样有效防止缓存雪崩和缓存穿透
return scheduleRefresh(e, key, hash, value, now, loader);
}
// 如果是正在加载,则阻塞等待加载完成
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// 以上为无锁访问,如果没有结果,则在下面加锁加载
// at this point e is either null or expired;
return lockedGetOrLoad(key, hash, loader); // 从此处进入
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
// 读取后尝试执行清理
postReadCleanup();
}
}
复制代码
上述一段代码,先会进行无锁访问,假如找到数据,就直接返回,没有在开始加锁加载数据。
和我们直觉把整个加载过程上锁不一样,这里的加载过程是这样的:
这样的加载过程可以有更好的并发,因为第一个 lock 锁的是 Segment ,粒度是很大的,用来加载过程会大大影响性能。Cache 的方法是,先构建 ReferenceEntry 对象,然后对 ReferenceEntry 上锁(一个 key 一个 ReferenceEntry,粒度极小),再进行加载。那么锁 Segment 的操作是为了构建 ReferenceEntry 对象,并设置一个 Loading 的中间状态,这样可以保证其他过来 get 同一个 key 的线程不会重复加载数据。
关键字:
这里的技术类似:先发布对象引用占位,然后再等加载逐渐完成。
// Segment 加载
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
// 此处会上锁并进行加载,但加载过程并不需要上锁,上锁只是判断是否决定上锁
// ,决定后,把对象设置一个loading状态,然后释放锁去加载数据
boolean createNewEntry = true;
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
// 通过以上的 hash 值,选定一个槽位
ReferenceEntry<K, V> first = table.get(index);
for (e = first; e != null; e = e.getNext()) {
// 遍历这个槽位的链表
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
if (valueReference.isLoading()) {
// 预防在并发过程中,有其他线程先执行了加载操作,并处于 loading 过程
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) {
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accommodate an incorrect expiration queue.
// 二次忍忍
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;
}
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
复制代码
从这个枚举处可以找到 Cache 项失效的各种原因,Segment.removeEntry 方法是移除 K-V 的数据。根据官方文档,Cache 的过期策略有以下几种:
还有一种文档不提及的,我认为也算是一个失效策略吗,GC 清除软引用
以上清理方式都是在用户读写缓存过程中,自动进行回收的,假如需要定时回收,可以直接调用 Cache.cleanUp() 方法。
// RemovalCause 策略枚举
public enum RemovalCause {
/**
* The entry was manually removed by the user. This can result from the user invoking {@link
* Cache#invalidate}, {@link Cache#invalidateAll(Iterable)}, {@link Cache#invalidateAll()}, {@link
* Map#remove}, {@link ConcurrentMap#remove}, or {@link Iterator#remove}.
*
* 用户手动移出
*/
EXPLICIT {
@Override
boolean wasEvicted() {
return false;
}
},
/**
* The entry itself was not actually removed, but its value was replaced by the user. This can
* result from the user invoking {@link Cache#put}, {@link LoadingCache#refresh}, {@link Map#put},
* {@link Map#putAll}, {@link ConcurrentMap#replace(Object, Object)}, or {@link
* ConcurrentMap#replace(Object, Object, Object)}.
*
* 用户手动替换
*/
REPLACED {
@Override
boolean wasEvicted() {
return false;
}
},
/**
* The entry was removed automatically because its key or value was garbage-collected. This can
* occur when using {@link CacheBuilder#weakKeys}, {@link CacheBuilder#weakValues}, or {@link
* CacheBuilder#softValues}.
*
* 因为 GC 原因, 软引用会被清理
*/
COLLECTED {
@Override
boolean wasEvicted() {
return true;
}
},
/**
* The entry's expiration timestamp has passed. This can occur when using {@link
* CacheBuilder#expireAfterWrite} or {@link CacheBuilder#expireAfterAccess}.
*
* 因为过期时间到,所以需要清理
*/
EXPIRED {
@Override
boolean wasEvicted() {
return true;
}
},
/**
* The entry was evicted due to size constraints. This can occur when using {@link
* CacheBuilder#maximumSize} or {@link CacheBuilder#maximumWeight}.
*
* 因为 容量 清理
*/
SIZE {
@Override
boolean wasEvicted() {
return true;
}
};
/**
* Returns {@code true} if there was an automatic removal due to eviction (the cause is neither
* {@link #EXPLICIT} nor {@link #REPLACED}).
*/
abstract boolean wasEvicted();
}
// Segment.java
/**
* 清理Segment中的entry,cause 是清楚的原因
*
* @param entry
* @param hash
* @param cause
* @return
*/
@VisibleForTesting
@GuardedBy("this")
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
e.getKey(),
hash,
e.getValueReference().get(),
e.getValueReference(),
cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
}
/**
* 获取Value,并顺便检测是否过期,假如过期就返回null
*
* Gets the value from an entry. Returns null if the entry is invalid, partially-collected,
* loading, or expired.
*/
V getLiveValue(ReferenceEntry<K, V> entry, long now) {
if (entry.getKey() == null) {
tryDrainReferenceQueues();
return null;
}
V value = entry.getValueReference().get();
if (value == null) {
tryDrainReferenceQueues();
return null;
}
if (map.isExpired(entry, now)) {
// 判断 entry 是否过期,假如是,则尝试对所有segment下所有的 K-V 做一次过期检测
tryExpireEntries(now); // 从此处切入
return null;
}
return value;
}
@GuardedBy("this")
void expireEntries(long now) {
drainRecencyQueue();
ReferenceEntry<K, V> e;
while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
// 从 writeQueue 队列中,逐个 entry 去检查,假如可以删除,则把Entry从table的Entry链中删除
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
// 当上
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
}
/**
* Performs routine cleanup following a read. Normally cleanup happens during writes. If cleanup
* is not observed after a sufficient number of reads, try cleaning up from the read thread.
*/
void postReadCleanup() {
// postRead 读数据后置处理。设置一个readCount计数器,假如达到某个阈值,也触发一次 cleanUp,达到清理的效果
if ((readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0) {
cleanUp();
}
}
复制代码
在 Cache 容量有限的情况下,LRU 算法是一个缓存实现局部性非常重要的环节。作用是尽量让热点数据都在缓存里。
在观察 LRU 算法如何实现前,先从 LRU 如何删除数据开始看。从代码上看, Cache 使用了 recencyQueue 来记录活跃的数据
@GuardedBy("this")
void evictEntries(ReferenceEntry<K, V> newest) {
if (!map.evictsBySize()) { // 不限制容量,那就跳过此方法
return;
}
drainRecencyQueue();
// If the newest entry by itself is too heavy for the segment, don't bother evicting
// anything else, just that
if (newest.getValueReference().getWeight() > maxSegmentWeight) { // 单个Value的Weight比设定的 maxSegmentWeight 直接拒绝插入缓存
if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
while (totalWeight > maxSegmentWeight) { // 将超过容量的数据逐个删除
ReferenceEntry<K, V> e = getNextEvictable(); // 找到下一次需要被删除的
if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
}
// 在找到下一个需要删除的
ReferenceEntry<K, V> getNextEvictable() {
for (ReferenceEntry<K, V> e : accessQueue) { // 直接遍历队列,取出weight > 0 的 Entry删除
int weight = e.getValueReference().getWeight();
if (weight > 0) {
return e;
}
}
throw new AssertionError();
}
@GuardedBy("this")
void drainRecencyQueue() {
ReferenceEntry<K, V> e;
while ((e = recencyQueue.poll()) != null) {
// An entry may be in the recency queue despite it being removed from
// the map . This can occur when the entry was concurrently read while a
// writer is removing it from the segment or after a clear has removed
// all of the segment's entries.
if (accessQueue.contains(e)) { // 这段代码实在没看懂
accessQueue.add(e);
}
}
}
// 尝试删除数据 entry
@GuardedBy("this")
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) { // 从链表中找到了该数据,直接删除
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
e.getKey(),
hash,
e.getValueReference().get(),
e.getValueReference(),
cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
}
复制代码
设计一个可用的 Cache 绝对不是一个普通的 Map 这么简单,联系第一篇关于 Cache 的文章,这里小结一下关于 Guava Cache 的知识。
回归到读 LocalCache 的源头,我是希望可以了解 设计一个缓存要考虑什么? , 局部性原理 是一个系统性能提升的最直接的方式(编程上,硬件上当然也可以),缓存的出现就是根据 局部性原理 所设计的。
缓存作为存储金字塔的一部分,一定需要考虑以下几个问题:
在设计何时加载的问题上,Guava Cache 提供了一个 Loader 接口,让用户可以自定义加载过程,在由 Cache 在找不到对象的时候主动调用 Loader 去加载,还通过一个巧妙的方法,既保证了 Loader 的只运行一次,还能保证锁粒度极小,保证并发加载时,安全且高性能。
失效处理上,Guava Cache 提供了基于容量、有限时间(读有限时、写有限时)等失效策略,在官方文档上也写明,在基于限时的情况下,并不是使用一个线程去单独清理过期 K-V,而是把这个清理工作,均摊到每次访问中。假如需要定时清理,也可以调用 CleanUp 方法,定时调用就可以了。
在 Cache 容量有限时, LRU 算法是一个通用的解决方案,在源码中,Guava Cache 并不是严格地保证全局 LRU 的,只是针对一个 Segment 实现 LRU 算法。这个前提是 Segment 对用户来说是随机的,所以全局的 LRU 算法和单个 Segment 的算法是基本一致的。
在 Guava Cache 里,并没有实现任何的写回策略。原因在于,Guava Cache 是一个本地缓存,直接修改对象的数据,Cache 的数据就已经是最新的了,所以在数据能够写入 DB 后,数据就已经完成一致了。