Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • JavaSE

  • JavaEE

  • Linux

  • MySQL

  • NoSQL

  • Python

  • Python模块

  • 机器学习

  • 设计模式

  • 传智健康

  • 畅购商城

  • 博客项目

  • JVM

  • JUC

    • 多线程
    • CompletableFuture
    • 锁
    • LockSupport与线程中断
    • Java内存模型之JMM
    • Volatile与Java内存模型
    • CAS
    • 原子操作类
    • ThreadLocal
    • Java对象内存布局和对象头
    • Synchronized与锁升级
    • AbstractQueuedSynchronizer之AQS
    • ReentrantLock、ReentrantReadWriteLock、StampedLock讲解
    • JUC总结
    • 并发集合
      • ConcurrentHashMap
        • 并发死链
        • JDK 8 原理
        • 成员属性
        • 构造方法
        • 成员方法
        • get流程
        • put流程
        • size计算流程
        • JDK 7 原理
        • 构造器分析
        • put 流程
        • rehash 流程
        • get 流程
        • size 计算流程
      • BlockingQueue
        • 加锁分析
        • put 操作
        • take 操作
        • 性能比较
      • ConcurrentLinkedQueue
        • 模仿 ConcurrentLinkedQueue
        • 初始代码
        • offer方法
      • CopyOnWriteArrayList
        • get 弱一致性
        • 迭代器弱一致性
        • 安全失败
      • Collections
      • SkipListMap
        • 底层结构
        • 成员变量
        • 构造方法
        • 比较器
        • put 方法
        • get 方法
        • remove 方法
  • Golang

  • Kubernetes

  • 硅谷课堂

  • C

  • 源码

  • 神领物流

  • RocketMQ

  • 短链平台

  • 后端
  • JUC
Iekr
2023-12-26
目录

并发集合

# 并发集合

image-20231226191222743

线程安全集合类可以分为三大类:

  • 遗留的线程安全集合如 Hashtable , Vector
  • 使用 Collections 装饰的线程安全集合,如:
    • Collections.synchronizedCollection
    • Collections.synchronizedList
    • Collections.synchronizedMap
    • Collections.synchronizedSet
    • Collections.synchronizedNavigableMap
    • Collections.synchronizedNavigableSet
    • Collections.synchronizedSortedMap
    • Collections.synchronizedSortedSet
    • 说明:以上集合均采用修饰模式设计,将非线程安全的集合包装后,在调用方法时包裹了一层 synchronized 代码块。其并发性并不比遗留的安全集合好。
  • java.util.concurrent.*

重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent

  • Blocking 大部分实现基于锁,并提供用来阻塞的方法
  • CopyOnWrite 之类容器修改开销相对较重
  • Concurrent 类型的容器
    • 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
    • 弱一致性
      • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的
      • 求大小弱一致性,size 操作未必是 100% 准确
      • 读取弱一致性

遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出 ConcurrentModificationException,不再继续遍历

# ConcurrentHashMap

三种集合:

  • HashMap 是线程不安全的,性能好
  • Hashtable 线程安全基于 synchronized,综合性能差,已经被淘汰
  • ConcurrentHashMap 保证了线程安全,综合性能较好,不止线程安全,而且效率高,性能好

集合对比:

  1. Hashtable 继承 Dictionary 类,HashMap、ConcurrentHashMap 继承 AbstractMap,均实现 Map 接口
  2. Hashtable 底层是数组 + 链表,JDK8 以后 HashMap 和 ConcurrentHashMap 底层是数组 + 链表 + 红黑树
  3. HashMap 线程非安全,Hashtable 线程安全,Hashtable 的方法都加了 synchronized 关来确保线程同步
  4. ConcurrentHashMap、Hashtable 不允许 null 值,HashMap 允许 null 值
  5. ConcurrentHashMap、HashMap 的初始容量为 16,Hashtable 初始容量为 11,填充因子默认都是 0.75,两种 Map 扩容是当前容量翻倍:capacity * 2,Hashtable 扩容时是容量翻倍 + 1:capacity*2 + 1

image-20231226184721465

工作步骤:

  1. 初始化,使用 cas 来保证并发安全,懒惰初始化 table

  2. 树化,当 table.length <64 时,先尝试扩容,超过 64 时,并且 bin.length> 8 时,会将链表树化,树化过程会用 synchronized 锁住链表头

    说明:锁住某个槽位的对象头,是一种很好的细粒度的加锁方式,类似 MySQL 中的行锁

  3. put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素添加至 bin 的尾部

  4. get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 会让 get 操作在新 table 进行搜索

  5. 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时其它竞争线程也不是无事可做,它们会帮助把其它 bin 进行扩容

  6. size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell [] 当中,最后统计数量时累加

//需求:多个线程同时往HashMap容器中存入数据会出现安全问题
public class ConcurrentHashMapDemo{
    public static Map<String,String> map = new ConcurrentHashMap();
    
    public static void main(String[] args){
        new AddMapDataThread().start();
        new AddMapDataThread().start();
        
        Thread.sleep(1000 * 5);//休息5秒,确保两个线程执行完毕
        System.out.println("Map大小:" + map.size());//20万
    }
}

