Guava LocalCache 缓存介绍及实现源码深入剖析
guava LocalCache与ConcurrentHashMap有以下不同
可以直观看到cache是以segment粒度来控制并发get和put等操作的
Map类结构简单说就是数组 + 链表,最基本的数据单元是entry
为了减少不必须的load加载,在value引用中增加了loading标识和wait方法等待加载获取值。这样,就可以等待上一个调用loader方法获取值,而不是重复去调用loader方法加重系统负担,而且可以更快的获取对应的值。
在Cache分别实现了基于Strong,Soft,Weak三种形式的ValueReference实现。
主要逻辑就两个:lockedGetOrLoad 和 waitForLoadingValue
下列代码只保留了load部分
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
lock();
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
// 计算key在数组中的落点
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// 沿着某个index 链表依次遍历
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
V value = valueReference.get();
if (value == null || map.isExpired(e, now){
enqueueNotification(...);
} else {
return value;
}
this.count = newCount; // write-volatile
break;
}
}
loadingValueReference = new LoadingValueReference<>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
unlock();
synchronized (e) {
return loadSync(key, hash,loadingValueReference, loader);
}
}
segment 简单说也是数组加链表,只是元素类型是ReferenceEntry,根据key 计算index,然后沿着链表匹配value,若相同,判断value元素是否有效,无效(null or 过期)则创建loadingValueReference 并更新到 ReferenceEntry。loadingValueReference.loadFuture 开始执行load逻辑。
只有ReferenceEntry 更新 其value引用 loadingValueReference 的部分是需要加锁的,之后线程竞争便转移到了 loadingValueReference 上
V loadSync(K key,int hash,
LoadingValueReference<K, V>,loadingValueReference,CacheLoader<? super K, V> loader)throws ExecutionException {
ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
return getAndRecordStats(key, hash,loadingValueReference, loadingFuture);
}
V waitForLoadingValue(ReferenceEntry<K, V> e, K key, ValueReference<K, V> valueReference)
throws ExecutionException {
checkState(!Thread.holdsLock(e), "Recursive load of: %s", key);
V value = valueReference.waitForValue();
if (value == null) {
throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
}
...
return value;
}
static class LoadingValueReference<K, V> implements ValueReference<K, V> {
volatile ValueReference<K, V> oldValue;
final SettableFuture<V> futureValue =SettableFuture.create();
final Stopwatch stopwatch =Stopwatch.createUnstarted();
public boolean set(@Nullable V newValue) {
return futureValue.set(newValue);
}
public V get() {
return oldValue.get();
}
public V waitForValue() throws ExecutionException {
// 对future.get的封装
return getUninterruptibly(futureValue);
}
public boolean setException(Throwable t) {
return futureValue.setException(t);
}
public void notifyNewValue(@Nullable V newValue) {
if (newValue != null) {
// future.get ==> waitForValue即可立即返回
set(newValue);
} else {
oldValue = unset();
}
}
}
所谓请求合并:当多个线程请求同一个key时,第一个线程执行loader逻辑,其余线程等待。
从上述代码可以看到
LoadingValueReference.waitForValue ==> future.get 准备等结果了 Guava Cache内存缓存使用实践-定时异步刷新及简单抽象封装
refreshAfterWrite 注意不是 expireAfterWrite
如果缓存过期,恰好有多个线程读取同一个key的值,那么guava只允许一个线程去加载数据,其余线程阻塞。这虽然可以防止大量请求穿透缓存,但是效率低下。使用refreshAfterWrite可以做到:只阻塞加载数据的线程,其余线程返回旧数据。
LoadingCache<String, Object> caches = CacheBuilder.newBuilder()
.maximumSize(100)
.refreshAfterWrite(10, TimeUnit.MINUTES)
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) throws Exception {
return generateValueByKey(key);
}
});
真正加载数据的那个线程一定会阻塞,可以让这个加载过程是异步的,这样就可以让所有线程立马返回旧值
ListeningExecutorService backgroundRefreshPools = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20)); LoadingCache<String, Object> caches = CacheBuilder.newBuilder()
.maximumSize(100)
.refreshAfterWrite(10, TimeUnit.MINUTES)
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) throws Exception {
return generateValueByKey(key);
}
@Override
public ListenableFuture<Object> reload(String key, Object oldValue) throws Exception {
return backgroundRefreshPools.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return generateValueByKey(key);
}
});
}
});