转载

10分钟掌握ConcurrentHashMap 3分钟清楚和HashMap、Hashtable的区别

10分钟掌握ConcurrentHashMap 3分钟清楚和HashMap、Hashtable的区别

ConcurrentHashMap 顾名思义就是同步的HashMap,也就是线程安全的HashMap,所以本篇介绍的ConcurrentHashMap和HashMap有着很重要的关系,所以建议之前没有了解过HashMap的可以先看看这篇关于HashMap的原理分析 《HashMap从认识到源码分析》 ,本篇继续以 JDK1.8 版本的源码进行分析,最后在介绍完ConcurrentHashMap之后会对ConcurrentHashMap、Hashtable和HashMap做一个比较和总结。

ConcurrentHashMap

我们先看一下 ConcurrentHashMap 实现了哪些接口、继承了哪些类,对 ConcurrentHashMap 有一个整体认知。

10分钟掌握ConcurrentHashMap 3分钟清楚和HashMap、Hashtable的区别

ConcurrentHashMap 继承 AbstractMap 接口,这个和 HashMap 一样,然后实现了 ConcurrentMap 接口,这个和 HashMap 不一样, HashMap 是直接实现的 Map 接口。

再细看 ConcurrentHashMap 的结构,这里列举几个重要的成员变量 tablenextTablebaseCountsizeCtltransferIndexcellsBusy
  • table :数据类型是Node数组,这里的Node和HashMap的Node一样都是内部类且实现了 Map.Entry 接口
  • nextTable :哈希表扩容时生成的数据,数组为扩容前的2倍
  • sizeCtl :多个线程的 共享变量 ,是操作的控制标识符,它的作用不仅包括 threshold 的作用,在不同的地方有不同的值也有不同的用途
    • -1 代表正在 初始化
    • -N 代表有 N-1 个线程正在进行 扩容 操作
    • 0 代表hash表还没有被初始化
    • 正数表示下一次进行扩容的容量大小
  • ForwardingNode :一个特殊的Node节点,Hash地址为-1,存储着nextTable的引用,只有table发生扩用的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或者已被移动
    10分钟掌握ConcurrentHashMap 3分钟清楚和HashMap、Hashtable的区别
    ConcurrentHashMapHashMap 一样都是采用 拉链法 处理哈希冲突,且都为了防止单链表过长影响查询效率,所以当链表长度超过某一个值时候将用红黑树代替链表进行存储,采用了 数组+链表+红黑树 的结构
    10分钟掌握ConcurrentHashMap 3分钟清楚和HashMap、Hashtable的区别
    所以从结构上看 HashMapConcurrentHashMap 还是很相似的,只是 ConcurrentHashMap 在某些操作上采用了 CAS + synchronized 来保证并发情况下的安全。
    说到 ConcurrentHashMap 处理并发情况下的线程安全问题,这不得不提到 Hashtable ,因为 Hashtable 也是线程安全的,那 ConcurrentHashMapHashtable 有什么区别或者有什么高明之处嘛?以至于官方都推荐使用 ConcurrentHashMap 来代替 Hashtable
  • 线程安全的实现Hashtable 采用 对象锁 (synchronized修饰对象方法)来保证线程安全,也就是一个 Hashtable 对象只有一把锁,如果线程1拿了对象A的锁进行有 synchronized 修饰的 put 方法,其他线程是无法操作对象A中有 synchronized 修饰的方法的(如 get 方法、 remove 方法等),竞争激烈所以效率低下。而 ConcurrentHashMap 采用 CAS + synchronized 来保证并发安全性,且 synchronized 关键字不是用在方法上而是用在了具体的对象上,实现了更小粒度的锁,等会源码分析的时候在细说这个SUN大师们的鬼斧神工
  • 数据结构的实现: Hashtable 采用的是 数组 + 链表 ,当链表过长会影响查询效率,而 ConcurrentHashMap 采用 数组 + 链表 + 红黑树 ,当链表长度超过某一个值,则将链表转成红黑树,提高查询效率。

构造函数

