Java 的源代码学习(2)—— HashMap 和 ConcurrentHashMap

Java 源代码学习系列歇菜了一年多了,实在是太离谱了,那么现在继续这个系列。

“Java 的源代码学习”系列

(1)基本类型和对应的类

(2)HashMap 和 ConcurrentHashMap (本文)

HashMap 这个数据结构在教科书上已经被讲烂了,而且我们在大学期间也写过无数次它的实现。不过“魔鬼藏在细节之中”,作为一个通用的类库,要考虑的东西远远不止在学校学的如何去实现其基本功能。有很多需要考虑的因素,为了更好地积(zhun)累(bei)经(mian)验(shi),还是要仔细读一下它的代码。

但是还是要先唠叨一下(因为各语言中容器类型是在是太多,而且有些取名都很奇葩)。在 Java 中,HashMap 是基于桶的散列器,并且不是线程安全的(而 Hashtable 是线程安全的),并且其允许 null 类型作为 key。

首先来看一下它的常量定义以及成员变量:

public class HashMap<K,V>
    extends AbstractMap<K,V>
    implements Map<K,V>, Cloneable, Serializable
{
    static final int DEFAULT_INITIAL_CAPACITY = 4;
    static final int MAXIMUM_CAPACITY = 1 << 30;
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    static final HashMapEntry<?,?>[] EMPTY_TABLE = {};
    transient HashMapEntry<K,V>[] table = (HashMapEntry<K,V>[]) EMPTY_TABLE;
    transient int size;
    int threshold;
    final float loadFactor = DEFAULT_LOAD_FACTOR;
    transient int modCount;
    ...
}

这些都是大家都能背出来的内容。有趣的是 EMPTY_TABLE 是一个 HashMapEntry<?, ?>[] 类型,而作为成员变量的 table 会将其转型成 HashMapEntry<K, V>[] 类型作为初始值。而 modCount 相当于 .NET 源代码中的 version,用于在迭代时防止容器内容被更改。只有  threshold 变量没有 transient 修饰,因为其是可以进行自定义的内容。

很显然 HashMapEntry 会组织成一个链表,其包含四个字段:key、value、next 以及 hash。为啥它会定义一个 hash 字段呢?这个问题先放在这里。

我们直接看 resize 方法好了,看了这个方法后它的大致写法基本上都会明白了。

    void resize(int newCapacity) {
        HashMapEntry[] oldTable = table;
        int oldCapacity = oldTable.length;
        if (oldCapacity == MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return;
        }

        HashMapEntry[] newTable = new HashMapEntry[newCapacity];
        transfer(newTable);
        table = newTable;
        threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
    }

    void transfer(HashMapEntry[] newTable) {
        int newCapacity = newTable.length;
        for (HashMapEntry<K,V> e : table) {
            while(null != e) {
                HashMapEntry<K,V> next = e.next;
                int i = indexFor(e.hash, newCapacity);
                e.next = newTable[i];
                newTable[i] = e;
                e = next;
            }
        }
    }

只是简单粗暴地把元素重新映射到新的数组里,实在是没有啥好看的。而至于每个元素映射到哪个桶中,仅仅是简单地取低位的哈希值。顺带一提,以下这个 indexFor 的方法决定了哈希表的长度必须是 2^n,否则 & 运算之后就会乱套了。

    static int indexFor(int h, int length) {
        // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
        return h & (length-1);
    }

关于哈希值的计算,Java 使用的是 Single-word Wang/Jenkins 的哈希方法,具体你可以参考这里

这个类实在是没什么好看的了,那么我们现在来看线程安全的  ConcurrentHashMap 的代码吧。这个类就要复杂得多,我们好好地研究一下吧。

Hashtable 是一个线程安全的哈希表,然而它与 HashMap 的主要不同之处就是很多方法都是 synchronized 修饰的,这样意味着性能实在是堪忧,于是就有了 ConcurrentHashMap。而相对于非线程安全的 HashMap 以及性能堪忧的 Hashtable 来说,ConcurrentHashMap 显然会复杂很多。

记得老夫以前分析过 M$ 写的线程安全的队列吗?实际上 Java 也是用的类似的方法(但是在 Java 1.8 以前不是这样做的,在 Java 1.8 以前,此类使用的是分段锁来进行并发控制,而到 Java 1.8 开始,全面使用 CAS,使并发度大为提高,然而 M$ 早就开始使用 CAS 实现容器了。我们仅分析最新版本的情况)。首先来看一下它定义的成员变量:

transient volatile Node<K,V>[] table;
private transient volatile Node<K,V>[] nextTable;