public class AddMapDataThread extends Thread{
    @Override
    public void run() {
        for(int i = 0 ; i < 1000000 ; i++ ){
            ConcurrentHashMapDemo.map.put("键:"+i , "值"+i);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 并发死链

JDK1.7 的 HashMap 采用的头插法(拉链法)进行节点的添加,HashMap 的扩容长度为原来的 2 倍

resize () 中节点(Entry)转移的源代码:

void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;//得到新数组的长度   
    // 遍历整个数组对应下标下的链表,e代表一个节点
    for (Entry<K,V> e : table) {   
        // 当e == null时,则该链表遍历完了,继续遍历下一数组下标的链表 
        while(null != e) { 
            // 先把e节点的下一节点存起来
            Entry<K,V> next = e.next; 
            if (rehash) {              //得到新的hash值
                e.hash = null == e.key ? 0 : hash(e.key);  
            }
            // 在新数组下得到新的数组下标
            int i = indexFor(e.hash, newCapacity);  
             // 将e的next指针指向新数组下标的位置
            e.next = newTable[i];   
            // 将该数组下标的节点变为e节点
            newTable[i] = e; 
            // 遍历链表的下一节点
            e = next;                                   
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

其原因,是因为在多线程环境下使用了非线程安全的 map 集合

JDK 8 虽然将扩容算法做了调整,改用了尾插法,但仍不意味着能够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)

# JDK 8 原理

Java 8 数组(Node) +( 链表 Node | 红黑树 TreeNode ) 以下数组简称(table),链表简称(bin)

  • 初始化,使用 cas 来保证并发安全,懒惰初始化 table
  • 树化,当 table.length <64 时,先尝试扩容,超过 64 时,并且 bin.length> 8 时,会将链表树化,树化过程 会用 synchronized 锁住链表头
  • put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素 添加至 bin 的尾部
  • get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新 table 进行搜索
  • 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可 做,它们会帮助把其它 bin 进行扩容,扩容时平均只有 1/6 的节点会把复制到新 table 中
  • size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell [] 当中。最后统计数量时累加 即可

源码分析 http://www.importnew.com/28263.html

其它实现 [Cliff Click's high scale lib](

# 成员属性

变量

// 存储数组
transient volatile Node<K,V>[] table;

// 散列表的长度
private static final int MAXIMUM_CAPACITY = 1 << 30;	// 最大长度
private static final int DEFAULT_CAPACITY = 16;			// 默认长度

// 并发级别,JDK7 遗留下来,1.8 中不代表并发级别
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 负载因子,JDK1.8 的 ConcurrentHashMap 中是固定值
private static final float LOAD_FACTOR = 0.75f;

// 阈值
static final int TREEIFY_THRESHOLD = 8;		// 链表树化的阈值
static final int UNTREEIFY_THRESHOLD = 6;	// 红黑树转化为链表的阈值
static final int MIN_TREEIFY_CAPACITY = 64;	// 当数组长度达到64且某个桶位中的链表长度超过8,才会真正树化

// 扩容相关
private static final int MIN_TRANSFER_STRIDE = 16;	// 线程迁移数据【最小步长】,控制线程迁移任务的最小区间
private static int RESIZE_STAMP_BITS = 16;			// 用来计算扩容时生成的【标识戳】
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;// 65535-1并发扩容最多线程数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;		// 扩容时使用

// 节点哈希值
static final int MOVED     = -1; 			// 表示当前节点是 FWD 节点
static final int TREEBIN   = -2; 			// 表示当前节点已经树化,且当前节点为 TreeBin 对象
static final int RESERVED  = -3; 			// 表示节点时临时节点
static final int HASH_BITS = 0x7fffffff; 	// 正常节点的哈希值的可用的位数

// 扩容过程:volatile 修饰保证多线程的可见性
// 扩容过程中,会将扩容中的新 table 赋值给 nextTable 保持引用,扩容结束之后,这里会被设置为 null
private transient volatile Node<K,V>[] nextTable;
// 记录扩容进度,所有线程都要从 0 - transferIndex 中分配区间任务,简单说就是老表转移到哪了,索引从高到低转移
private transient volatile int transferIndex;

// 累加统计
// LongAdder 中的 baseCount 未发生竞争时或者当前LongAdder处于加锁状态时,增量累到到 baseCount 中
private transient volatile long baseCount;
// LongAdder 中的 cellsBuzy,0 表示当前 LongAdder 对象无锁状态,1 表示当前 LongAdder 对象加锁状态
private transient volatile int cellsBusy;
// LongAdder 中的 cells 数组,
private transient volatile CounterCell[] counterCells;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

控制变量

sizeCtl < 0 :

  • -1 表示当前 table 正在初始化(有线程在创建 table 数组),当前线程需要自旋等待

  • 其他负数表示当前 map 的 table 数组正在进行扩容,高 16 位表示扩容的标识戳;低 16 位表示 (1 + nThread) 当前参与并发扩容的线程数量 + 1

sizeCtl = 0 :表示创建 table 数组时使用 DEFAULT_CAPACITY 为数组大小

sizeCtl > 0 :

  • 如果 table 未初始化,表示初始化大小
  • 如果 table 已经初始化,表示下次扩容时的触发条件(阈值,元素个数,不是数组的长度)
private transient volatile int sizeCtl;		// volatile 保持可见性
1

内部类

Node 节点:

// Node 节点
static class Node<K,V> implements Entry<K,V> {
    // 节点哈希值
    final int hash;
    final K key;
    volatile V val;
    // 单向链表
    volatile Node<K,V> next;
}
1
2
3
4
5
6
7
8
9

TreeBin 节点:

 static final class TreeBin<K,V> extends Node<K,V> {
     // 红黑树根节点
     TreeNode<K,V> root;
     // 链表的头节点
     volatile TreeNode<K,V> first;
     // 等待者线程
     volatile Thread waiter;

     volatile int lockState;
     // 写锁状态 写锁是独占状态,以散列表来看,真正进入到 TreeBin 中的写线程同一时刻只有一个线程
     static final int WRITER = 1;
     // 等待者状态(写线程在等待),当 TreeBin 中有读线程目前正在读取数据时,写线程无法修改数据
     static final int WAITER = 2;
     // 读锁状态是共享,同一时刻可以有多个线程 同时进入到 TreeBi 对象中获取数据,每一个线程都给 lockState + 4
     static final int READER = 4;
 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

TreeNode 节点:

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;   //双向链表
    boolean red;
}
1
2
3
4
5
6
7

ForwardingNode 节点:转移节点

 static final class ForwardingNode<K,V> extends Node<K,V> {
     // 持有扩容后新的哈希表的引用
     final Node<K,V>[] nextTable;
     ForwardingNode(Node<K,V>[] tab) {
         // ForwardingNode 节点的 hash 值设为 -1
         super(MOVED, null, null, null);
         this.nextTable = tab;
     }
 }
1
2
3
4
5
6
7
8
9

代码块

变量:

// 表示sizeCtl属性在 ConcurrentHashMap 中内存偏移地址
private static final long SIZECTL;
// 表示transferIndex属性在 ConcurrentHashMap 中内存偏移地址
private static final long TRANSFERINDEX;
// 表示baseCount属性在 ConcurrentHashMap 中内存偏移地址
private static final long BASECOUNT;
// 表示cellsBusy属性在 ConcurrentHashMap 中内存偏移地址
private static final long CELLSBUSY;
// 表示cellValue属性在 CounterCell 中内存偏移地址
private static final long CELLVALUE;
// 表示数组第一个元素的偏移地址
private static final long ABASE;
// 用位移运算替代乘法
private static final int ASHIFT;
1
2
3
4
5
6
7
8
9
10
11
12
13
14

赋值方法:

// 表示数组单元所占用空间大小,scale 表示 Node[] 数组中每一个单元所占用空间大小,int 是 4 字节
int scale = U.arrayIndexScale(ak);
// 判断一个数是不是 2 的 n 次幂,比如 8:1000 & 0111 = 0000
if ((scale & (scale - 1)) != 0)
    throw new Error("data type scale not a power of two");

// numberOfLeadingZeros(n):返回当前数值转换为二进制后,从高位到低位开始统计,看有多少个0连续在一起
// 8 → 1000 numberOfLeadingZeros(8) = 28
// 4 → 100 numberOfLeadingZeros(4) = 29   int 值就是占4个字节
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);

// ASHIFT = 31 - 29 = 2 ,int 的大小就是 2 的 2 次方,获取次方数
// ABASE + (5 << ASHIFT) 用位移运算替代了乘法,获取 arr[5] 的值
1
2
3
4
5
6
7
8
9
10
11
12
13

# 构造方法

无参构造, 散列表结构延迟初始化,默认的数组大小是 16:

public ConcurrentHashMap() {
}
1
2

有参构造

public ConcurrentHashMap(int initialCapacity) {
    // 指定容量初始化
    if (initialCapacity < 0) throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               // 假如传入的参数是 16,16 + 8 + 1 ,最后得到 32
               // 传入 12, 12 + 6 + 1 = 19,最后得到 32,尽可能的大,与 HashMap不一样
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    // sizeCtl > 0,当目前 table 未初始化时,sizeCtl 表示初始化容量
    this.sizeCtl = cap;
}
1
2
3
4
5
6
7
8
9
10
11
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
1
2
3
4
5
6
7
8
9

HashMap 部分详解了该函数,核心思想就是把最高位是 1 的位以及右边的位全部置 1,结果加 1 后就是 2 的 n 次幂

多个参数构造方法:

可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 初始容量小于并发级别
    if (initialCapacity < concurrencyLevel)  
        // 把并发级别赋值给初始容量
        initialCapacity = concurrencyLevel; 
	// loadFactor 默认是 0.75
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // sizeCtl > 0,当目前 table 未初始化时,sizeCtl 表示初始化容量
    this.sizeCtl = cap;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

集合构造方法:

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;	// 默认16
    putAll(m);
}
public void putAll(Map<? extends K, ? extends V> m) {
    // 尝试触发扩容
    tryPresize(m.size());
    for (Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}
1
2
3
4
5
6
7
8
9
10
private final void tryPresize(int size) {
    // 扩容为大于 2 倍的最小的 2 的 n 次幂
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    	tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 数组还未初始化,【一般是调用集合构造方法才会成立,put 后调用该方法都是不成立的】
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);// 扩容阈值:n - 1/4 n
                    }
                } finally {
                    sizeCtl = sc;	// 扩容阈值赋值给sizeCtl
                }
            }
        }
        // 未达到扩容阈值或者数组长度已经大于最大长度
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 与 addCount 逻辑相同
        else if (tab == table) {
           
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 成员方法

// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
 
// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
 
// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
1
2
3
4
5
6
7
8

# get 流程

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // spread 方法能确保返回结果是正数
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果头结点已经是要查找的 key
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 正常遍历链表, 用 equals 比较
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  • 如果 table 不为空且长度大于 0 且索引位置有元素
    • if 头节点 key 的 hash 值相等
      • 头节点的 key 指向同一个地址或者 equals
        • 返回 value
    • else if 头节点的 hash 为负数(bin 在扩容或者是 treebin)
      • 调用 find 方法查找
    • 进入循环(e 不为空):
      • 节点 key 的 hash 值相等,且 key 指向同一个地址或 equals
        • 返回 value
  • 返回 null

# put 流程

以下数组简称(table),链表简称(bin)

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 其中 spread 方法会综合高位低位, 具有更好的 hash 性
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        // f 是链表头节点
        // fh 是链表头结点的 hash
        // i 是链表在 table 中的下标
        Node<K,V> f; int n, i, fh;
        // 要创建 table
        if (tab == null || (n = tab.length) == 0)
            // 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环
            tab = initTable();
        // 要创建链表头节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 添加链表头使用了 cas, 无需 synchronized
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        // 帮忙扩容
        else if ((fh = f.hash) == MOVED)
            // 帮忙之后, 进入下一轮循环
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 锁住链表头节点
            synchronized (f) {
                // 再次确认链表头节点没有被移动
                if (tabAt(tab, i) == f) {
                    // 链表
                    if (fh >= 0) {
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 找到相同的 key
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                // 更新
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 已经是最后的节点了, 新增 Node, 追加至链表尾
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNode
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
                // 释放链表头节点的锁
            }

            if (binCount != 0) { 
                if (binCount >= TREEIFY_THRESHOLD)
                    // 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 增加 size 计数
    addCount(1L, binCount);
    return null;
}
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield();
        // 尝试将 sizeCtl 设置为 -1(表示初始化 table)
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // 获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
// check 是之前 binCount 的个数
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if (
        // 已经有了 counterCells, 向 cell 累加
        (as = counterCells) != null ||
        // 还没有, 向 baseCount 累加
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
    ) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (
            // 还没有 counterCells
            as == null || (m = as.length - 1) < 0 ||
            // 还没有 cell
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            // cell cas 增加计数失败
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        ) {
            // 创建累加单元数组和cell, 累加重试
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 获取元素个数
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // newtable 已经创建了,帮忙扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 需要扩容,这时 newtable 未创建
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
  • 进入 for 循环:
    • if table 为 null 或者长度 为 0
      • 初始化表
    • else if 索引处无节点
      • 创建节点,填入 key 和 value,放入 table,退出循环
    • else if 索引处节点的 hash 值为 MOVE(ForwardingNode),表示正在扩容和迁移
      • 帮忙
    • else
      • 锁住头节点
        • if 再次确认头节点没有被移动
          • if 头节点 hash 值大于 0(表示这是一个链表)
            • 遍历链表找到对应 key,如果没有,创建。
          • else if 节点为红黑树节点
            • 调用 putTreeVal 查看是否有对应 key 的数节点
              • 如果有且为覆盖模式,将值覆盖,返回旧值
              • 如果没有,创建并插入,返回 null
        • 解锁
      • if binCount 不为 0
        • 如果 binCount 大于树化阈值 8
          • 树化
        • 如果旧值不为 null
          • 返回旧值
        • break
  • 增加 size 计数
  • return null

# size 计算流程

size 计算实际发生在 put,remove 改变集合元素的操作之中

  • 没有竞争发生,向 baseCount 累加计数
  • 有竞争发生,新建 counterCells,向其中的一个 cell 累加计
    • counterCells 初始有两个 cell
    • 如果计数竞争比较激烈,会创建新的 cell 来累加计数
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    // 将 baseCount 计数与所有 cell 计数累加
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# JDK 7 原理

ConcurrentHashMap 对锁粒度进行了优化,分段锁技术,将整张表分成了多个数组(Segment),每个数组又是一个类似 HashMap 数组的结构。允许多个修改操作并发进行,Segment 是一种可重入锁,继承 ReentrantLock,并发时锁住的是每个 Segment,其他 Segment 还是可以操作的,这样不同 Segment 之间就可以实现并发,大大提高效率。

底层结构: Segment 数组 + HashEntry 数组 + 链表(数组 + 链表是 HashMap 的结构)

  • 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 JDK8 中是类似的

  • 缺点:Segments 数组默认大小为 16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化

# 构造器分析

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // segmentShift 默认是 32 - 4 = 28
    this.segmentShift = 32 - sshift;
    // segmentMask 默认是 15 即 0000 0000 0000 1111
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // 创建 segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

构造完成,如下图所示

image-20231226191529154

可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好

其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment

例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位

image-20231226191546479

# put 流程

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 计算出 segment 下标
    int j = (hash >>> segmentShift) & segmentMask;

    // 获得 segment 对象, 判断是否为 null, 是则创建该 segment
    if ((s = (Segment<K,V>)UNSAFE.getObject 
         (segments, (j << SSHIFT) + SBASE)) == null) {
        // 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null,
        // 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性
        s = ensureSegment(j);
    }
    // 进入 segment 的put 流程
    return s.put(key, hash, value, false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

segment 继承了可重入锁(ReentrantLock),它的 put 方法为

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 尝试加锁
    HashEntry<K,V> node = tryLock() ? null :
    // 如果不成功, 进入 scanAndLockForPut 流程
    // 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程
    // 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来
    scanAndLockForPut(key, hash, value);

    // 执行到这里 segment 已经被成功加锁, 可以安全执行
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                // 更新
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) { 
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    } break;
                }
                e = e.next;
            }
            else {
                // 新增
                // 1) 之前等待锁时, node 已经被创建, next 指向链表头
                if (node != null)
                    node.setNext(first);
                else
                    // 2) 创建新 node
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1; 
                // 3) 扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // 将 node 作为链表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# rehash 流程

发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全

private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null) // Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // 剩余节点需要新建
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 扩容完成, 才加入新的节点
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;

    // 替换为新的 HashEntry table
    table = newTable;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

附,调试代码

public static void main(String[] args) {
    ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
    for (int i = 0; i < 1000; i++) {
        int hash = hash(i);
        int segmentIndex = (hash >>> 28) & 15;
        if (segmentIndex == 4 && hash % 8 == 2) {
            System.out.println(i + "\t" + segmentIndex + "\t" + hash % 2 + "\t" + hash % 4 +
                               "\t" + hash % 8);
        }
    }
    map.put(1, "value");
    map.put(15, "value"); // 2 扩容为 4 15 的 hash%8 与其他不同
    map.put(169, "value");
    map.put(197, "value"); // 4 扩容为 8
    map.put(341, "value");
    map.put(484, "value");
    map.put(545, "value"); // 8 扩容为 16
    map.put(912, "value");
    map.put(941, "value");
    System.out.println("ok");
}
private static int hash(Object k) {
    int h = 0;
    if ((0 != h) && (k instanceof String)) {
        return sun.misc.Hashing.stringHash32((String) k);
    }
    h ^= k.hashCode();
    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h << 15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h << 3);
    h ^= (h >>> 6);
    h += (h << 2) + (h << 14);
    int v = h ^ (h >>> 16);
    return v;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# get 流程

get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新 表取内容

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // u 为 segment 对象在数组中的偏移量
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // s 即为 segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
             (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# size 计算流程

  • 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
  • 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum; // sum of modCounts
    long last = 0L; // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                // 超过重试次数, 需要创建所有 segment 并加锁
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# BlockingQueue

基本的入队出队

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static class Node<E> {
        E item;
        /**
 		* 下列三种情况之一
 		* - 真正的后继节点
 		* - 自己, 发生在出队时
 		* - null, 表示是没有后继节点, 是最后了
 		*/
        Node<E> next;
        Node(E x) { item = x; }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

初始化链表 last = head = new Node<E>(null); Dummy 节点用来占位,item 为 null

img

当一个节点入队 last = last.next = node;

img

再来一个节点入队 last = last.next = node;

img

出队

//临时变量h用来指向哨兵
Node<E> h = head;
//first用来指向第一个元素
Node<E> first = h.next;
h.next = h; // help GC
//head赋值为first,表示first节点就是下一个哨兵。
head = first;
E x = first.item;
//删除first节点中的数据,表示真正成为了哨兵,第一个元素出队。
first.item = null;
return x;
1
2
3
4
5
6
7
8
9
10
11

h = head

img

first = h.next

img

h.next = h

img

head = first

image.png

E x = first.item;
first.item = null;
return x;
1
2
3

img

# 加锁分析

高明之处在于用了两把锁和 dummy 节点

  • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
  • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
    • 消费者与消费者线程仍然串行
    • 生产者与生产者线程仍然串行

线程安全分析

  • 当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争
  • 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
  • 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();
// 用户 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();
1
2
3
4

# put 操作

public void put(E e) throws InterruptedException {
    //LinkedBlockingQueue不支持空元素
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // count 用来维护元素计数
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 满了等待
        while (count.get() == capacity) {
            // 倒过来读就好: 等待 notFull
            notFull.await();
        }
        // 有空位, 入队且计数加一
        enqueue(node);
        c = count.getAndIncrement(); 
        // 除了自己 put 以外, 队列还有空位, 由自己叫醒其他 put 线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果队列中有一个元素, 叫醒 take 线程
    if (c == 0)
        // 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争
        signalNotEmpty();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# take 操作

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果队列中只有一个空位时, 叫醒 put 线程
    // 如果有多个线程进行出队, 第一个线程满足 c == capacity, 但后续线程 c < capacity
    if (c == capacity)
        // 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争
        signalNotFull()
        return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

由 put 唤醒 put 是为了避免信号不足

# 性能比较

主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较

  • Linked 支持有界,Array 强制有界
  • Linked 实现是链表,Array 实现是数组
  • Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
  • Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
  • Linked 两把锁,Array 一把锁

# ConcurrentLinkedQueue

ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是

  • 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
  • dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
  • 只是这【锁】使用了 cas 来实现

事实上,ConcurrentLinkedQueue 应用还是非常广泛的

例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了 ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用

graph LR

subgraph Connector->NIO EndPoint
t1(LimitLatch)
t2(Acceptor)
t3(SocketChannel 1)
t4(SocketChannel 2)
t5(Poller)
subgraph Executor
t7(worker1)
t8(worker2)
end
t1 --> t2
t2 --> t3
t2 --> t4
t3 --有读--> t5
t4 --有读--> t5
t5 --socketProcessor--> t7
t5 --socketProcessor--> t8
end


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 模仿 ConcurrentLinkedQueue

# 初始代码

package cn.itcast.concurrent.thirdpart.test;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
public class Test3 {
    public static void main(String[] args) {
        MyQueue<String> queue = new MyQueue<>();
        queue.offer("1");
        queue.offer("2");
        queue.offer("3");
        System.out.println(queue);
    }
}
class MyQueue<E> implements Queue<E> {
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (Node<E> p = head; p != null; p = p.next.get()) {
            E item = p.item;
            if (item != null) {
                sb.append(item).append("->");
            }
        }
        sb.append("null");
        return sb.toString();
    }
    @Override
    public int size() {
        return 0;
    }
    @Override
    public boolean isEmpty() {
        return false;
    }
    @Override
    public boolean contains(Object o) {
        return false;
    }
    @Override
    public Iterator<E> iterator() {
        return null;
    }
    @Override
    public Object[] toArray() {
        return new Object[0];
    }
    @Override
    public <T> T[] toArray(T[] a) {
        return null;
    }
    @Override
    public boolean add(E e) {
        return false;
    }
    @Override
    public boolean remove(Object o) {
        return false;
    }
    @Override
    public boolean containsAll(Collection<?> c) {
        return false;
    }
    @Override
    public boolean addAll(Collection<? extends E> c) {
        return false;
    }
    @Override
    public boolean removeAll(Collection<?> c) {
        return false;
    }
    @Override
    public boolean retainAll(Collection<?> c) {
        return false;
    }
    @Override
    public void clear() {
    }
    @Override
    public E remove() {
        return null;
    }
    @Override
    public E element() {
        return null;
    }
    @Override
    public E peek() {
        return null;
    }
    public MyQueue() {
        head = last = new Node<>(null, null);
    }
    private volatile Node<E> last;
    private volatile Node<E> head;
    private E dequeue() {
        /*Node<E> h = head;
 		Node<E> first = h.next;
 		h.next = h;
 		head = first;
        E x = first.item;
 		first.item = null;
 		return x;*/
        return null;
    }
    @Override
    public E poll() {
        return null;
    }
    @Override
    public boolean offer(E e) {
        return true;
    }
    static class Node<E> {
        volatile E item;
        public Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<>(next);
        }
        AtomicReference<Node<E>> next;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122

# offer 方法

public boolean offer(E e) {
    Node<E> n = new Node<>(e, null);
    while(true) {
        // 获取尾节点
        AtomicReference<Node<E>> next = last.next;
        // S1: 真正尾节点的 next 是 null, cas 从 null 到新节点
        if(next.compareAndSet(null, n)) {
            // 这时的 last 已经是倒数第二, next 不为空了, 其它线程的 cas 肯定失败
            // S2: 更新 last 为倒数第一的节点
            last = n;
            return true;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# CopyOnWriteArrayList

CopyOnWriteArraySet 是它的马甲 底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更 改操作在新数组上执行,这时不影响其它线程的并发读,读写分离。 以新增为例:

public boolean add(E e) {
    synchronized (lock) {
        // 获取旧的数组
        Object[] es = getArray();
        int len = es.length;
        // 拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)
        es = Arrays.copyOf(es, len + 1);
        // 添加新元素
        es[len] = e;
        // 替换旧的数组
        setArray(es);
        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

这里的源码版本是 Java 11,在 Java 1.8 中使用的是可重入锁而不是 synchronized

其它读操作并未加锁,例如:

public void forEach(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    for (Object x : getArray()) {
        @SuppressWarnings("unchecked") E e = (E) x;
        action.accept(e);
    }
}
1
2
3
4
5
6
7

适合『读多写少』的应用场景

# get 弱一致性

数据一致性就是读到最新更新的数据:

  • 强一致性:当更新操作完成之后,任何多个后续进程或者线程的访问都会返回最新的更新过的值

  • 弱一致性:系统并不保证进程或者线程的访问都会返回最新的更新过的值,也不会承诺多久之后可以读到

img

时间点 操作
1 Thread-0 getArray()
2 Thread-1 getArray()
3 Thread-1 setArray(arrayCopy)
4 Thread-0 array[index]

Thread-0 读到了脏数据

不容易测试,但问题确实存在

# 迭代器弱一致性

CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator<Integer> iter = list.iterator();
new Thread(() -> {
    list.remove(0);
    System.out.println(list);
}).start();
sleep1s();
//此时主线程的iterator依旧指向旧的数组。
while (iter.hasNext()) {
    System.out.println(iter.next());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

不要觉得弱一致性就不好

  • 数据库的 MVCC 都是弱一致性的表现
  • 并发高和一致性是矛盾的,需要权衡

# 安全失败

在 java.util 包的集合类就都是快速失败的,而 java.util.concurrent 包下的类都是安全失败

  • 快速失败:在 A 线程使用迭代器对集合进行遍历的过程中,此时 B 线程对集合进行修改(增删改),或者 A 线程在遍历过程中对集合进行修改,都会导致 A 线程抛出 ConcurrentModificationException 异常

    • AbstractList 类中的成员变量 modCount,用来记录 List 结构发生变化的次数,结构发生变化是指添加或者删除至少一个元素的操作,或者是调整内部数组的大小,仅仅设置元素的值不算结构发生变化
    • 在进行序列化或者迭代等操作时,需要比较操作前后 modCount 是否改变,如果改变了抛出 CME 异常
  • 安全失败:采用安全失败机制的集合容器,在迭代器遍历时直接在原集合数组内容上访问,但其他线程的增删改都会新建数组进行修改,就算修改了集合底层的数组容器,迭代器依然引用着以前的数组(快照思想),所以不会出现异常

    ConcurrentHashMap 不会出现并发时的迭代异常,因为在迭代过程中 CHM 的迭代器并没有判断结构的变化,迭代器还可以根据迭代的节点状态去寻找并发扩容时的新表进行迭代

    ConcurrentHashMap map = new ConcurrentHashMap();
    // KeyIterator
    Iterator iterator = map.keySet().iterator();
    
    1
    2
    3
     Traverser(Node<K,V>[] tab, int size, int index, int limit) {
         // 引用还是原来集合的 Node 数组,所以其他线程对数据的修改是可见的
         this.tab = tab;
         this.baseSize = size;
         this.baseIndex = this.index = index;
         this.baseLimit = limit;
         this.next = null;
     }
    
    1
    2
    3
    4
    5
    6
    7
    8
    public final boolean hasNext() { return next != null; }
    public final K next() {
        Node<K,V> p;
        if ((p = next) == null)
            throw new NoSuchElementException();
        K k = p.key;
        lastReturned = p;
        // 在方法中进行下一个节点的获取,会进行槽位头节点的状态判断
        advance();
        return k;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

# Collections

Collections 类是用来操作集合的工具类,提供了集合转换成线程安全的方法:

 public static <T> Collection<T> synchronizedCollection(Collection<T> c) {
     return new SynchronizedCollection<>(c);
 }
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
    return new SynchronizedMap<>(m);
}
1
2
3
4
5
6

源码:底层也是对方法进行加锁

public boolean add(E e) {
    synchronized (mutex) {return c.add(e);}
}
1
2
3

# SkipListMap

# 底层结构

跳表 SkipList 是一个有序的链表,默认升序,底层是链表加多级索引的结构。跳表可以对元素进行快速查询,类似于平衡树,是一种利用空间换时间的算法

对于单链表,即使链表是有序的,如果查找数据也只能从头到尾遍历链表,所以采用链表上建索引的方式提高效率,跳表的查询时间复杂度是 O(logn),空间复杂度 O (n)

ConcurrentSkipListMap 提供了一种线程安全的并发访问的排序映射表,内部是跳表结构实现,通过 CAS + volatile 保证线程安全

平衡树和跳表的区别:

  • 对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整;而对跳表的插入和删除,只需要对整个结构的局部进行操作
  • 在高并发的情况下,保证整个平衡树的线程安全需要一个全局锁;对于跳表则只需要部分锁,拥有更好的性能

BaseHeader 存储数据,headIndex 存储索引,纵向上所有索引都指向链表最下面的节点

# 成员变量

// 标识索引头节点位置
private static final Object BASE_HEADER = new Object();

// 跳表的顶层索引
private transient volatile HeadIndex<K,V> head;

// 比较器,为 null 则使用自然排序
final Comparator<? super K> comparator;

// Node 节点
static final class Node<K, V>{
    final K key;  				// key 是 final 的, 说明节点一旦定下来, 除了删除, 一般不会改动 key
    volatile Object value; 		// 对应的 value
    volatile Node<K, V> next; 	// 下一个节点,单向链表
}

// 索引节点 Index,只有向下和向右的指针
static class Index<K, V>{
    final Node<K, V> node; 		// 索引指向的节点,每个都会指向数据节点
    final Index<K, V> down; 	// 下边level层的Index,分层索引
    volatile Index<K, V> right; // 右边的Index,单向

    // 在 index 本身和 succ 之间插入一个新的节点 newSucc
    final boolean link(Index<K, V> succ, Index<K, V> newSucc){
        Node<K, V> n = node;
        newSucc.right = succ;
        // 把当前节点的右指针从 succ 改为 newSucc
        return n.value != null && casRight(succ, newSucc);
    }

    // 断开当前节点和 succ 节点,将当前的节点 index 设置其的 right 为 succ.right,就是把 succ 删除
    final boolean unlink(Index<K, V> succ){
        return node.value != null && casRight(succ, succ.right);
    }
}


// 头索引节点 HeadIndex
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;	// 表示索引层级,所有的 HeadIndex 都指向同一个 Base_header 节点
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 构造方法

public ConcurrentSkipListMap() {
    this.comparator = null;	// comparator 为 null,使用 key 的自然序,如字典序
    initialize();
}

private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    // 初始化索引头节点,Node 的 key 为 null,value 为 BASE_HEADER 对象,下一个节点为 null
    // head 的分层索引 down 为 null,链表的后续索引 right 为 null,层级 level 为第 1 层
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null), null, null, 1);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 比较器

// x 是比较者,y 是被比较者,比较者大于被比较者 返回正数,小于返回负数,相等返回 0
static final int cpr(Comparator c, Object x, Object y) {
    return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y);
}
1
2
3
4

# put 方法

findPredecessor() :寻找前置节点

从最上层的头索引开始向右查找(链表的后续索引),如果后续索引的节点的 key 大于要查找的 key,则头索引移到下层链表,在下层链表查找,以此反复,一直查找到没有下层的分层索引为止,返回该索引的节点。如果后续索引的节点的 key 小于要查找的 key,则在该层链表中向后查找。由于查找的 key 可能永远大于索引节点的 key,所以只能找到目标的前置索引节点。如果遇到空值索引的存在,通过 CAS 来断开索引

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        // 1.初始数据 q 是 head,r 是最顶层 h 的右 Index 节点
        for (Index<K,V> q = head, r = q.right, d;;) {
            // 2.右索引节点不为空,则进行向下查找
            if (r != null) {
                Node<K,V> n = r.node;
                K k = n.key;
                // 3.n.value 为 null 说明节点 n 正在删除的过程中,此时【当前线程帮其删除索引】
                if (n.value == null) {
                    // 在 index 层直接删除 r 索引节点
                    if (!q.unlink(r))
                        // 删除失败重新从 head 节点开始查找,break 一个 for 到步骤 1,又从初始值开始
                        break;
                    
                    // 删除节点 r 成功,获取新的 r 节点,
                    r = q.right;
                    // 回到步骤 2,还是从这层索引开始向右遍历
                    continue;
                }
                // 4.若参数 key > r.node.key,则继续向右遍历, continue 到步骤 2 处获取右节点
                //   若参数 key < r.node.key,说明需要进入下层索引,到步骤 5
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            // 5.先让 d 指向 q 的下一层,判断是否是 null,是则说明已经到了数据层,也就是第一层
            if ((d = q.down) == null) 
                return q.node;
            // 6.未到数据层, 进行重新赋值向下扫描
            q = d;		// q 指向 d
            r = d.right;// r 指向 q 的后续索引节点,此时(q.key < key < r.key)
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

image-20231226193052879

put() :添加数据

public V put(K key, V value) {
    // 非空判断,value不能为空
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;
    // 非空判断,key 不能为空
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    // outer 循环,【把待插入数据插入到数据层的合适的位置,并在扫描过程中处理已删除(value = null)的数据】
    outer: for (;;) {
        //0.for (;;)
        //1.将 key 对应的前继节点找到, b 为前继节点,是数据层的, n 是前继节点的 next, 
		//  若没发生条件竞争,最终 key 在 b 与 n 之间 (找到的 b 在 base_level 上)
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            // 2.n 不为 null 说明 b 不是链表的最后一个节点
            if (n != null) {
                Object v; int c;
                // 3.获取 n 的右节点
                Node<K,V> f = n.next;
                // 4.条件竞争,并发下其他线程在 b 之后插入节点或直接删除节点 n, break 到步骤 0
                if (n != b.next)              
                    break;
                //  若节点 n 已经删除, 则调用 helpDelete 进行【帮助删除节点】
                if ((v = n.value) == null) {
                    n.helpDelete(b, f);
                    break;
                }
                // 5.节点 b 被删除中,则 break 到步骤 0,
				//  【调用findPredecessor帮助删除index层的数据, node层的数据会通过helpDelete方法进行删除】
                if (b.value == null || v == n) 
                    break;
                // 6.若 key > n.key,则进行向后扫描
                //   若 key < n.key,则证明 key 应该存储在 b 和 n 之间
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                // 7.key 的值和 n.key 相等,则可以直接覆盖赋值
                if (c == 0) {
                    // onlyIfAbsent 默认 false,
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        // 返回被覆盖的值
                        return vv;
                    }
                    // cas失败,break 一层循环,返回 0 重试
                    break;
                }
                // else c < 0; fall through
            }
            // 8.此时的情况 b.key < key < n.key,对应流程图1中的7,创建z节点指向n
            z = new Node<K,V>(key, value, n);
            // 9.尝试把 b.next 从 n 设置成 z
            if (!b.casNext(n, z))
                // cas失败,返回到步骤0,重试
                break;
            // 10.break outer 后, 上面的 for 循环不会再执行, 而后执行下面的代码
            break outer;
        }
    }
	// 【以上插入节点已经完成,剩下的任务要根据随机数的值来表示是否向上增加层数与上层索引】
    
    // 随机数
    int rnd = ThreadLocalRandom.nextSecondarySeed();
    
    // 如果随机数的二进制与 10000000000000000000000000000001 进行与运算为 0
    // 即随机数的二进制最高位与最末尾必须为 0,其他位无所谓,就进入该循环
    // 如果随机数的二进制最高位与最末位不为 0,不增加新节点的层数
    
    // 11.判断是否需要添加 level,32 位
    if ((rnd & 0x80000001) == 0) {
        // 索引层 level,从 1 开始,就是最底层
        int level = 1, max;
        // 12.判断最低位前面有几个 1,有几个leve就加几:0..0 0001 1110,这是4个,则1+4=5
        //    【最大有30个就是 1 + 30 = 31
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        // 最终会指向 z 节点,就是添加的节点 
        Index<K,V> idx = null;
        // 指向头索引节点
        HeadIndex<K,V> h = head;
        
        // 13.判断level是否比当前最高索引小,图中 max 为 3
        if (level <= (max = h.level)) {
            for (int i = 1; i <= level; ++i)
                // 根据层数level不断创建新增节点的上层索引,索引的后继索引留空
                // 第一次idx为null,也就是下层索引为空,第二次把上次的索引作为下层索引,【类似头插法】
                idx = new Index<K,V>(z, idx, null);
            // 循环以后的索引结构
            // index-3	← idx
            //   ↓
            // index-2
            //   ↓
            // index-1
            //   ↓
            //  z-node
        }
        // 14.若 level > max,则【只增加一层 index 索引层】,3 + 1 = 4
        else { 
            level = max + 1;
            //创建一个 index 数组,长度是 level+1,假设 level 是 4,创建的数组长度为 5
            Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1];
            // index[0]的数组 slot 并没有使用,只使用 [1,level] 这些数组的 slot
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
              		// index-4   ← idx
                    //   ↓
                  	// ......
                    //   ↓
                    // index-1
                    //   ↓
                    //  z-node
            
            for (;;) {
                h = head;
                // 获取头索引的层数,3
                int oldLevel = h.level;
                // 如果 level <= oldLevel,说明其他线程进行了 index 层增加操作,退出循环
                if (level <= oldLevel)
                    break;
                // 定义一个新的头索引节点
                HeadIndex<K,V> newh = h;
                // 获取头索引的节点,就是 BASE_HEADER
                Node<K,V> oldbase = h.node;
                // 升级 baseHeader 索引,升高一级,并发下可能升高多级
                for (int j = oldLevel + 1; j <= level; ++j)
                    // 参数1:底层node,参数二:down,为以前的头节点,参数三:right,新建
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                // 执行完for循环之后,baseHeader 索引长这个样子,这里只升高一级
                // index-4             →             index-4	← idx
                //   ↓                                  ↓
                // index-3                           index-3     
                //   ↓                                  ↓
                // index-2                           index-2
                //   ↓                                  ↓
                // index-1                           index-1
                //   ↓                                  ↓
                // baseHeader    →    ....      →     z-node
                
                // cas 成功后,head 字段指向最新的 headIndex,baseHeader 的 index-4
                if (casHead(h, newh)) {
                    // h 指向最新的 index-4 节点
                    h = newh;
                    // 让 idx 指向 z-node 的 index-3 节点,
					// 因为从 index-3 - index-1 的这些 z-node 索引节点 都没有插入到索引链表
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // 15.【把新加的索引插入索引链表中】,有上述两种情况,一种索引高度不变,另一种是高度加 1
        // 要插入的是第几层的索引
        splice: for (int insertionLevel = level;;) {
            // 获取头索引的层数,情况 1 是 3,情况 2 是 4
            int j = h.level;
            // 【遍历 insertionLevel 层的索引,找到合适的插入位置】
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                // 如果头索引为 null 或者新增节点索引为 null,退出插入索引的总循环
                if (q == null || t == null)
                    // 此处表示有其他线程删除了头索引或者新增节点的索引
                    break splice;
                // 头索引的链表后续索引存在,如果是新层则为新节点索引,如果是老层则为原索引
                if (r != null) {
                    // 获取r的节点
                    Node<K,V> n = r.node;
                    // 插入的key和n.key的比较值
                    int c = cpr(cmp, key, n.key);
                    // 【删除空值索引】
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    // key > r.node.key,向右扫描
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                // 执行到这里,说明 key < r.node.key,判断是否是第 j 层插入新增节点的前置索引
                if (j == insertionLevel) {
                    // 【将新索引节点 t 插入 q r 之间】
                    if (!q.link(r, t))
                        break; 
                    // 如果新增节点的值为 null,表示该节点已经被其他线程删除
                    if (t.node.value == null) {
                        // 找到该节点
                        findNode(key);
                        break splice;
                    }
                    // 插入层逐层自减,当为最底层时退出循环
                    if (--insertionLevel == 0)
                        break splice;
                }
				// 其他节点随着插入节点的层数下移而下移
                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212

findNode()

private Node<K,V> findNode(Object key) {
    // 原理与doGet相同,无非是 findNode 返回节点,doGet 返回 value
    if ((c = cpr(cmp, key, n.key)) == 0)
        return n;
}
1
2
3
4
5

# get 方法

get(key) :获取对应的数据

public V get(Object key) {
    return doGet(key);
}

// 扫描过程会对已 value = null 的元素进行删除处理
private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        // 1.找到最底层节点的前置节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            // 2.【如果该前置节点的链表后续节点为 null,说明不存在该节点】
            if (n == null)
                break outer;
            // b → n → f
            Node<K,V> f = n.next;
            // 3.如果n不为前置节点的后续节点,表示已经有其他线程删除了该节点
            if (n != b.next) 
                break;
            // 4.如果后续节点的值为null,【需要帮助删除该节点】
            if ((v = n.value) == null) {
                n.helpDelete(b, f);
                break;
            }
            // 5.如果前置节点已被其他线程删除,重新循环
            if (b.value == null || v == n)
                break;
             // 6.如果要获取的key与后续节点的key相等,返回节点的value
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            // 7.key < n.key,因位 key > b.key,b 和 n 相连,说明不存在该节点或者被其他线程删除了
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# remove 方法

remove()

public V remove(Object key) {
    return doRemove(key, null);
}
final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        // 1.找到最底层目标节点的前置节点,b.key < key
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            // 2.如果该前置节点的链表后续节点为 null,退出循环,说明不存在这个元素
            if (n == null)
                break outer;
            // b → n → f
            Node<K,V> f = n.next;
            if (n != b.next)                    // inconsistent read
                break;
            if ((v = n.value) == null) {        // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)      // b is deleted
                break;
            //3.key < n.key,说明被其他线程删除了,或者不存在该节点
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            //4.key > n.key,继续向后扫描
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            //5.到这里是 key = n.key,value 不为空的情况下判断 value 和 n.value 是否相等
            if (value != null && !value.equals(v))
                break outer;
            //6.【把 n 节点的 value 置空】
            if (!n.casValue(v, null))
                break;
            //7.【给 n 添加一个删除标志 mark】,mark.next = f,然后把 b.next 设置为 f,成功后n出队
            if (!n.appendMarker(f) || !b.casNext(n, f))
                // 对 key 对应的 index 进行删除,调用了 findPredecessor 方法
                findNode(key);
            else {
                // 进行操作失败后通过 findPredecessor 中进行 index 的删除
                findPredecessor(key, cmp);
                if (head.right == null)
                    // 进行headIndex 对应的index 层的删除
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

经过 findPredecessor() 中的 unlink() 后索引已经被删除

appendMarker() :添加删除标记节点

boolean appendMarker(Node<K,V> f) {
    // 通过 CAS 让 n.next 指向一个 key 为 null,value 为 this,next 为 f 的标记节点
    return casNext(f, new Node<K,V>(f));
}
1
2
3
4

helpDelete() :将添加了删除标记的节点清除,参数是该节点的前驱和后继节点

void helpDelete(Node<K,V> b, Node<K,V> f) {
    // this 节点的后续节点为 f,且本身为 b 的后续节点,一般都是正确的,除非被别的线程删除
    if (f == next && this == b.next) {
        // 如果 n 还还没有被标记
        if (f == null || f.value != f) 
            casNext(f, new Node<K,V>(f));
        else
            // 通过 CAS,将 b 的下一个节点 n 变成 f.next,即成为图中的样式
            b.casNext(this, f.next);
    }
}
1
2
3
4
5
6
7
8
9
10
11

tryReduceLevel() :删除索引

private void tryReduceLevel() {
    HeadIndex<K,V> h = head;
    HeadIndex<K,V> d;
    HeadIndex<K,V> e;
    if (h.level > 3 &&
        (d = (HeadIndex<K,V>)h.down) != null &&
        (e = (HeadIndex<K,V>)d.down) != null &&
        e.right == null &&
        d.right == null &&
        h.right == null &&
        // 设置头索引
        casHead(h, d) && 
        // 重新检查
        h.right != null) 
        // 重新检查返回true,说明其他线程增加了索引层级,把索引头节点设置回来
        casHead(d, h);   
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
编辑 (opens new window)
上次更新: 2025/01/01, 10:09:39
JUC总结
Go 基础

← JUC总结 Go 基础→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式