ConcurrentHashMap的原理

cnblogs 2024-08-10 12:09:00 阅读 60

背景

我们知道hashmap是一个线程不安全的数据结构,在多线程编程的时候,多个线程同时向hashmap中put元素的时候,会发生数据丢失。多线程put操作后,再get操作导致死循环。

多线程put非NULL元素后,get操作得到NULL值。

使用

为了保证并发安全,我们使用hashmap的时候,建议是使用ConcurrentHashMap。

底层原理

1.7的时候,底层数据结构是大数组Segment(容量为16)和小数组HashEntry。默认是16个Segmement,每个HashEntry会存放一些键值对或者链表。

Segment继承了可重入锁,有加锁和释放锁的操作,这样就能保证多个线程访问ConcurrentHashMap的时候,同一时间只能有一个线程能够操作相应的节点,保证了线程安全。

性能相对于hashtable而言,效率提高了16倍。即当线程访问一个Segment 的时候,只对这一个Segment加锁,对于其他段的Segment,则可以继续被其他线程访问,不会有冲突。

1.8的时候,底层数据结构更新为数组+链表+红黑树。1.8不再有分段锁的设计,而是采用CAS和synchrionzed来保证并发安全。

CAS主要是用于put的过程中进行初始化,synchronzied主要是用于往map中插入元素的时候保证线程安全。

采用CAS自旋重试的方式进行初始化,是为了保证只有一个线程完成map的初始化问题,因为多个线程同时初始化,会产生数据丢失的问题。这边使用CAS原子操作,通过修改sizeCtl变量成功与否来代表是否抢占到了锁。如果抢占到了,由该线程完成map的初始化工作;

如果没抢占到,那就进行While循环,自旋重试,直到该map初始化成功,循环自动退出,自旋也随之结束。

初始化结束以后,对于一个put(key, val)操作,首先计算出该key的哈希值,从而得到该key在数组中的插入位置。

首先使用CAS的方式插入key-val, 先判断该位置是否为null, 为null, CAS插入元素。如果成功,则该key-val成功插入;

如果不为null,说明发生了碰撞,改用synchronized关键字,对头节点加锁,然后将key-val插入链表或者红黑树。

插入前,判断是链表还是红黑树,不同的结构处理方式不同;

如果是链表,那么从链表的头节点开始向下遍历,遍历的每个节点:

使用equlas判断key相等否,相等,则修改该key 的value; 否则,把当前的key value插入链表的最后一个节点。

如果是红黑树,那么putTreeVal完成值的存储。

要说明的是,这种锁控制在单个数据节点上,16位的数组可以支持16个线程并发写入数据。

源码

<code>public V put(K key, V value) {

return putVal(key, value, false);

}

/** Implementation for put and putIfAbsent */

final V putVal(K key, V value, boolean onlyIfAbsent) {

//1、数据检查

if (key == null || value == null) throw new NullPointerException();

//2、求key哈希

int hash = spread(key.hashCode());

int binCount = 0; //记录遍历的节点数,可以用于判断是否要链表转化为红黑树

for (Node<K,V>[] tab = table;;) { //死循环

Node<K,V> f; int n, i, fh;

if (tab == null || (n = tab.length) == 0) //检查 table 是否初始化

tab = initTable();

//使用哈希值计算索引 i 并检查该位置是否为空。如果为空,使用 CAS 操作插入新节点,并跳出循环。

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

if (casTabAt(tab, i, null,

new Node<K,V>(hash, key, value, null)))

break; // no lock when adding to empty bin

}

else if ((fh = f.hash) == MOVED) // MOVEDd标志用于判断是否已经节点迁移

//当一个桶(bin)中的所有节点都被迁移到新的数组中后,原来的位置上会放置一个特殊的转发节点,表示这个桶已经处理完毕。此时,转发节点的 hash 字段会被设置为 MOVED(即 -1)。

tab = helpTransfer(tab, f);//协助迁移

else { //如果碰撞了 需要使用synchronized,放弃cas,f是table那个碰撞节点

V oldVal = null;

synchronized (f) {

if (tabAt(tab, i) == f) { // 经典的双重检查,防止当前线程获取table锁之前,tabAt(tab, i)被其它线程改变了

if (fh >= 0) {// 哈希值>=0代表是链表,<0代表是红黑树

binCount = 1;

for (Node<K,V> e = f;; ++binCount) {

K ek;

if (e.hash == hash &&

((ek = e.key) == key ||

(ek != null && key.equals(ek)))) { // 三比较,hashcode==hashcode,key==key,key.equals(key)

oldVal = e.val;

if (!onlyIfAbsent)

e.val = value;

break;

}

Node<K,V> pred = e;

if ((e = e.next) == null) {

pred.next = new Node<K,V>(hash, key,

value, null);

break;

}

}

}

else if (f instanceof TreeBin) { // 红黑树

Node<K,V> p;

binCount = 2; //binCount 被初始化为 2,因为红黑树中的节点数计算方式不同于链表。 具体原因我不知道

if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,

value)) != null) {

oldVal = p.val;

if (!onlyIfAbsent)

p.val = value;

}

}

}

}

if (binCount != 0) { // 判断是否要扩容 TREEIFY_THRESHOLD=8

if (binCount >= TREEIFY_THRESHOLD)

treeifyBin(tab, i);

if (oldVal != null)

return oldVal;

break;

}

}

}

addCount(1L, binCount); //计数 里面通过cas维护元素个数。

return null;

}



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。