private transient volatile long baseCount;
private transient volatile int sizeCtl;
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;

private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

这么多乱七八糟的变量从名字貌似也看不出啥来,所以我们一边看代码一边分析好了。首先来看一下它的 get 方法。

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

看似有点复杂,但是大体上可以看出来它的意图是,先使用 spread 方法计算出 key 对应的结点位置。大家可能会觉得奇怪,既然都找到它所在的结点了,为什么还要还判断这个结点的哈希值是否与要找的 key 的哈希值相等呢?原来 ConcurrentHashMap 共有四种结点类型,从 get 方法中可以看出两种。

第一种是我们所熟悉的链表,也就是内层的 if 与 while 循环处理的情况。第二种,就是 eh < 0 的情况,此时会调用它的 find 方法进行查找。在 ConcurrentHashMap 的内部类中,共有三个类继承了 Node 类,分别是:TreeNode、TreeBin 以及 ForwardingNode。

这就很明确了,除了链表外,在同一个桶中如果元素太多,则会采用红黑树(也就是 TreeBin 这个类)来记录这些冲突的结点。与 table 直接关联的是 TreeBin 对象,而 TreeNode 作为红黑树的结点从属于 TreeBin。关于 TreeBin 的插入、创建与删除方法,乃可以自行参考红黑树的具体实现(比如维基百科上的描述),这里就不说了(因为实在是太复杂了)。

在计算对应的桶所在的位置时,spread 方法会将哈希值的高位与低位进行异或操作,这样做的好处是使绝大多数情况计算出来的位置都尽量分散。因为哈希表通常会比 hashCode 的范围(hashCode 是 int 类型)小很多,在计算对应的桶时,会用其低位作为下标((n – 1) & h) 这个计算),如果这些 key 的低位基本都一样(比如浮点数),那么就基本上会集中在同一块了。

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

而 tabAt 方法用户查找对应的 Node 对象,它的实现如下:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

U 提供 Unsafe 对象的访问(这个类暂时不对开发者公开,如果你调用了,Java 会抛出 SecurityException 的异常,显然是 Java 认为 developers 的水平不高),和 C# 类似用于直接操作内存。这样,就可以绕过一堆托管代码直接访问到所在的结点,提高了效率。ABASE、ASHIFT 对应 table (实际上是一个数组)所在的起始位置和每段长度,通过初始化代码就可以看出来了:

