HashMap 是非线程安全的,put操作可能导致死循环。其解决方案有 HashTable 和 Collections.synchronizedMap(hashMap) 。这两种方案都是对读写加锁,独占式,效率比较低下。
HashMap 在并发执行put操作时会引起死循环,因为多线程导致 HashMap 的 Entry 链表形成环形数据结构,则 Entry 的 next 节点永远不为空,会死循环获取 Entry。
HashTable 使用 synchronized 来保证线程安全,但是在线程竞争激烈的情况下,效率非常低。其原因是所有访问该容器的线程都必须竞争一把锁。
针对上述问题,ConcurrentHashMap 使用锁分段技术,容器里有多把锁,每一把锁用于其中一部分数据,当多线程访问不同数据段的数据时,线程间就不会存在锁的竞争。
在JDK 1.7中,ConcurrentHashMap 采用了 Segment 数组和 HashEntry 数组的方式进行实现。其中 Segment 是一种可重入锁(ReentrantLock),扮演锁的角色。而 HashEntry 则是用于存储键值对的数据。结构如下图所示:
一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素。每个 Segment 守护一个 HashEntry 数组的元素。
初始化时,计算出 Segment 数组的大小 ssize 和每个 Segment 中 HashEntry 数组的大小 cap,并初始化 Segment 数组的第一个元素。其中 ssize 为2的幂次方,默认为16,cap 大小也是2的幂次方,最小值为2。最终结果根据初始化容量 initialCapacity 计算。
//计算segment数组长度
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//初始化segmentShift和SegmentMask
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
//计算每个Segment中HashEntry数组大小cap
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 初始化segment数组和segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
复制代码
首先,初始化了 segments 数组,其长度 ssize 是通过 concurrencyLevel 计算得出的。需要保证ssize的长度是2的N次方,segments 数组的长度最大是65536。
然后初始化了 segmentShift 和 segmentMask 这两个全局变量,用于定位 segment 的散列算法。segmentShift 是用于散列运算的位数, segmentMask 是散列运算的掩码。
之后根据 initialCapaicity 和 loadfactor 这两个参数来计算每个 Segment 中 HashEntry 数组的大小 cap。
最后根据以上确定的参数,初始化了 segment 数组以及 segment[0]。
整个 get 操作过程都不需要加锁,因此非常高效。首先将 key 经过 Hash 之后定位到 Segment,然后再通过一个 Hash 定位到具体元素。不需要加锁是因为 get 方法将需要使用的共享变量都定义成 volatile 类型,因此能在线程之间保持可见性,在多线程同时读时能保证不会读到过期的值。
put 方法需要对共享变量进行写入操作,为了线程安全,必须加锁。 put 方法首先定位到 Segment,然后在 Segment 里进行插入操作。插入操作首先要判断是否需要对 Segment 里的 HashEntry 数组进行扩容,然后定位添加元素的位置,将其放入到 HashEntry 数组。
Segment 的扩容比 HashMap 更恰当,因为后者是插入元素后判断是否已经到达容量,如果到达了就扩容,但是可能扩容后没有插入,进行了无效的扩容。Segment 在扩容时,首先创建一个原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组。同时为了高效, ConcurrentHashMap 不会对整个容器进行扩容,而是只对某个 segment 进行扩容。
每个 Segment 都有一个 volatile 修饰的全局变量 count ,求整个 ConcurrentHashMap 的 size 时很明显就是将所有的 count 累加即可。但是 volatile 修饰的变量却不能保证多线程的原子性,所有直接累加很容易出现并发问题。但是如果在调用 size 方法时锁住其余的操作,效率也很低。
ConcurrentHashMap 的做法是先尝试两次通过不加锁的方式进行计算,如果两次结果相同,说明结果正确。如果计算结果不同,则给每个 Segment 加锁,进行统计。
在JDK 1.8中,改变了分段锁的思路,采用了 Node数组 + CAS + Synchronized 来保证并发安全。底层仍然采用了数组+链表+红黑树的存储结构。
在JDK 1.8中,使用 Node 替换 HashEntry,两者作用相同。在 Node 中, val 和 next 两个变量都是 volatile 修饰的,保证了可见性。
使用 table 变量存放 Node 节点数组,默认为 null, 默认大小为16,且每次扩容时大小总是2的幂次方。在扩容时,使用 nextTable 存放新生成的数据,数组为 table 的两倍。
ForwardingNode 是一个特殊的 Node 节点,hash 值为-1,存储了 nextTable 的引用。只有table 发生扩容时,其发生作用,作为占位符放在 table 中表示当前节点为null或者已经被移动。
在 HashMap 中,其核心的数据结构是链表。而在 ConcurrentHashMap 中,如果链表的数据过长会转换为红黑树来处理。通过将链表的节点包装成 TreeNode 放在 TreeBin 中,然后经由 TreeBin 完成红黑树的转换。
TreeBin 不负责键值对的包装,用于在链表转换为红黑树时包装 TreeNode 节点,用来构建红黑树。
在构造函数中,ConcurrentHashMap 仅仅设置了一些参数。当首次插入元素时,才通过 initTable() 方法进行了初始化。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//有其他线程在初始化,挂起当前线程
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//获得了初始化的权利,使用CAS将sizeCtl设置为-1,表示本线程正在初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//下次扩容的大小,相当于0.75*n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
该方法的关键为sizeCtl。
sizeCtl:控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同:
sizeCtl 默认为0。如果该值小于0,表示有其他线程在初始化,需要暂停该线程。如果该线程获取了初始化的权利,先将其设置为-1。最后将 sizeCtl 设置为 0.75*n,表示扩容的阈值。
put操作的核心思想依然是根据 hash 计算节点插入在 table 的位置,如果为空,直接插入,否则插入到链表或树中。
首先计算hash值,然后进入循环中遍历table,尝试插入。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//详细代码接下来分别讲述
}
复制代码
首先判断 table 是否为空,如果为空或者是 null,则先进行初始化操作。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
复制代码
如果已经初始化过,且插入的位置没有节点,直接插入该节点。使用CAS尝试插入该节点。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
复制代码
如果有线程在扩容,先帮助扩容。
//当前位置的hashcode等于-1,需要扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
复制代码
如果都不满足,使用 synchronized 锁写入数据。根据数据结构的不同,如果是链表则插入尾部;如果是树节点,使用树的插入操作。
else {
V oldVal = null;
//对该节点进行加锁处理(hash值相同的链表的头节点)
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh > 0 表示为链表,将该节点插入到链表尾部
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//hash 和 key 都一样,替换value
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
//putIfAbsent()
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//链表尾部 直接插入
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//树节点,按照树的插入操作进行插入
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
复制代码
在for循环的最后,判断链表的长度是否需要链表转换为树结构。
if (binCount != 0) {
// 如果链表长度已经达到临界值8,把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
复制代码
最后,如果是更新节点,前边已经返回了 oldVal,否则就是插入新的节点。还需要使用 addCount() 方法,为 size 加一。
总结步骤如下: