Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • JavaSE

  • JavaEE

  • Linux

  • MySQL

  • NoSQL

  • Python

  • Python模块

  • 机器学习

  • 设计模式

  • 传智健康

  • 畅购商城

  • 博客项目

  • JVM

  • JUC

    • 多线程
    • CompletableFuture
    • 锁
    • LockSupport与线程中断
    • Java内存模型之JMM
    • Volatile与Java内存模型
    • CAS
    • 原子操作类
      • 基本类型原子类
      • 数组类型原子类
      • 引用类型原子类
        • AtomicReference
        • AtomicStampedReference
        • 解决ABA案例
        • AtomicMarkableReference
        • 解决ABA案例
      • 对象的属性修改原子类
      • 原子操作增强类
        • 原理分析
        • Striped64
        • Cell
        • LongAdder为什么这么快
        • 源码解读
        • increment()
        • add(1L)
        • longAccumulate
        • 计算
        • sum
        • 使用总结
        • 总结
        • AtomicLong
        • LongAdder vs AtomicLong Performance
        • LongAdder
    • ThreadLocal
    • Java对象内存布局和对象头
    • Synchronized与锁升级
    • AbstractQueuedSynchronizer之AQS
    • ReentrantLock、ReentrantReadWriteLock、StampedLock讲解
    • JUC总结
    • 并发集合
  • Golang

  • Kubernetes

  • 硅谷课堂

  • C

  • 源码

  • 神领物流

  • RocketMQ

  • 短链平台

  • 后端
  • JUC
Iekr
2023-12-04
目录

原子操作类

# 原子操作类

我们一共介绍 18 种原子操作,又称 18 罗汉增强

  • AtomicBoolean
  • AtomicInteger
  • AtomicIntegerArray
  • AtomicIntegerFieldUpdater
  • AtomicLong
  • AtomicLongArray
  • AtomicLongFieldUpdater
  • AtomicMarkableReference
  • AtomicReference
  • AtomicReferenceArray
  • AtomicReferenceFieldUpdater
  • AtomicStampedReference
  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

# 基本类型原子类

基本类型原子类一共有 3 种

  • AtomicInteger
  • AtomicBoolean
  • AtomicLong

常用 API

public final int get(); //获取当前的值
public final int getAndSet(int newValue);//获取当前的值,并设置新的值
public final int getAndIncrement();//获取当前的值,并自增
public final int getAndDecrement(); //获取当前的值,并自减
public final int getAndAdd(int delta); //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update); //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
1
2
3
4
5
6

使用案例

package com.atguigu.juc.senior.test2;

import lombok.Getter;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class MyNumber
{
    @Getter
    private AtomicInteger atomicInteger = new AtomicInteger();
    public void addPlusPlus()
    {
        atomicInteger.incrementAndGet();
    }
}

/**
 * @auther zzyy
 * @create 2020-07-03 17:16
 */
public class AtomicIntegerDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(100);

        for (int i = 1; i <=100; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=5000; j++)
                    {
                        myNumber.addPlusPlus();
                    }
                }finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }

        countDownLatch.await();

        System.out.println(myNumber.getAtomicInteger().get());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

tsleep→countDownLatch

# 数组类型原子类

数组类型原子类一共有 3 种

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

使用案例

package com.atguigu.juc.prepare;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * @auther zzyy
 * @create 2020-04-16 14:59
 */
public class AtomicIntegerArrayDemo
{
    public static void main(String[] args)
    {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});

        for (int i = 0; i <atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println();
        System.out.println();
        System.out.println();
        int tmpInt = 0;

        tmpInt = atomicIntegerArray.getAndSet(0,1122);
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0));
        atomicIntegerArray.getAndIncrement(1);
        atomicIntegerArray.getAndIncrement(1);
        tmpInt = atomicIntegerArray.getAndIncrement(1);
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(1));
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 引用类型原子类

引用类型原子类一个有三种

  • AtomicReference
  • AtomicStampedReference
  • AtomicMarkableReference

