读Java7/8 中HashMap和ConcurrentHashMap源码,理解其数据结构与并发处理。
文章参考了https://javadoop.com/post/hashmap#toc_16,图片也是直接引用这位博主的
Java7 HashMap
总体结构
java7中,HashMap是一个数组,数组每个节点是一个单向链表,链表的每个节点是HashMap的一个元素。
几个重要的变量
loadFactor 负载因子,默认0.75
capacity 当前数组容量,2^n,每次扩容*2
size HashMap已存在元素数量
threshold 扩容阈值,=loadFactor*capacity
初始化
构造函数初始化很简单,主要是新建数组(new Entry[capacity])
1 | /** |
hash算法
1 | /** |
useAltHashing参数由配置项-Djdk.map.althashing.threshold决定是否开启。capacity>=threshold时开启。开启后会减少碰撞率
- 对字符串使用stringHash32单独计算hashCode
- hash时增加hashSeed参数
put
流程分三步:
- key=null, 直接去table[0]处理
- 计算key的hash、在数组中的index。遍历对应数组位置链表中元素,equals则替换旧值
- 不存在旧值则addEntry
1 | public V put(K key, V value) { |
put时,如果key不存在,则新增一个entry。
- 判断是否需要扩容。扩容时capacity*2,并重新计算key在数组中的index
- 将数组添加到数组对应index处链表的表头
1 | /** |
1 | //将新Entry放到链表表头 |
数组扩容
前面新增Entry的代码中,如果size>threshold(扩容阈值(capacity * loadFactor)) table扩容2倍
1 | //扩容 |
经过扩容之后,可能原来某个节点中的元素会被分到新数组的两个节点。例如原来数组长度16,table[0]位置的链表元素,现扩容后,会分配到新数组的newTable[0]和newTable[16]中。
扩容后是否重新计算hash值
看以下代码useAltHashing变了,hash值肯定也变了。这时候就重新计算每个元素的hash值
1 | boolean oldAltHashing = useAltHashing; |
get
get操作很简单
- key=null 固定取table[0]
- 对key进行hash,计算出数组index
- 遍历table[index]中链表,取equals(key)的元素
1 | public V get(Object key) { |
1 | /** |
Java7 ConcurrentHashMap
总体结构
ConcurrentHashMap由一个个Segment槽组成 ,每个槽中存放HashEntry数组,数组上节点存放链表。
Segment数组不可扩容,每个槽内的HashEntry数组可以扩容。
几个重要的变量
concurrencyLevel 并发级别/并发Segment数量
initialCapacity 指整个map数组容量,均分给Segment
loadFactor 负载因子,实际中给Segment内部使用
初始化
初始化segments数组,并初始化第一个元素
1 | /** |
put
大致流程:
- 根据key hashCode定位到Segment,如果空则初始化Segment
- 获取目标Segment的ReentrantLock锁
- put操作,遍历链表,这里与HashMap类似。可能会触发扩容
- 释放锁
根据key的hashCode定位到Segment后,在槽内进行put操作
1 | /** |
初始化槽
put时,如果根据key hashCode定位到的Segment为null时,需要先初始化一下
1 | /** |
注意最后的cas循环。 处理了多个线程同时初始化槽的情况
Segment内部put操作
Segment内部put用ReentrantLock来保证线程安全,锁住当前Segment
- 获取锁
- 遍历链表元素,进行put操作(这一步与HashMap相似)
- 释放锁
1 | /** |
scanAndLockForPut
注意上面put时代码
1 | //尝试获取Segment独占锁 |
如果第一次tryLock失败,进入scanAndLockForPut继续获取
put获取循环获取当前Segment独占锁,并实例化node(如果key不存在)
- 循环获取锁
- 循环获取锁时顺便遍历当前链表的每个节点,看key是否存在,不存在初始化node
- 直到获取到锁
1 | /** |
扩容
segment数组不能扩容,扩容只对某个segment槽下的HashEntry数组扩容,每次扩容容量*2(这一点与HashMap相似)
1 | /** |
扩容比HashMap要复杂一些。 作了一处优化
- 将oldTable某个节点的链表,最后一段链表(保证在newTable位置相同),直接把引用指向newTable[index]
- 剩下的链表节点一个个赋值
如果链表最后一段节点index一致的很多,这样可以提高效率
get
代码很简单,根据key定位Segment、槽内数组链表,遍历链表得到目标数据
1 | public V get(Object key) { |
Java8 HashMap
Java8的HashMap数据结构上是数组+链表+红黑树组成。
如果数组某个位置上的链表长度>=8,会将链表转换为红黑树,在这个位置上查找元素的效率为O(logN)。
初始化
1 | public HashMap() { |
Java8初始化HashMap有三个构造函数,与Java7相比只涉及参数。不实际创建数组。
注意this.threshold = tableSizeFor(initialCapacity); 初始化后,在第一次put的时候会触发一次扩容,根据扩容阈值threshold新建数组,新数组newCap=threshold(这点与java7不同)
hash算法
1 | /** |
为什么要有扰动函数处理?
注意:元素在数组位置index= (table.length - 1) & hash
如果直接取key的hash值的话,即时hashCode()返回的hash值很松散。如果hash低位重复多,高位不重复,得到的index也会有很多重复的。为了减少hash碰撞,同时考虑高位与低位数据。
put
1 | public V put(K key, V value) { |
封装了一层,主要看putVal方法
1 | /** |
put很简单,看一遍代码就ok了。还是遍历数组对应位置节点的元素,链表和红黑树两种遍历方式。 put元素后检查一下链表长度>=8转换为红黑树。
扩容
1 | /** |
扩容操作,与java7有些不同,table为空时只要调节几个参数就可以了,不为空时走具体扩容逻辑并处理数组各节点数据。
get
1 | public V get(Object key) { |
封装了一层,看下面
1 | /** |
看代码,很简单。
Java8 ConcurrentHashMap
总体结构
Java8 ConcurrentHashMap数据结构与HashMap一致,数组+链表/红黑树。 在操作上为了保证线程安全性,要复杂很多(主要是迁移代码复杂)
初始化
看代码
1 | /** |
1 | /** |
put
看代码。 计算hash找到数组对应位置头节点f,给f加监视器锁来保证线程安全。
1 | public V put(K key, V value) { |
1 | /** |
扩容
扩容的几个时机
- put时,元素当前位置链表长度>=8且table长度<64。 只执行扩容不转换红黑树
- 新增节点后,调用addCount记录元素个数,并检查是否扩容,当数组元素达到阈值后,扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51/**
* 数组扩容
* @param size 要扩容到的size(已经是size*2了)
*/
private final void tryPresize(int size) {
//size*1.5+1 向上取2^n
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//数组为空时,进行初始化数组操作,与initTable()基本一样
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
"unchecked") (
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//c<=sc(其他线程扩容过了 or 正在扩容) or n 超过数组限制最大值,直接退出
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//迁移table,两次transfer
else if (tab == table) {
int rs = resizeStamp(n);
//第二次
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//CAS将SIZECTL+1,执行transfer
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 第一次 SIZECTL置为负数
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}数据迁移
这部分代码比较复杂
数据迁移的两个触发时机: - 其他线程操作时,检测到头节点f.hash==MOVED,说明正在数据迁移,CAS获取SIZECTL锁后执行迁移
- 扩容链表后,迁移数据
迁移数组的方法很复杂,看代码,每次只扩容其中一小段1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178/**
* 迁移数组
* @param tab
* @param nextTab
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//有n个位置需要迁移、将n分为多个任务包,每个任务包有stride个任务
int n = tab.length, stride;
//stride 单核下为n,多核下为n/8,最小为16 每次迁移任务包的任务个数
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab为null,先进行一次初始化(第一个发起迁移的线程调用此方法时,nextTab为null)
if (nextTab == null) { // initiating
try {
"unchecked") (
//数组容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//赋值给全局变量
nextTable = nextTab;
//全局变量,控制迁移的位置
transferIndex = n;
}
int nextn = nextTab.length;
// 正在被迁移的Node,hash为MOVED, 标识用
// 原数组某个节点处设置为这个ForwardingNode,表示此处已被其他线程吹过了
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//表示做完一个位置的迁移工作,可以准备做下一个位置的了
boolean advance = true;
// 表示所有迁移操作已完成
boolean finishing = false; // to ensure sweep before committing nextTab
//i为位置索引,bound =为边界 迁移bound-i之间数据
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//while循环计算需要迁移的左右边界
//advance为true表示可以进行下个位置迁移
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
//nextIndex指向迁移位置,,如果<=0说明其他线程处理迁移了,把这个位置往前移到0
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//获得锁
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//将迁移边界位置nextBound(左边界),也就是上次迁移位置往前移动stride个
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//最后一次迁移的线程,nextTable赋值给原来的table,退出
nextTable = null;
table = nextTab;
//sizeCtl 修改新数组长度的0.75倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/**
* 判断迁移是否结束
* 1、前面每次迁移会将sizeCtl 加 1,这里每次-1代表做完了
* 2、sizeCtl迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2,如果相等说明迁移结束了
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//迁移结束,直接退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//到这里迁移任务都做完了,下次循环进入if (finishing)分支
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果i处位置是空的,把ForwardingNode放入节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//此处已经是ForwardingNode了,
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//对数组该处头节点加锁,开始迁移工作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//头节点hash>0,说明是链表的node节点
if (fh >= 0) {
//核心思想是扩容时将1条链表分成2部分,lastRun(不含)之后的节点与头节点f在新数组中不在一个hash位置上
//找到lastRun及之后的及节点一起迁移;lastRun之前的节点需要进行克隆,判断hash分到两个链表中。
// 这样在lastRun节点很靠前时可以显著提升效率
int runBit = fh & n;
Node<K,V> lastRun = f;
//循环找到lastRun(lastRun是最后1个与头节点hash不同的元素)
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//这里分为ln和hn代表低位hash和高位hash,数组扩容2倍后,分别放到i和i+n处
if (runBit == 0) {
//头节点hash==0,说明头节点在新数组的i+n上,lastRun后面都在i上
ln = lastRun;
hn = null;
}
else {
//头节点hash>0,说明头节点在新数组的i上,lastRun后面都在i+n上
hn = lastRun;
ln = null;
}
//链表分为两部分,lastRun前存i,lastRun及之后元素存i+n位置
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
//迁移完毕
advance = true;
}
//红黑树处理
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
//迁移完毕
advance = true;
}
}
}
}
}
}get
简单,直接代码。 通过hash确定位置,然后遍历链表or红黑树1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//判断头节点是否是get的元素
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//说明正在扩容或者为红黑树
else if (eh < 0)
//调用红黑树的find查找
return (p = e.find(h, key)) != null ? p.val : null;
//遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}总结
数据结构
Java7
HashMap 数组+链表
ConcurrentHashMap Segment数组+节点数组+链表
Java8中
HashMap 数组+链表/红黑树
ConcurrentHashMap 数组+链表/红黑树
并发操作
Java7、8 HashMap线程不安全,不支持并发
Java7 ConcurrentHashMap,通过ReentrantLock获得Segment上的独占锁来对节点内数组操作。CAS初始化Segment。扩容时用volatile保持内存可见性
Java8 ConcurrentHashMap 通过给数组上头节点加监视器锁的方式来保证线程安全。扩容时CAS操作SIZECTL
看完一遍后,ConcurrentHashMap中的扩容和数据迁移代码比较难懂,其他地方都可以理解,不少地方看到后面就理解了。