ConcurrentHashMap 源码学习

本文 java version “1.8.0_221”

结构

与 HashMap 类似,使用数组 + 链表 + 红黑树存储键值对

upload successful

属性字段

transient volatile Node<K,V>[] table; // 存放 bin,第一次插入数据时候进行初始化,长度为 2 的倍数

private static final int MIN_TRANSFER_STRIDE = 16 // 扩容线程每次最少要迁移16 个 hash 桶,在扩容中,参与的单个线程允许处理的最少 table 桶首节点个数,虽然适当添加线程,会使得整个扩容过程变快,但需要考虑多线程内存同时分配的问题

private transient volatile int sizeCtl;

  • 默认为 0
  • -1 表示正在初始化
  • -(1+number) 表示有 number 个线程同时在扩容,必须竞争到这个共享变量,才能进行初始化或者扩容。
  • 正数, table 中元素数量阈值,超过这个阈值就会扩容

upload successful

static final int MOVED = -1; // hash for forwarding nodes

  • ForwardingNode,在扩容时使用,如果 index 处 hash 值为 -1,表示正在扩容。

static final int TREEBIN = -2; // hash for roots of trees

static final int RESERVED = -3; // hash for transient reservations

static final int HASH_BITS = 0x7fffffff; // usable bits of normal

哈希桶 Table 初始化

  • 根据共享变量 sizeCtl 的值来决定是否由当前线程执行初始化操作(单线程进行初始化)

  • sizeCtl 为正数时,表示 table 的阈值(= 0.75*n),元素个数超过这个值将会扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
  // 如果共享变量 sizeCtl < 0,说明有其它线程正在初始化或者扩容,让出 CPU,让其它线程先执行完
       if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
       //
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
  // 再判断 table 是否为空
               if ((tab = table) == null || tab.length == 0) {
              // sc = sizeCtl > 0 表示已经初始化,否则使用默认容量 16。
                   int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
                   // sc = 0.75*n,sc 表示阈值
                   sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

key 对应到哈希桶的过程

1
2
3
4
5
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

index = spread(key) & (length - 1)
  • hash 过程与 HashMap 类似,只是多了与 HASH_BITS 进行位运算

  • HASH_BITS 除了首位是 0,剩下的都是 1,按位与,得正数(首位为0)。就是为了让上面的 hash 值为正数

get 方法

  1. key,value 都不能为 null,key 为 null 时抛出 NullPointerException

  2. 将 key 进行 hash 然后找到数组位置处的索引 index

  3. 若 index == key,直接返回 index 处元素

  4. 若 index 处的 hash 值小于 0,进一步调用 Node 子类的 find 方法

  5. index 处的 hash 值不小于 0,遍历查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final int TREEBIN   = -2; // hash for roots of trees

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
  // 如果找到这个节点,直接返回
       if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
       // 如果 hash 值小于 0,调用 Node 的查找方法,Node 可以为链表或树
       else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

put 方法

  1. table 为 null 或者 tab.size = 0,进行 resize。

  2. key 进行 hash 之后取模得到的索引位置,若在桶的位置元素为 null,那么直接插入元素。

  3. 桶位置元素不为 null,那么进行比较

  4. 判断 key 是否相同:

    • 如果 key 相同,e 保存该节点
    • 如果 key 不同,如果该是红黑树节点,那么执行红黑树的 put 方法;如果是链表节点,执行链表遍历操作,找到对应的节点并用 e 保存,如果链表长度大于 >=7,就将链表转为红黑树
  5. Node e 保存找到的节点,如果没有找到返回 null

  6. 如果 size 超过阈值,进行扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
// key value 均不能为 null
   if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
   // 获取当前 table,进入死循环,直至插入成功
   for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
       // 为空需要进行初始化
       if (tab == null || (n = tab.length) == 0)
           tab = initTable();
       // 索引所在位置没有元素,通过 casTabAt 方法插入元素并跳出循环
       else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
           if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))){
               break;                   // no lock when adding to empty bin
}
}
       // 索引位置处 hash = -1,表示其它线程正在扩容,helpTransfer 将帮助扩容,且元素赋值到扩容后的位置
       else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
       // 该位置处节点为普通节点,锁住该链表头结点并在尾部添加节点
       else {
V oldVal = null;
synchronized (f) {
          // 加锁之后再判断索引位置处 key 是否相同
               if (tabAt(tab, i) == f) {
              // hash >= 0,通过链表遍历方式找到并替换节点
                   if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
                              // 进行替换
                                   e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
                          // 链表尾部插入节点
                               pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
                   // index 处索引小于 0 (hash = -2 TREEBIN),并且是树节点,执行树的插入方法
                   else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount != 0 说明向链表或者红黑树中添加或修改一个节点成功
           // binCount == 0 说明 put 操作将一个新节点添加成为某个桶的首节点
           if (binCount != 0) {
          // >= 8 就转为红黑树
               if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
               // oldVal != null 说明此次操作是修改操作,直接返回旧值,无需下面扩容检查
               if (oldVal != null)
return oldVal;
break;
}
}
}
   // 判断是否需要扩容
   addCount(1L, binCount);
return null;
}