实际开发的过程中我们使用的不一定是 int、long 等基本数据类型,也有可能时 BigDecimal 这样的类型,这时就需要用到原子引用作为容器。原子引用设置值使用的是 unsafe.compareAndSwapObject() 方法。原子引用中表示数据的类型需要重写 equals() 方法。

# AtomicReference

使用案例

package com.atguigu.Interview.study.thread;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

import java.util.concurrent.atomic.AtomicReference;

@Getter
@ToString
@AllArgsConstructor
class User
{
    String userName;
    int    age;
}

/**
 * @auther zzyy
 * @create 2018-12-31 17:22
 */
public class AtomicReferenceDemo
{
    public static void main(String[] args)
    {
        User z3 = new User("z3",24);
        User li4 = new User("li4",26);

        AtomicReference<User> atomicReferenceUser = new AtomicReference<>();

        atomicReferenceUser.set(z3);
        System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString());
        System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

自旋锁 SpinLockDemo

package com.atguigu.Interview.study.thread;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @auther zzyy
 * @create 2018-12-28 17:57
 * 题目:实现一个自旋锁
 * 自旋锁好处:循环比较获取没有类似wait的阻塞。
 *
 * 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
 * 当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到。
 */
public class SpinLockDemo
{
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    public void myLock()
    {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName()+"\t come in");
        while(!atomicReference.compareAndSet(null,thread))
        {

        }
    }

    public void myUnLock()
    {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread,null);
        System.out.println(Thread.currentThread().getName()+"\t myUnLock over");
    }

    public static void main(String[] args)
    {
        SpinLockDemo spinLockDemo = new SpinLockDemo();

        new Thread(() -> {
            spinLockDemo.myLock();
            //暂停一会儿线程
            try { TimeUnit.SECONDS.sleep( 5 ); } catch (InterruptedException e) { e.printStackTrace(); }
            spinLockDemo.myUnLock();
        },"A").start();
        //暂停一会儿线程,保证A线程先于B线程启动并完成
        try { TimeUnit.SECONDS.sleep( 1 ); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            spinLockDemo.myLock();
            spinLockDemo.myUnLock();
        },"B").start();

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

# AtomicStampedReference

携带版本号的引用类型原子类,可以解决 ABA 问题,解决修改过几次,状态戳原子引用

# 解决 ABA 案例

package com.atguigu.juc.cas;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * @auther zzyy
 * @create 2021-03-18 15:34
 */
public class ABADemo
{
    static AtomicInteger atomicInteger = new AtomicInteger(100);
    static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);

    public static void main(String[] args)
    {
        abaProblem();
        abaResolve();
    }

    public static void abaResolve()
    {
        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println("t3 ----第1次stamp  "+stamp);
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            atomicStampedReference.compareAndSet(100,101,stamp,stamp+1);
            System.out.println("t3 ----第2次stamp  "+atomicStampedReference.getStamp());
            atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
            System.out.println("t3 ----第3次stamp  "+atomicStampedReference.getStamp());
        },"t3").start();

        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println("t4 ----第1次stamp  "+stamp);
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean result = atomicStampedReference.compareAndSet(100, 20210308, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName()+"\t"+result+"\t"+atomicStampedReference.getReference());
        },"t4").start();
    }

    public static void abaProblem()
    {
        new Thread(() -> {
            atomicInteger.compareAndSet(100,101);
            atomicInteger.compareAndSet(101,100);
        },"t1").start();

        try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            atomicInteger.compareAndSet(100,20210308);
            System.out.println(atomicInteger.get());
        },"t2").start();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# AtomicMarkableReference

原子更新带有标记位的引用类型对象,解决是否修改过,它的定义就是将状态戳简化为 true|false,类似一次性筷子。

状态戳 (true/false) 原子引用

# 解决 ABA 案例

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过 AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。

但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference

package com.atguigu.juc.senior.inner.atomic;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * @auther zzyy
 * @create 2020-05-23 10:56
 */