ConcurrentHashMap 的构造函数有5个,从数量上看就和 HashMapHashtable (4个)的不同,多出的那个构造函数是 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) ,即除了传入容量大小、负载因子之外还多传入了一个整型的 concurrencyLevel ,这个整型是我们预先估计的并发量,比如我们估计并发是 30 ,那么就可以传入 30

其他的4个构造函数的参数和 HashMap 的一样,而具体的初始化过程却又不相同, HashMapHashtable 传入的容量大小和负载因子都是为了计算出 初始阈值 (threshold),而 ConcurrentHashMap 传入的容量大小和负载因子是为了计算出 sizeCtl 用于初始化 table ,这个sizeCtl即table数组的大小,不同的构造函数计算sizeCtl方法都不一样。

//无参构造函数,什么也不做,table的初始化放在了第一次插入数据时,默认容量大小是16和HashMap的一样,默认sizeCtl为0
public ConcurrentHashMap() {
}

//传入容量大小的构造函数。
public ConcurrentHashMap(int initialCapacity) {
    //如果传入的容量大小小于0 则抛出异常。
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    //如果传入的容量大小大于允许的最大容量值 则cap取允许的容量最大值 否则cap =
    //((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方),
    //即如果传入的容量大小是12 则 cap = 32(12 + (12 >>> 1) + 1=19
    //向上取2的幂次方即32),这里为啥一定要是2的幂次方,原因和HashMap的threshold一样,都是为
    //了让位运算和取模运算的结果一样。
    //MAXIMUM_CAPACITY即允许的最大容量值 为2^30。
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               //tableSizeFor这个函数即实现了将一个整数取2的幂次方。
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    //将上面计算出的cap 赋值给sizeCtl,注意此时sizeCtl为正数,代表进行扩容的容量大小。
    this.sizeCtl = cap;
}

//包含指定Map的构造函数。
//置sizeCtl为默认容量大小 即16。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

//传入容量大小和负载因子的构造函数。
//默认并发数大小是1。
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

//传入容量大小、负载因子和并发数大小的构造函数
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    //如果传入的容量大小 小于 传入的并发数大小,
    //则容量大小取并发数大小,这样做的原因是确保每一个Node只会分配给一个线程,而一个线程则
    //可以分配到多个Node,比如当容量大小为64,并发数大
    //小为16时,则每个线程分配到4个Node。
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    //size = 1.0 + (long)initialCapacity / loadFactor 这里计算方法和上面的构造函数不一样。
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    //如果size大于允许的最大容量值则 sizeCtl = 允许的最大容量值 否则 sizeCtl =
    //size取2的幂次方。
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}
复制代码