size()

  • baseCount 记录元素个数,通过 addCount 更新 baseCount,当更新失败时,addCount 中的 fullCount 会更新 counterCells

  • 元素个数 = baseCount + counterCells.length

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

扩容方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189

// 返回表长度 n 的标志位
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

// 帮助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
   // tab 不为 null,传进来的节点是 ForwardingNode,并且 ForwardingNode 下一个 tab 不为 null
   if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
       // 生成标志位 rs
       int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
           // 如果 sizeCtl 无符号右移 16 位与上面标志位不同
           // 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)
           // 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535)
// 或者转移下标正在调整 (扩容结束)
           // 结束循环
           if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
           // 以上情况不满足, sizeCtl++ ,增加一个线程进行扩容
           if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
               // 复制或者移动 bins 里面的 Node 到新的 table
               transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}


// 移动或者复制 Node 到新的 table
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
   // 计算单个线程允许处理的最少 table 桶首节点个数,不能小于 16
   if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
   // 如果刚开始扩容,就初始化 nextTab
   if (nextTab == null) {           // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
       // transferIndex 指向最后一个桶,方便从后向前遍历
       transferIndex = n;
}
int nextn = nextTab.length;
   // fwd 作为标记,标记那些已经完成迁移的桶
   ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
   // i 指向当前桶,bound 指向当前线程需要处理的桶结点的区间下限
   for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
           // transferIndex 本来指向最后一个桶,小于等于 0 说明处理完成   
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
           // 更新 transferIndex,处理的桶区间为 (nextBound,nextIndex)
           else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
       // 当前线程任务处理完成
       if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
       // 待迁移的桶为 null,在此位置添加 ForwardingNode 标记该桶已经处理过
       else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
       // 该桶已经处理过
       else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
      // 锁住该桶
           synchronized (f) {
          // 再判断下桶有没有发生变化
               if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
// 整个 for 循环为了找到整个桶中最后连续的 fh & n 不变的结点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 如果 fh&n 不变的链表的 runbit 都是 0,则 nextTab[i] 内元素 ln 前逆序,ln 及其之后顺序
// 否则,nextTab[i+n]内元素全部相对原table逆序
// 这是通过一个节点一个节点的往 nextTab 添加
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
                       // 把两条链表整体迁移到 nextTab 中
                       setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 将原桶标识位已经处理
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树的复制算法
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
  • sizeCtl == rs + 1 扩容结束了,不再有线程进行扩容,这个判断可以在 addCount 方法中找到答案:默认第一个线程设置 sc == rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1。
    如果 sizeCtl == 标识符 + 1 ,说明库容结束了,没有必要再扩容了。

博客参考

并发编程——ConcurrentHashMap#helpTransfer() 分析

为并发而生的 ConcurrentHashMap(Java 8)

https://www.iteye.com/blog/wujiu-2378812