public class ABADemo
{
    static AtomicInteger atomicInteger = new AtomicInteger(100);
    static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100,1);
    static AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(100,false);

    public static void main(String[] args)
    {
        new Thread(() -> {
            atomicInteger.compareAndSet(100,101);
            atomicInteger.compareAndSet(101,100);
            System.out.println(Thread.currentThread().getName()+"\t"+"update ok");
        },"t1").start();

        new Thread(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            atomicInteger.compareAndSet(100,2020);
        },"t2").start();

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println(atomicInteger.get());

        System.out.println();
        System.out.println();
        System.out.println();

        System.out.println("============以下是ABA问题的解决,让我们知道引用变量中途被更改了几次=========================");
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"\t 1次版本号"+stampedReference.getStamp());
            //故意暂停200毫秒,让后面的t4线程拿到和t3一样的版本号
            try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }

            stampedReference.compareAndSet(100,101,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t 2次版本号"+stampedReference.getStamp());
            stampedReference.compareAndSet(101,100,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t 3次版本号"+stampedReference.getStamp());
        },"t3").start();

        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t =======1次版本号"+stamp);
            //暂停2秒钟,让t3先完成ABA操作了,看看自己还能否修改
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean b = stampedReference.compareAndSet(100, 2020, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName()+"\t=======2次版本号"+stampedReference.getStamp()+"\t"+stampedReference.getReference());
        },"t4").start();

        System.out.println();
        System.out.println();
        System.out.println();

        System.out.println("============AtomicMarkableReference不关心引用变量更改过几次,只关心是否更改过======================");

        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName()+"\t 1次版本号"+marked);
            try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
            markableReference.compareAndSet(100,101,marked,!marked);
            System.out.println(Thread.currentThread().getName()+"\t 2次版本号"+markableReference.isMarked());
            markableReference.compareAndSet(101,100,markableReference.isMarked(),!markableReference.isMarked());
            System.out.println(Thread.currentThread().getName()+"\t 3次版本号"+markableReference.isMarked());
        },"t5").start();

        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName()+"\t 1次版本号"+marked);
            //暂停几秒钟线程
            try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
            markableReference.compareAndSet(100,2020,marked,!marked);
            System.out.println(Thread.currentThread().getName()+"\t"+markableReference.getReference()+"\t"+markableReference.isMarked());
        },"t6").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

# 对象的属性修改原子类

  • AtomicIntegerFieldUpdater:原子更新对象中 int 类型字段的值
  • AtomicLongFieldUpdater:原子更新对象中 Long 类型字段的值
  • AtomicReferenceFieldUpdater:原子更新引用类型字段的值

以一种线程安全的方式操作非线程安全对象内的某些字段,更新的对象属性必须使用 public volatile 修饰符。因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性。

使用案例

AtomicIntegerFieldUpdater

package com.atguigu.itdachang;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;


class BankAccount
{
    private String bankName = "CCB";//银行
    public volatile int money = 0;//钱数
    AtomicIntegerFieldUpdater<BankAccount> accountAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");

    //不加锁+性能高,局部微创
    public void transferMoney(BankAccount bankAccount)
    {
        accountAtomicIntegerFieldUpdater.incrementAndGet(bankAccount);
    }
}

/**
 * @auther zzyy
 * @create 2020-07-14 18:06
 * 以一种线程安全的方式操作非线程安全对象的某些字段。
 * 需求:
 * 1000个人同时向一个账号转账一元钱,那么累计应该增加1000元,
 * 除了synchronized和CAS,还可以使用AtomicIntegerFieldUpdater来实现。
 */
public class AtomicIntegerFieldUpdaterDemo
{

    public static void main(String[] args)
    {
        BankAccount bankAccount = new BankAccount();

        for (int i = 1; i <=1000; i++) {
            int finalI = i;
            new Thread(() -> {
                bankAccount.transferMoney(bankAccount);
            },String.valueOf(i)).start();
        }

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println(bankAccount.money);

    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

AtomicReferenceFieldUpdater

 
package com.atguigu.juc.atomics;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class MyVar
{
    // 这里使用了volatile关键字
    public volatile Boolean isInit = Boolean.FALSE;
    AtomicReferenceFieldUpdater<MyVar,Boolean> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit");


    public void init(MyVar myVar)
    {
        if(atomicReferenceFieldUpdater.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE))
        {
            System.out.println(Thread.currentThread().getName()+"\t"+"---init.....");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+"---init.....over");
        }else{
            System.out.println(Thread.currentThread().getName()+"\t"+"------其它线程正在初始化");
        }
    }


}