private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        SIZECTL = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (ConcurrentHashMap.class.getDeclaredField("cellsBusy"));

        CELLVALUE = U.objectFieldOffset
            (CounterCell.class.getDeclaredField("value"));

        ABASE = U.arrayBaseOffset(Node[].class);
        int scale = U.arrayIndexScale(Node[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("array index scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}

通过 get 方法,我们很容易就得知了 ConcurrentHashMap 的基本构造。不要高兴得太早,这才刚刚开始而已,对于并发操作,我们还没开始看!接下来赶紧看 putVal 方法。这个方法很长,我们分段来看:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }

这个 for 循环是不是感觉很熟悉!看来大家的写法都差(xiang)不(hu)多(chao)。如果乃认真看过老夫以前写的那个循环队列的博客,那么这里的逻辑就相当好理解了。

对于 tab 还没创建的情况调用 initTable,这个先放一放。在第一个 else 子句中,如果这个桶还没创建,那就创建一个。需要考虑的是其他线程可能也在同时也在这个位置创建或者移动结点,所以就 CAS 一下(C# 里面对应的是 CompareExchange),如果失败,就在下一个 for 循环重试就好了,在下一次 for 循环时,结点可能已经被其他线程创建完或者移走了,此时会走到其他 if 的分支里去。另外还是要唠叨一下,CAS 的判断最好是在操作都很短的时候。

else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

MOVED 是一个相当蛋疼的情况,我们也先放在一边。

else {
    V oldVal = null;
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            if (fh >= 0) {
                binCount = 1;
                for (Node<K,V> e = f;; ++binCount) {
                    K ek;
                    if (e.hash == hash &&
                        ((ek = e.key) == key ||
                         (ek != null && key.equals(ek)))) {
                        oldVal = e.val;
                        if (!onlyIfAbsent)
                            e.val = value;
                        break;
                    }
                    Node<K,V> pred = e;
                    if ((e = e.next) == null) {
                        pred.next = new Node<K,V>(hash, key,
                                                  value, null);
                        break;
                    }
                }
            }

如果已经有对应的结点了,就把这个结点先锁上(也就是说,同一个结点只能一个线程在操作)。fh >= 0 的判断在 get 里面已经见过了,说明它是一个链表的结点。此时遍历这个链表,如果已经有这个 key 了,就更新,不然就插入到链表的末尾。

                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

对于红黑树的结点,进行类似的操作,这里我们把具体逻辑忽略掉。ReservationNode 的逻辑暂时看不出来是啥,这个暂时不管。最后 binCount 用来判断是不是需要把结点扩充为红黑树,以便在冲突结点过多时,提高查找的效率。最后的 addCount 方法,可想而知也很复杂,因为它牵扯到哈希表是否需要扩容。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }

哇哦,一看就很复杂。最简单的情况,如果当前扩容操作还没开始,或者 baseCount 的数量成功增加(说明没有其他线程修改了),那么皆大欢喜,简单地完成了计数操作。不然的话,就要用到我们之前“不明觉厉”的 CounterCell 这个东西了。我们首先看一下 CounterCell 的定义(CounterCell 使用的是称为 Striped64 的累加器,关于它的分析参考自这里):

    static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

我们想一下如何对哈希表进行计数。假如只有一个线程访问,那毫无疑问直接把计数器更新就行了(在这里类中 baseCount 用于记录这个数值)。如果有一堆线程同时更新哈希表,那么会对这个变量进行竞争(我们上面看到的这个对 baseCount 进行 compareAndSwap 的操作就是),这样做会导致大部分线程 CAS 频繁失败,会降低性能。

为了分散这些线程对一个常用的变量的冲突,Java 1.8 提供了 Striped64 累加器,其思想就是维护一个基础值以及一个哈希表,没有竞争时直接更新基础值;如果有竞争,则将要更新的值先累加到哈希表的某个单元里面,这样就减少了冲突。所以,整个值就包括了基础值加上哈希表里所有值的总和。

ConcurrentHashMap 就在内部山寨了一下 Stripe64 的实现。这样 addCount 就能够理解是在干啥了。

  • 首先判断有没有 CounterCell,如果没有就说明【可能】没有其他线程竞争,接着尝试更新 baseCount,如果失败则说明有其他线程正在更新,否则 baseCount 更新完毕。
  • 找出这个线程所对应的 CounterCell(ThreadLocalRandom.getProbe() 可以用来干这种事),如果 CAS 成功,则表示没有其他线程在更新这个 CounterCell,更新完毕。
  • 否则调用 fullAddCount 进行某个 CounterCell 有冲突的情况时的更新操作。

我们来分析一下 fullAddCount 的代码。

private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }

这个用来初始化当前线程的哈希值,利用的是 ThreadLocalRandom 这个类针对每个线程提供的不同随机数种子,具体参考这个类的说明。

boolean collide = false;                // True if last slot nonempty
for (;;) {
    CounterCell[] as; CounterCell a; int n; long v;
    if ((as = counterCells) != null && (n = as.length) > 0) {
        if ((a = as[(n - 1) & h]) == null) {
            if (cellsBusy == 0) {            // Try to attach new Cell

死循环又来了,首先看一下对应的 CounterCell 是不是还没有创建。在这里 cellsBusy 是用于标记 CounterCell 是否正在被创建的自旋锁。

        CounterCell r = new CounterCell(x); // Optimistic create
        if (cellsBusy == 0 &&
            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean created = false;
            try {               // Recheck under lock
                CounterCell[] rs; int m, j;
                if ((rs = counterCells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                    rs[j] = r;
                    created = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (created)
                break;
            continue;           // Slot is now non-empty
        }
    }
    collide = false;
}

在这里 cellsBusy 被判断了很多次。如果当前线程成功地修改了 cellsBusy,就表示当前线程获取自旋锁成功,此时其他线程都要排队等待了。

    else if (!wasUncontended)       // CAS already known to fail
        wasUncontended = true;      // Continue after rehash
    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        break;
    else if (counterCells != as || n >= NCPU)
        collide = false;            // At max size or stale
    else if (!collide)
        collide = true;
    else if (cellsBusy == 0 &&
             U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        try {
            if (counterCells == as) {// Expand table unless stale
                CounterCell[] rs = new CounterCell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                counterCells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }
    h = ThreadLocalRandom.advanceProbe(h);
}

这一堆判断的注释写得很详细。counterCells != as 说明这个表已经被扩容过了(不再指向原来的表),则需要进入下一个循环重试,此外 counterCells 的长度被限制在 CPU 的个数内。最后一个 else if 判断尝试获取 cellsBusy 锁对 CounterCell 进行扩容。

else if (cellsBusy == 0 && counterCells == as &&
         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    boolean init = false;
    try {                           // Initialize table
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

如果整个 CounterCell 表都没有被创建,那么就创建一个初始容量为 2 的冲突表了。

        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

最简单的情况,如果 CounterCell 表有了并且对应的格子也有了,直接尝试更新这个格子即可。

现在回过头来继续看 addCount 方法,sumCount 就是把这些乱七八糟的值合并起来作为哈希表的当前长度。

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

Hoo~ 别高兴得太早,addCount 方法我们只看完了第一个 if 语句,那么接下来的 if 语句就是用于检查哈希表是否需要进行扩容。

    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {

从成员变量的注释可以的得知,这里面的 sizeCtl 有以下几种取值:

  • 负数表示哈希表正在被创建或者扩容,-1 表示正在被创建, 比 -1 大的情况为 -(1+ 当前正在修改的线程数)。
  • 0 表示 table 为 null,或者只是初始容量(尚未被扩容过)。
  • 如果为正数,表示下一次需要扩容时的阈值。

哈希表的扩容是一个相当复杂操作,因为如果偷懒仅仅在扩容线程中锁定掉整个哈希表,那并发量就会急剧下降。所以 ConcurrentHashMap 提供了在扩容时同时允许多线程操作的能力,并且完全是基于 CAS 的。我们先来回顾一下上面看到的一些变量。

  • nextTable 指向在扩容操作时新生成的哈希表。每次扩容时,哈希表的容量会翻倍,这个操作只能由一个线程完成。
  • FowardingNode 这个结点类型(我们上面略过去了),用于连接这两个哈希表(是不是听起来感觉和 Redis 的 Skiplist 有点像?不过 Redis 的实现是相当简化的)。而它的 find 方法是从 nextTable 里进行查找,而不是像普通结点那样在自身的链表或红黑树里查找。

我们要重点学习一下它的 transfer 方法。ConcurrentHashMap 允许多个线程同时扩容哈希表,每个线程负责其中一部分结点的扩容工作。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

这里的 stride 变量用于计算每个线程每个结点负责转移的结点数量。如果只有一个 CPU,那么只有一个线程进行扩容工作。这里有一个最小值 MIN_TRANSFER_STRIDE,目前设置的是 16,也就是说每个线程至少要分配 16 个结点(据说是为了减少内存竞争,不太明白原因,谁来解释一下)。

if (nextTab == null) {            // initiating
    try {
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
    } catch (Throwable ex) {      // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
    }
    nextTable = nextTab;
    transferIndex = n;
}

如果新的哈希表还没创建,则创建一个二倍容量的新表。这里没有进行并发控制,是调用它的方法来保证的。可以看到 transferIndex 的初始值为 n ,表示哈希表从后往前遍历进行扩容。

int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
    Node<K,V> f; int fh;
    while (advance) {
        int nextIndex, nextBound;
        if (--i >= bound || finishing)
            advance = false;
        else if ((nextIndex = transferIndex) <= 0) {
            i = -1;
            advance = false;
        }
        else if (U.compareAndSwapInt
                 (this, TRANSFERINDEX, nextIndex,
                  nextBound = (nextIndex > stride ?
                               nextIndex - stride : 0))) {
            bound = nextBound;
            i = nextIndex - 1;
            advance = false;
        }
    }

这个 while 循环用来找一个尚未被处理的结点,这里的 advanced 表示当前结点是否被处理过。大家有可能会感觉奇怪,既然每个线程被分配到了不同的区域,那为什么还要在这里找它要处理的呢?假如我们的哈希表长度为 1024,现在有 2 个线程在扩容,每个线程负责 64 个结点。假如这两个线程处理完后没有其他线程进来,那后面的结点就不会被处理了。这里的第二个 else if 就是处理这种情况,找下一个区间进行处理(当然由于可能会和别的线程竞争同一个区域,所以要 CAS 一下)。如果发现所有结点都已经被处理或正在被处理(transferIndex <= 0)的,那就可以歇菜了。

if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

如果都处理完了,就把下一个阈值设为现在容量的 0.75 倍(就是有两个位运算的那个 sizeCtl 赋值的地方)。当然还是要考虑竞争的情况。

else if ((f = tabAt(tab, i)) == null)
    advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
    advance = true; // already processed

如果旧表中当前结点为 null,放置一个 ForwadingNode 结点,否则说明这个区间已经被其他线程占用了,直接进入下一次循环找其他区间。回想一下在 put 的时候,看到这个结点说明哈希表正在被扩容,会调用 helpTransfer 方法,此时会判断哈希表是否还有帮忙扩容的“名额”,如果没有直接返回进入下一次循环。

            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                         ...
                        }
                    }

上面就是真正的复制结点操作了(省略了 TreeBin 的情况)。初看起来有点奇怪,好像和原来的计算方法不同。仔细阅读后发现 (ph & n) == 0 这个判断把结点分成了两种,等于 0 的和不等于 0 的,这两种结点分别放在 i 和 i + n 位置上(因为扩容后,这样结点就被分散开了)。对于 TreeBin,还要额外考虑是不是需要从红黑树退化成链表(因为结点内的数量可能变少了)。

这里就能看出为什么要求哈希表的长度每次要翻倍了:这样可以保证扩容后每个元素要么挪到 i + n 的位置要么还在原地,这样多线程同时扩容就方便了太多。如果不这样做,每个结点的新位置可能会发生变化,那就会相当复杂了。

另外可以看到在扩容过成中会把头结点锁掉,而在 putVal 方法同样会对结点进行加锁。这样,如果某个桶在迁移完成之前,不接受新元素的插入。这样就简化了 put 操作的过程,不然 put 时桶同时在扩容,又得想一些新法子支持同时操作了(还有删除操作哦,啊哈哈)。(实际上老夫认为不加锁是可行的,只是会复杂很多。估计写这个库的人认为,这个并发粒度已经足够了。)

在判断当前线程是否需要进行辅助扩容时,用的判断是下面这个,这个就交给乃自己去阅读了。

if (check >= 0) {
    Node<K,V>[] tab, nt; int n, sc;
    while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
           (n = tab.length) < MAXIMUM_CAPACITY) {
        int rs = resizeStamp(n);
        if (sc < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                transfer(tab, nt);
        }
        else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                     (rs << RESIZE_STAMP_SHIFT) + 2))
            transfer(tab, null);
        s = sumCount();
    }
}

呼~ConcurrentHashMap 的主要原理总算是研究清楚了。此外还有迭代器的实现。而迭代器的实现,估计到时候会和别的容器比较一下,再进行研究吧。

更新:Java 与 Redis 的哈希表实现的比较

老夫以前面试的时候被问过 Redis 里对哈希表扩容的具体实现,所以这里来记录一下。

Redis 的哈希表毫无疑问必须要支持多线程访问,但是其实现方式与 ConcurrentHashMap 相比要简化了很多(面试官为什么不问 ConcurrentHashMap 的细节我估计是他自己也搞不清)。

Redis 的哈希表内部使用了一个 ht[2] 的数组,看起来就和 ConcurrentHashMap 一样其中一个指向扩容后的新哈希表。但是在扩容时,Redis 在修改元素时,如果哈希表正在扩容,而此结点(指为了解决冲突而设计的链表)没有迁移,先将结点迁移到新的哈希表,然后将修改反应到新的哈希表上。

可以看出,Redis 每一次操作最多只需要把一个结点移动到新的位置;而 ConcurrentHashMap 在扩容操作完成之前方法都不会返回(在 helpTransfer 方法中,最外层的 for 的死循环只有迁移完成后才有 return 语句),否则该线程又会找下一个没处理的区域进行处理。

所以:

  • Redis 保证了每一次更新操作的返回速度都不会太慢,缺点是哈希表可能会长期处于迁移状态。如果有一个结点一直没有被访问,那么这个结点一直就不会被迁移。这样导致的后果就是:可能会长时间占用多余的空间;在扩容完成前不能再次扩容,可能会出现负载因子已经很大了,但由于迁移还没完成所以不能进行下一次扩容。
  • ConcurrentHashMap 想办法让哈希表尽快完成扩容。如果只有少数线程而哈希表又很大的情况下,每个线程处理的结点较多,还有可能要处理红黑树退化的情况,更新操作可能长时间不会返回。但如果访问的线程多(而且和 CPU 的数量有关),每个线程分配的任务较少,则速度会加快。

为什么要根据 CPU 的数量来分配任务指标呢?乃想想如果只有一个 CPU(这里说的 CPU 数量是指核心数),但有 100 个线程来帮忙扩容哈希表。由于只有一个 CPU 同一时间只能运行一个线程,那么在一定的时间内多线程处理比单线程处理的数量不会多(加上线程开销的话还不如一个线程处理)。如果有 N 个 CPU,那机器可以同时跑 N 个线程,这样“共同协助”扩容才会使得速度加快。

✏️ 有任何想法?欢迎发邮件告诉老夫:daozhihun@outlook.com