put方法

  1. 判断键值是否为 null ,为 null 抛出异常。
  2. 调用 spread() 方法计算key的hashCode()获得哈希地址,这个HashMap相似。
  3. 如果当前table为空,则初始化table,需要注意的是这里并没有加 synchronized ,也就是允许多个线程去 尝试 初始化table,但是在初始化函数里面使用了 CAS 保证只有一个线程去执行初始化过程。
  4. 使用 容量大小-1 & 哈希地址 计算出待插入键值的下标,如果该下标上的bucket为 null ,则直接调用实现 CAS 原子性操作的 casTabAt() 方法将节点插入到table中,如果插入成功则完成put操作,结束返回。插入失败(被别的线程抢先插入了)则继续往下执行。
  5. 如果该下标上的节点(头节点)的哈希地址为-1,代表需要扩容,该线程执行 helpTransfer() 方法协助扩容。
  6. 如果该下标上的bucket不为空,且又不需要扩容,则进入到bucket中,同时 锁住这个bucket ,注意只是锁住该下标上的bucket而已,其他的bucket并未加锁,其他线程仍然可以操作其他未上锁的bucket,这个就是ConcurrentHashMap为什么高效的原因之一。
  7. 进入到bucket里面,首先判断这个bucket存储的是红黑树(哈希地址小于0,原因后面分析)还是链表。
  8. 如果是 链表 ,则遍历链表看看是否有哈希地址和键key相同的节点,有的话则根据传入的参数进行覆盖或者不覆盖,没有找到相同的节点的话则将新增的节点 插入到链表尾部 。如果是 红黑树 ,则将节点插入。到这里 结束加锁
  9. 最后判断该bucket上的链表长度是否大于 链表转红黑树的阈值(8) ,大于则调用 treeifyBin() 方法将链表转成红黑树,以免链表过长影响效率。
  10. 调用 addCount() 方法,作用是将ConcurrentHashMap的键值对数量+1,还有另一个作用是检查ConcurrentHashMap是否需要扩容。
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //不允许键值为null,这点与线程安全的Hashtable保持一致,和HashMap不同。
    if (key == null || value == null) throw new NullPointerException();
    //取键key的hashCode()和HashMap、Hashtable都一样,然后再执行spread()方法计算得到哈希地
    //址,这个spread()方法和HashMap的hash()方法一样,都是将hashCode()做无符号右移16位,只不
    //过spread()加多了 &0x7fffffff,让结果为正数。
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果table数组为空或者长度为0(未初始化),则调用initTable()初始化table,初始化函数
        //下面介绍。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //调用实现了CAS原子性操作的tabAt方法
        //tabAt方法的第一个参数是Node数组的引用,第二个参数在Node数组的下标,实现的是在Nod
        //e数组中查找指定下标的Node,如果找到则返回该Node节点(链表头节点),否则返回null,
        //这里的i = (n - 1)&hash即是计算待插入的节点在table的下标,即table容量-1的结果和哈
        //希地址做与运算,和HashMap的算法一样。
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果该下标上并没有节点(即链表为空),则直接调用实现了CAS原子性操作的
            //casTable()方法,
            //casTable()方法的第一个参数是Node数组的引用,第二个参数是待操作的下标,第三
            //个参数是期望值,第四个参数是待操作的Node节点,实现的是将Node数组下标为参数二
            //的节点替换成参数四的节点,如果期望值和实际值不符返回false,否则参数四的节点成
            //功替换上去,返回ture,即插入成功。注意这里:如果插入成功了则跳出for循环,插入
            //失败的话(其他线程抢先插入了),那么会执行到下面的代码。
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果该下标上的节点的哈希地址为-1(即链表的头节点为ForwardingNode节点),则表示
        //table需要扩容,值得注意的是ConcurrentHashMap初始化和扩容不是用同一个方法,而
        //HashMap和Hashtable都是用同一个方法,当前线程会去协助扩容,扩容过程后面介绍。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //如果该下标上的节点既不是空也不是需要扩容,则表示这个链表可以插入值,将进入到链表
        //中,将新节点插入或者覆盖旧值。
        else {
            V oldVal = null;
            //通过关键字synchroized对该下标上的节点加锁(相当于锁住锁住
            //该下标上的链表),其他下标上的节点并没有加锁,所以其他线程
            //可以安全的获得其他下标上的链表进行操作,也正是因为这个所
            //以提高了ConcurrentHashMap的效率,提高了并发度。
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //如果该下标上的节点的哈希地址大于等于0,则表示这是
                    //个链表。
                    if (fh >= 0) {
                        binCount = 1;
                        //遍历链表。
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //如果哈希地址、键key相同 或者 键key不为空
                            //且键key相同,则表示存在键key和待插入的键
                            //key相同,则执行更新值value的操作。
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //如果找到了链表的最后一个节点都没有找到相
                            //同键Key的,则是插入操作,将插入的键值新建
                            //个节点并且添加到链表尾部,这个和HashMap一
                            //样都是插入到尾部。
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //如果该下标上的节点的哈希地址小于0 且为树节点
                    //则将带插入键值新增到红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //如果插入的结果不为null,则表示为替换
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash,
                        key,value)) != null){
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            //判断链表的长度是否大于等于链表的阈值(8),大于则将链表转成
            //红黑树,提高效率。这点和HashMap一样。
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
复制代码

get方法

spread()
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //运用键key的hashCode()计算出哈希地址
    int h = spread(key.hashCode());
    //如果table不为空 且 table长度大于0 且 计算出的下标上bucket不为空,
    //则代表这个bucket存在,进入到bucket中查找,
    //其中(n - 1) & h为计算出键key相对应的数组下标的算法。
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //如果哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点。
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //如果bucket头节点的哈希地址小于0,则代表bucket为红黑树,在红黑树中查找。
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //如果bucket头节点的哈希地址不小于0,则代表bucket为链表,遍历链表,在链表中查找。
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
复制代码

remove方法

  1. 调用 spread() 方法计算出键key的哈希地址。
  2. 计算出键key所在的数组下标,如果table为空或者bucket为空,则返回 null
  3. 判断当前table是否正在扩容,如果在扩容则调用helpTransfer方法协助扩容。
  4. 如果table和bucket都不为空,table也不处于在扩容状态,则 锁住当前bucket ,对bucket进行操作。
  5. 根据bucket的头结点判断bucket是链表还是红黑树。
  6. 在链表或者红黑树中移除哈希地址、键key相同的节点。
  7. 调用 addCount 方法,将当前table存储的键值对数量-1。
public V remove(Object key) {
    return replaceNode(key, null, null);
}
    
final V replaceNode(Object key, V value, Object cv) {
    //计算需要移除的键key的哈希地址。
    int hash = spread(key.hashCode());
    //遍历table。
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //table为空,或者键key所在的bucket为空,则跳出循环返回。
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        //如果当前table正在扩容,则调用helpTransfer方法,去协助扩容。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            //将键key所在的bucket加锁。
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //bucket头节点的哈希地址大于等于0,为链表。
                    if (fh >= 0) {
                        validated = true;
                        //遍历链表。
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            //找到哈希地址、键key相同的节点,进行移除。
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    //如果bucket的头节点小于0,即为红黑树。
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        //找到节点,并且移除。
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            //调用addCount方法,将当前ConcurrentHashMap存储的键值对数量-1。
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}
复制代码

initTable初始化方法

table 的初始化主要由initTable()方法实现的,initTable()方法初始化一个合适大小的数组,然后设置sizeCtl。 我们知道 ConcurrentHashMap 是线程安全的,即支持多线程的,那么一开始很多个线程同时执行 put() 方法,而 table 又没初始化,那么就会很多个线程会去执行initTable()方法尝试初始化table,而 put 方法和 initTable 方法都是没有加锁的(synchronize),那SUN的大师们是怎么保证线程安全的呢? 通过源码可以看得出,table的初始化只能由一个线程完成,但是每个线程都可以争抢去初始化table。

  1. 判断table是否为 null ,即需不需要首次初始化,如果某个线程进到这个方法后,其他线程已经将table初始化好了,那么该线程结束该方法返回。
  2. 如果table为 null ,进入到while循环,如果 sizeCtl 小于0(其他线程正在对table初始化),那么该线程调用 Thread.yield() 挂起该线程,让出CPU时间,该线程也从运行态转成就绪态,等该线程从就绪态转成运行态的时候,别的线程已经table初始化好了,那么该线程结束while循环,结束初始化方法返回。如果从就绪态转成运行态后,table仍然为 null ,则继续while循环。
  3. 如果table为 nullsizeCtl 不小于0,则调用实现 CAS 原子性操作的 compareAndSwap() 方法将sizeCtl设置成-1,告诉别的线程我正在初始化table,这样别的线程无法对table进行初始化。如果设置成功,则再次判断table是否为空,不为空则初始化table,容量大小为默认的容量大小(16),或者为sizeCtl。其中sizeCtl的初始化是在构造函数中进行的,sizeCtl = ((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方)
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //如果table为null或者长度为0, //则一直循环试图初始化table(如果某一时刻别的线程将table初始化好了,那table不为null,该//线程就结束while循环)。
    while ((tab = table) == null || tab.length == 0) {
        //如果sizeCtl小于0,
        //即有其他线程正在初始化或者扩容,执行Thread.yield()将当前线程挂起,让出CPU时间,
        //该线程从运行态转成就绪态。
        //如果该线程从就绪态转成运行态了,此时table可能已被别的线程初始化完成,table不为
        //null,该线程结束while循环。
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //如果此时sizeCtl不小于0,即没有别的线程在做table初始化和扩容操作,
        //那么该线程就会调用Unsafe的CAS操作compareAndSwapInt尝试将sizeCtl的值修改成
        //-1(sizeCtl=-1表示table正在初始化,别的线程如果也进入了initTable方法则会执行
        //Thread.yield()将它的线程挂起 让出CPU时间),
        //如果compareAndSwapInt将sizeCtl=-1设置成功 则进入if里面,否则继续while循环。
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //再次确认当前table为null即还未初始化,这个判断不能少。
                if ((tab = table) == null || tab.length == 0) {
                    //如果sc(sizeCtl)大于0,则n=sc,否则n=默认的容量大
                    小16,
                    //这里的sc=sizeCtl=0,即如果在构造函数没有指定容量
                    大小,
                    //否则使用了有参数的构造函数,sc=sizeCtl=指定的容量大小。
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //创建指定容量的Node数组(table)。
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //计算阈值,n - (n >>> 2) = 0.75n当ConcurrentHashMap储存的键值对数量
                    //大于这个阈值,就会发生扩容。
                    //这里的0.75相当于HashMap的默认负载因子,可以发现HashMap、Hashtable如果
                    //使用传入了负载因子的构造函数初始化的话,那么每次扩容,新阈值都是=新容
                    //量 * 负载因子,而ConcurrentHashMap不管使用的哪一种构造函数初始化,
                    //新阈值都是=新容量 * 0.75。
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
复制代码

transfer扩容方法

transfer() 方法为 ConcurrentHashMap 扩容操作的核心方法。由于 ConcurrentHashMap 支持多线程扩容,而且也没有进行加锁,所以实现会变得有点儿复杂。整个扩容操作分为两步:

  1. 构建一个nextTable,其大小为原来大小的 两倍 ,这个步骤是在单线程环境下完成的
  2. 将原来table里面的内容复制到nextTable中,这个步骤是允许 多线程 操作的,所以性能得到提升,减少了扩容的时间消耗。
//协助扩容方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //如果当前table不为null 且 f为ForwardingNode节点 且 //新的table即nextTable存在的情况下才能协助扩容,该方法的作用是让线程参与扩容的复制。
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //更新sizeCtl的值,+1,代表新增一个线程参与扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

//扩容的方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //根据服务器CPU数量来决定每个线程负责的bucket数量,避免因为扩容的线程过多反而影响性能。
    //如果CPU数量为1,则stride=1,否则将需要迁移的bucket数量(table大小)除以CPU数量,平分给
    //各个线程,但是如果每个线程负责的bucket数量小于限制的最小是(16)的话,则强制给每个线程
    //分配16个bucket数。
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    //如果nextTable还未初始化,则初始化nextTable,这个初始化和iniTable初始化一样,只能由
    //一个线程完成。
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    //分配任务和控制当前线程的任务进度,这部分是transfer()的核心逻辑,描述了如何与其他线
    //程协同工作。
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        //迁移过程(对当前指向的bucket),这部分的逻辑与HashMap类似,拿旧数组的容量当做一
        //个掩码,然后与节点的hash进行与操作,可以得出该节点的新增有效位,如果新增有效位为
        //0就放入一个链表A,如果为1就放入另一个链表B,链表A在新数组中的位置不变(跟在旧数
        //组的索引一致),链表B在新数组中的位置为原索引加上旧数组容量。
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
复制代码

addCount、sumCount方法

addCount() 做的工作是更新table的size,也就是table存储的键值对数量,在使用 put()remove() 方法的时候都会在执行成功之后调用 addCount() 来更新table的size。对于 ConcurrentHashMap 来说,它到底有储存有多少个键值对,谁也不知道,因为他是支持并发的,储存的数量无时无刻都在变化着,所以说 ConcurrentHashMap 也只是统计一个大概的值,为了统计出这个值也是大费周章才统计出来的。

10分钟掌握ConcurrentHashMap 3分钟清楚和HashMap、Hashtable的区别
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //如果计算盒子不是空,或者修改baseCount的值+x失败,则放弃对baseCount的修改。
    //这里的大概意思就是首先尝试直接修改baseCount,达到计数的目的,如果修改baseCount失败(
    //多个线程同时修改,则失败)
    //则使用CounterCell数组来达到计数的目的。
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        //如果计数盒子是空的 或者随机取余一个数组为空 或者修改这个槽位的变量失败,
        //即表示出现了并发,则执行fullAddCount()方法进行死循环插入,同时返回,
        //否则代表修改这个槽位的变量成功了,继续往下执行,不进入if。
        //每个线程都会通过ThreadLocalRandom.getProbe() & m寻址找到属于它的CounterCell,
        //然后进行计数。ThreadLocalRandom是一个线程私有的伪随机数生成器,
        //每个线程的probe都是不同的。CounterCell数组的大小永远是一个2的n次方,初始容量
        //为2,每次扩容的新容量都是之前容量乘以二,处于性能考虑,它的最大容量上限是机器
        //的CPU数量,所以说CounterCell数组的碰撞冲突是很严重的。
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
             //并发过大,使用CAS修改CounterCell失败时候执行fullAddCount,
            fullAddCount(x, uncontended);
            return;
        }
        //如果上面对盒子的赋值成功,且check<=1,则直接返回,否则调用sumConut()方法计算
        if (check <= 1)
            return;
        s = sumCount();
    }
    //如果check>=0,则检查是否需要扩容。
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
复制代码

size、mappingCount方法

sizemappingCount 方法都是用来统计table的size的,这两者不同的地方在 size 返回的是一个 int 类型,即可以表示size的范围是[-2^31,2^31-1],超过这个范围就返回int能表示的最大值, mappingCount 返回的是一个 long 类型,即可以表示size的范围是[-2^63,2^63-1]。

这两个方法都是调用的sumCount()方法实现统计。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
    
public long mappingCount() {
    long n = sumCount();
    return (n < 0L) ? 0L : n; // ignore transient negative values
}
复制代码

HashMap、Hashtable、ConcurrentHashMap三者对比

/ HashMap Hashtable ConcurrentHashMap
是否线程安全
线程安全采用的方式 采用 synchronized 类锁,效率低 采用 CAS + synchronized ,锁住的只有当前操作的 bucket ,不影响其他线程对其他bucket的操作,效率高
数据结构 数组+链表+红黑树(链表长度超过8则转红黑树) 数组+链表 数组+链表+红黑树(链表长度超过8则转红黑树)
是否允许 null 键值
哈希地址算法 (key的hashCode)^(key的hashCode无符号右移16位) key的hashCode ( (key的hashCode)^(key的hashCode无符号右移16位) )&0x7fffffff
定位算法 哈希地址&(容量大小-1) (哈希地址&0x7fffffff)%容量大小 哈希地址&(容量大小-1)
扩容算法 当键值对数量大于阈值,则容量扩容到原来的2倍 当键值对数量大于等于阈值,则容量扩容到原来的2倍+1 当键值对数量大于等于sizeCtl, 单线程创建新哈希表,多线程复制bucket到新哈希表 ,容量扩容到原来的2倍
链表插入 将新节点插入到链表 尾部 将新节点插入到链表 头部 将新节点插入到链表 尾部
继承的类 继承 abstractMap 抽象类 继承 Dictionary 抽象类 继承 abstractMap 抽象类
实现的接口 实现 Map 接口 实现 Map 接口 实现 ConcurrentMap 接口
默认容量大小 16 11 16
默认负载因子 0.75 0.75 0.75
统计size方式 直接返回成员变量 size 直接返回成员变量 count 遍历 CounterCell 数组的值进行累加,最后加上 baseCount 的值即为 size
原文  https://juejin.im/post/5c8276216fb9a049d51a4cd6
正文到此结束
Loading...