/**
 * @auther zzyy
 * @create 2021-03-18 17:20
 * 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
 */
public class AtomicIntegerFieldUpdaterDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        MyVar myVar = new MyVar();

        for (int i = 1; i <=5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            },String.valueOf(i)).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

# 原子操作增强类

原子操作增强类一共有 4 种

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

image-20231204043530011

  1. 热点商品点赞计算器,点赞数加加统计,不要求实时精确
  2. 一个很大的 List,里面都是 int 类型,如何实现加加,说说思路

常用 api

image-20231204043610315

  • LongAdder 只能用来计算加法,且从零开始计算
 package com.atguigu.juc.atomics;

import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

/**
 * @auther zzyy
 * @create 2021-03-19 15:59
 */
public class LongAdderAPIDemo
{
    public static void main(String[] args)
    {
        LongAdder longAdder = new LongAdder();

        longAdder.increment();
        longAdder.increment();
        longAdder.increment();

        System.out.println(longAdder.longValue());

        LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x * y,2);

        longAccumulator.accumulate(1);
        longAccumulator.accumulate(2);
        longAccumulator.accumulate(3);

        System.out.println(longAccumulator.longValue());

    }
}
 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  • LongAccumulator 提供了自定义的函数操作

long 类型的聚合器,需要传入一个 long 类型的二元操作,可以用来计算各种聚合操作,包括加乘等

package com.atguigu.juc.senior.inner.atomic;

import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;

/**
 * @auther zzyy
 * @create 2020-05-30 13:51
 */
public class LongAccumulatorDemo
{

    LongAdder longAdder = new LongAdder();
    public void add_LongAdder()
    {
        longAdder.increment();
    }

    //LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y,0);
    LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator()
    {
        @Override
        public long applyAsLong(long left, long right)
        {
            return left - right;
        }
    },777);

    public void add_LongAccumulator()
    {
        longAccumulator.accumulate(1);
    }

    public static void main(String[] args)
    {
        LongAccumulatorDemo demo = new LongAccumulatorDemo();

        demo.add_LongAccumulator();
        demo.add_LongAccumulator();
        System.out.println(demo.longAccumulator.longValue());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

LongAdder 高性能对比 Code 演示

package com.zzyy.study.day524;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class ClickNumberNet
{
    int number = 0;
    public synchronized void clickBySync()
    {
        number++;
    }

    AtomicLong atomicLong = new AtomicLong(0);
    public void clickByAtomicLong()
    {
        atomicLong.incrementAndGet();
    }

    LongAdder longAdder = new LongAdder();
    public void clickByLongAdder()
    {
        longAdder.increment();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x + y,0);
    public void clickByLongAccumulator()
    {
        longAccumulator.accumulate(1);
    }
}

/**
 * @auther zzyy
 * @create 2020-05-21 22:23
 * 50个线程,每个线程100W次,总点赞数出来
 */
public class LongAdderDemo2
{
    public static void main(String[] args) throws InterruptedException
    {
        ClickNumberNet clickNumberNet = new ClickNumberNet();

        long startTime;
        long endTime;
        CountDownLatch countDownLatch = new CountDownLatch(50);
        CountDownLatch countDownLatch2 = new CountDownLatch(50);
        CountDownLatch countDownLatch3 = new CountDownLatch(50);
        CountDownLatch countDownLatch4 = new CountDownLatch(50);


        startTime = System.currentTimeMillis();
        for (int i = 1; i <=50; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * 10000; j++) {
                        clickNumberNet.clickBySync();
                    }
                }finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickBySync result: "+clickNumberNet.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=50; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * 10000; j++) {
                        clickNumberNet.clickByAtomicLong();
                    }
                }finally {
                    countDownLatch2.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByAtomicLong result: "+clickNumberNet.atomicLong);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=50; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * 10000; j++) {
                        clickNumberNet.clickByLongAdder();
                    }
                }finally {
                    countDownLatch3.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAdder result: "+clickNumberNet.longAdder.sum());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=50; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * 10000; j++) {
                        clickNumberNet.clickByLongAccumulator();
                    }
                }finally {
                    countDownLatch4.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAccumulator result: "+clickNumberNet.longAccumulator.longValue());


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

# 原理分析

架构

image-20231204043748106

LongAdder 是 Striped64 的子类

image-20231204043800333

image-20231204043804737

LongAdder 为什么这么快?

image-20231204043818373

LongAdder 是 Striped64 的子类

# Striped64

Striped64 有几个比较重要的成员函数

 
/** Number of CPUS, to place bound on table size        CPU数量,即cells数组的最大长度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Table of cells. When non-null, size is a power of 2.
cells数组,为2的幂,2,4,8,16.....,方便以后位运算
 */
transient volatile Cell[] cells;

/**基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;

/**创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。
 * Spinlock (locked via CAS) used when resizing and/or creating Cells. 
 */
transient volatile int cellsBusy;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

最重要的 2 个

image-20231204043909492

Striped64 中一些变量或者方法的定义

  • base: 类似于 AtomicLong 中全局的 value 值。在没有竞争情况下数据直接累加到 base 上,或者 cells 扩容时,也需要将数据写入到 base 上
  • collide: 表示扩容意向,false 一定不会扩容,true 可能会扩容。
  • cellsBusy: 初始化 ce 川 s 或者扩容 ce 川 s 需要获取锁,0:表示无锁状态 1:表示其他线程已经持有了锁
  • casCellsBusy (): 通过 CAS 操作修改 cellsBusy 的值,CAS 成功代表获取锁,返回 true
  • NCPU: 当前计算机 CPU 数量,Ce 川数组扩容时会使用到
  • getProbe (): 获取当前线程的 hash 值
  • advanceProbe (): 重置当前线程的 hash 值

# Cell

Cell 是 java.util.concurrent.atomic 下 Striped64 的一个内部类

image-20231204050136219

# LongAdder 为什么这么快

LongAdder 的基本思路就是分散热点,将 value 值分散到一个 Cell 数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回。

sum () 会将所有 Cell 数组中的 value 和 base 累加作为返回值,核心的思想就是将之前 AtomicLong 一个 value 的更新压力分散到多个 value 中去,从而降级更新热点。

image-20231204050208638

 Value = Base +∑i=nnCell[i]

内部有一个 base 变量,一个 Cell [] 数组。

  • base 变量:非竞态条件下,直接累加到该变量上
  • Cell [] 数组:竞态条件下,累加个各个线程自己的槽 Cell [i] 中

# 源码解读

LongAdder 在无竞争的情况,跟 AtomicLong 一样,对同一个 base 进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组 cells,将一个 value 拆分进这个数组 cells。多个线程需要同时对 value 进行操作时候,可以对线程 id 进行 hash 得到 hash 值,再根据 hash 值映射到这个数组 cells 的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组 cells 的所有值和无竞争值 base 都加起来作为最终结果。

image-20231204050337078

 Value = Base +∑i=nnCell[i]

# increment()

# add(1L)

image-20231204083948740

  • as:表示 cells 引用
  • b:表示获取的 base 值
  • V:表示期望值,
  • m:表示 cells 数组的长度
  • a:表示当前线程命中的 cell 单元格

image-20231204084101269

image-20231204084106598

  1. 最初无竞争时只更新 base;
  2. 如果更新 base 失败后,首次新建一个 Cell [] 数组
  3. 当多个线程竞争同一个 Cell 比较激烈时,可能就要对 Cell [] 扩容
# longAccumulate

longAccumulate 入参说明

image-20231204084207786

Striped64 中一些变量或者方法的定义

image-20231204084220233

线程 hash 值:probe

image-20231204084249709

image-20231204084254799

image-20231204084259971

image-20231204084304796

总纲

image-20231204084330114

上述代码首先给当前线程分配一个 hash 值,然后进入一个 for (;;) 自旋,这个自旋分为三个分支:

  • CASE1:Cell [] 数组已经初始化
  • CASE2:Cell [] 数组未初始化 (首次新建)
  • CASE3:Cell [] 数组正在初始化中
# 计算

刚刚要初始化 Cell [] 数组 (首次新建),未初始化过 Cell [] 数组,尝试占有锁并首次初始化 cells 数组

image-20231204084418935

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell [] rs = new Cell [2] 表示数组的长度为 2, rs [h & 1] = new Cell (x) 表示创建一个新的 Cell 元素,value 是 x 值,默认为 1。

h & 1 类似于我们之前 HashMap 常用到的计算散列桶 index 的算法,通常都是 hash & (table.len - 1)。同 hashmap 一个意思。

兜底,多个线程尝试 CAS 修改失败的线程会走到这个分支

image-20231204084444310

该分支实现直接操作 base 基数,将值累加到 base 上,也即其它线程正在初始化,多个线程正在更新 base 的值。

Cell 数组不再为空且可能存在 Cell 数组扩容,多个线程同时命中一个 cell 的竞争。

image-20231204084506387

image-20231204084518122

上面代码判断当前线程 hash 后指向的数据位置元素是否为空,如果为空则将 Cell 数据放入数组中,跳出循环。如果不空则继续循环。

image-20231204084532174

image-20231204084545460

说明当前线程对应的数组中有了数据,也重置过 hash 值, 这时通过 CAS 操作尝试对当前数中的 value 值进行累加 x 操作,x 默认为 1,如果 CAS 成功则直接跳出循环。

image-20231204084558277

image-20231204084604647

image-20231204084617355

流程图

image-20231204084625913

# sum

sum () 会将所有 Cell 数组中的 value 和 base 累加作为返回值。 核心的思想就是将之前 AtomicLong 一个 value 的更新压力分散到多个 value 中去,从而降级更新热点。

为啥在并发情况下 sum 的值不精确

sum 执行时,并没有限制对 base 和 cells 的更新 (一句要命的话)。所以 LongAdder 不是强一致性的,它是最终一致性的。

首先,最终返回的 sum 局部变量,初始被复制为 base,而最终返回时,很可能 base 已经被更新了,而此时局部变量 sum 不会更新,造成不一致。 其次,这里对 cell 的读取也无法保证是最后一次写入的值。所以,sum 方法在没有并发的情况下,可以获得正确的结果。

image-20231204084905784

# 使用总结

  • AtomicLong:
    • 线程安全,可允许一些性能损耗,要求高精度时可使用
    • 保证精度,性能代价
    • AtomicLong 是多个线程针对单个热点值 value 进行原子操作
  • LongAdder:
    • 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
    • 保证性能,精度代价
    • LongAdder 是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行 CAS 操作

# 总结

# AtomicLong

  • 原理
    • CAS + 自旋
    • incrementAndGet
  • 场景
    • 低并发下的全局计算
    • AtomicLong 能保证并发情况下计数的准确性,其内部通过 CAS 来解决并发安全性的问题。
  • 缺陷
    • 高并发后性能急剧下降
    • AtomicLong 的自旋会成为瓶颈,N 个线程 CAS 操作修改线程的值,每次只有一个成功过,其它 N - 1 失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子 cpu 就打高了。

# LongAdder vs AtomicLong Performance

http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/

# LongAdder

  • 原理
    • CAS+Base+Cell 数组分散
    • 空间换时间并分散了热点数据
  • 场景:高并发下的全局计算
  • 缺陷:sum 求和后还有计算线程修改结果的话,最后结果不够准确
编辑 (opens new window)
上次更新: 2025/01/01, 10:09:39
CAS
ThreadLocal

← CAS ThreadLocal→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式