ReentrantLock、ReentrantReadWriteLock、StampedLock讲解
# ReentrantLock、ReentrantReadWriteLock、StampedLock 讲解
本章路线总纲:无锁→独占锁→读写锁→邮戳锁
# ReentrantLock
相对于 synchronized 它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
与 synchronized 一样,都支持可重入
基本语法
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
2
3
4
5
6
7
8
# 可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁,如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
输出
17:59:11.862 [main] c.TestReentrant - execute method1
17:59:11.865 [main] c.TestReentrant - execute method2
17:59:11.865 [main] c.TestReentrant - execute method3
2
3
# 可打断
可打断指的是处于阻塞状态等待锁的线程可以被打断等待。注意 lock.lockInterruptibly() 和 lock.trylock() 方法是可打断的, lock.lock() 不是。可打断的意义在于避免得不到锁的线程无限制地等待下去,防止死锁的一种方式。
示例
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
输出
18:02:40.520 [main] c.TestInterrupt - 获得了锁
18:02:40.524 [t1] c.TestInterrupt - 启动...
18:02:41.530 [main] c.TestInterrupt - 执行打断
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17)
at java.lang.Thread.run(Thread.java:748)
18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断
2
3
4
5
6
7
8
9
10
11
12
13
14
注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
lock.lock();
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
sleep(1);
} finally {
log.debug("释放了锁");
lock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
输出
18:06:56.261 [main] c.TestInterrupt - 获得了锁
18:06:56.265 [t1] c.TestInterrupt - 启动...
18:06:57.266 [main] c.TestInterrupt - 执行打断 // 这时 t1 并没有被真正打断, 而是仍继续等待锁
18:06:58.267 [main] c.TestInterrupt - 释放了锁
18:06:58.267 [t1] c.TestInterrupt - 获得了锁
2
3
4
5
# 锁超时
立刻失败
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
输出
18:15:02.918 [main] c.TestTimeout - 获得了锁
18:15:02.921 [t1] c.TestTimeout - 启动...
18:15:02.921 [t1] c.TestTimeout - 获取立刻失败,返回
2
3
超时失败
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取等待 1s 后失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
输出
18:19:40.537 [main] c.TestTimeout - 获得了锁
18:19:40.544 [t1] c.TestTimeout - 启动...
18:19:41.547 [t1] c.TestTimeout - 获取等待 1s 后失败,返回
2
3
# 哲学家就餐问题

有五位哲学家,围坐在圆桌旁。
- 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
- 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
- 如果筷子被身边的人拿着,自己就得等待
筷子类
class Chopstick {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
2
3
4
5
6
7
8
9
10
11
12
哲学家类
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
private void eat() {
log.debug("eating...");
Sleeper.sleep(1);
}
@Override
public void run() {
while (true) {
// 获得左手筷子
synchronized (left) {
// 获得右手筷子
synchronized (right) {
// 吃饭
eat();
}
// 放下右手筷子
}
// 放下左手筷子
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
就餐
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
2
3
4
5
6
7
8
9
10
11
执行不多会,就执行不下去了
12:33:15.575 [苏格拉底] c.Philosopher - eating...
12:33:15.575 [亚里士多德] c.Philosopher - eating...
12:33:16.580 [阿基米德] c.Philosopher - eating...
12:33:17.580 [阿基米德] c.Philosopher - eating...
// 卡在这里, 不向下运行
2
3
4
5
使用 jconsole 检测死锁,发现
-------------------------------------------------------------------------
名称: 阿基米德
状态: cn.itcast.Chopstick@1540e19d (筷子1) 上的BLOCKED, 拥有者: 苏格拉底
总阻止数: 2, 总等待数: 1
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@6d6f6e28 (筷子5)
-------------------------------------------------------------------------
名称: 苏格拉底
状态: cn.itcast.Chopstick@677327b6 (筷子2) 上的BLOCKED, 拥有者: 柏拉图
总阻止数: 2, 总等待数: 1
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@1540e19d (筷子1)
-------------------------------------------------------------------------
名称: 柏拉图
状态: cn.itcast.Chopstick@14ae5a5 (筷子3) 上的BLOCKED, 拥有者: 亚里士多德
总阻止数: 2, 总等待数: 0
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@677327b6 (筷子2)
-------------------------------------------------------------------------
名称: 亚里士多德
状态: cn.itcast.Chopstick@7f31245a (筷子4) 上的BLOCKED, 拥有者: 赫拉克利特
总阻止数: 1, 总等待数: 1
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@14ae5a5 (筷子3)
-------------------------------------------------------------------------
名称: 赫拉克利特
状态: cn.itcast.Chopstick@6d6f6e28 (筷子5) 上的BLOCKED, 拥有者: 阿基米德
总阻止数: 2, 总等待数: 0
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@7f31245a (筷子4)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
这种线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情
# 使用 tryLock 解决哲学家就餐问题
筷子类
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
2
3
4
5
6
7
8
9
10
哲学家类
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
// 尝试获得左手筷子
if (left.tryLock()) {
try {
// 尝试获得右手筷子
if (right.tryLock()) {
try {
eat();
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}
}
}
private void eat() {
log.debug("eating...");
Sleeper.sleep(1);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 公平锁
ReentrantLock 默认是不公平的
ReentrantLock lock = new ReentrantLock(false);
lock.lock();
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之后去争抢锁
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start...");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "强行插入").start();
lock.unlock();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
强行插入,有机会在中间输出,该实验不一定总能复现
t39 running...
t40 running...
t41 running...
t42 running...
t43 running...
强行插入 start...
强行插入 running...
t44 running...
t45 running...
t46 running...
t47 running...
t49 running...
2
3
4
5
6
7
8
9
10
11
12
改为公平锁后
ReentrantLock lock = new ReentrantLock(true);
强行插入,总是在最后输出
t465 running...
t464 running...
t477 running...
t442 running...
t468 running...
t493 running...
t482 running...
t485 running...
t481 running...
强行插入 running...
2
3
4
5
6
7
8
9
10
公平锁一般没有必要,会降低并发度。
# 条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
- synchronized 是那些不满足条件的线程都在一间休息室等消息
- 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒
# 使用要点
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
# 详细 API
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
/*
* <pre> {@code
* boolean aMethod(long timeout, TimeUnit unit) {
* long nanos = unit.toNanos(timeout);
* lock.lock();
* try {
* while (!conditionBeingWaitedFor()) {
* if (nanos <= 0L)
* return false;
* nanos = theCondition.awaitNanos(nanos);
* }
* // ...
* } finally {
* lock.unlock();
* }
* }}</pre>
*
* @param nanosTimeout the maximum time to wait, in nanoseconds
* @return an estimate of the {@code nanosTimeout} value minus
* the time spent waiting upon return from this method.
* A positive value may be used as the argument to a
* subsequent call to this method to finish waiting out
* the desired time. A value less than or equal to zero
* indicates that no time remains.
* @throws InterruptedException if the current thread is interrupted
* (and interruption of thread suspension is supported)
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified waiting time elapses. This method is behaviorally
* equivalent to:
* <pre> {@code awaitNanos(unit.toNanos(time)) > 0}</pre>
*
* @param time the maximum time to wait
* @param unit the time unit of the {@code time} argument
* @return {@code false} if the waiting time detectably elapsed
* before return from the method, else {@code true}
* @throws InterruptedException if the current thread is interrupted
* (and interruption of thread suspension is supported)
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified deadline elapses.
*
* <pre> {@code
* boolean aMethod(Date deadline) {
* boolean stillWaiting = true;
* lock.lock();
* try {
* while (!conditionBeingWaitedFor()) {
* if (!stillWaiting)
* return false;
* stillWaiting = theCondition.awaitUntil(deadline);
* }
* // ...
* } finally {
* lock.unlock();
* }
* }}</pre>
* @param deadline the absolute time to wait until
* @return {@code false} if the deadline has elapsed upon return, else
* {@code true}
* @throws InterruptedException if the current thread is interrupted
* (and interruption of thread suspension is supported)
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* Wakes up one waiting thread.
*/
void signal();
/**
* Wakes up all waiting threads.
*/
void signalAll();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
例子:
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
输出
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟
2
3
4
# 同步模式之顺序控制
# 固定运行顺序
比如,必须先 2 后 1 打印
# wait notify 版
// 用来同步的对象
static Object obj = new Object();
// t2 运行标记, 代表 t2 是否执行过
static boolean t2runed = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (obj) {
// 如果 t2 没有执行过
while (!t2runed) {
try {
// t1 先等一会
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(1);
});
Thread t2 = new Thread(() -> {
System.out.println(2);
synchronized (obj) {
// 修改运行标记
t2runed = true;
// 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
obj.notifyAll();
}
});
t1.start();
t2.start();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Park Unpark 版
可以看到,实现上很麻烦:
- 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该 wait
- 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决 此问题
- 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个
可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:
Thread t1 = new Thread(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { }
// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
System.out.println("1");
});
Thread t2 = new Thread(() -> {
System.out.println("2");
// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(t1);
});
t1.start();
t2.start();
2
3
4
5
6
7
8
9
10
11
12
13
park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』, 不需要『同步对象』和『运行标记』
# 交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
# wait notify 版
class SyncWaitNotify {
private int flag;
private int loopNumber;
public SyncWaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
public void print(int waitFlag, int nextFlag, String str) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (this.flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
new Thread(() -> {
syncWaitNotify.print(1, 2, "a");
}).start();
new Thread(() -> {
syncWaitNotify.print(2, 3, "b");
}).start();
new Thread(() -> {
syncWaitNotify.print(3, 1, "c");
}).start();
2
3
4
5
6
7
8
9
10
# Lock 条件变量版
class AwaitSignal extends ReentrantLock {
public void start(Condition first) {
this.lock();
try {
log.debug("start");
first.signal();
} finally {
this.unlock();
}
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
this.lock();
try {
current.await();
log.debug(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();
new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();
as.start(aWaitSet);
2
3
4
5
6
7
8
9
10
11
12
13
14
注意:该实现没有考虑 a,b,c 线程都就绪再开始
# Park Unpark 版
class SyncPark {
private int loopNumber;
private Thread[] threads;
public SyncPark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void setThreads(Thread... threads) {
this.threads = threads;
}
public void print(String str) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread());
}
}
private Thread nextThread() {
Thread current = Thread.currentThread();
int index = 0;
for (int i = 0; i < threads.length; i++) {
if(threads[i] == current) {
index = i;
break;
}
}
if(index < threads.length - 1) {
return threads[index+1];
} else {
return threads[0];
}
}
public void start() {
for (Thread thread : threads) {
thread.start();
}
LockSupport.unpark(threads[0]);
}
}
SyncPark syncPark = new SyncPark(5);
Thread t1 = new Thread(() -> {
syncPark.print("a");
});
Thread t2 = new Thread(() -> {
syncPark.print("b");
});
Thread t3 = new Thread(() -> {
syncPark.print("c\n");
});
syncPark.setThreads(t1, t2, t3);
syncPark.start();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# ReentrantReadWriteLock
读写锁定义为一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。
『读写锁 ReentrantReadWriteLock』并不是真正意义上的读写分离,它只允许读读共存,而读写和写写依然是互斥的,大多实际场景是 “读 / 读” 线程间并不存在互斥关系,只有 "读 / 写" 线程或 "写 / 写" 线程间的操作需要互斥的。因此引入 ReentrantReadWriteLock。
一个 ReentrantReadWriteLock 同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁 (切菜还是拍蒜选一个)。也即一个资源可以被多个读操作访问或一个写操作访问,但两者不能同时进行。只有在读多写少情境之下,读写锁才具有较高的性能体现。
# 特点
- 可重入
- 读写分离
# 升级
无锁无序→加锁→读写锁
package com.atguigu.juc.rwlock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyResource
{
Map<String,String> map = new HashMap<>();
//=====ReentrantLock 等价于 =====synchronized
Lock lock = new ReentrantLock();
//=====ReentrantReadWriteLock 一体两面,读写互斥,读读共享
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void write(String key,String value)
{
rwLock.writeLock().lock();
try
{
System.out.println(Thread.currentThread().getName()+"\t"+"---正在写入");
map.put(key,value);
//暂停毫秒
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"---完成写入");
}finally {
rwLock.writeLock().unlock();
}
}
public void read(String key)
{
rwLock.readLock().lock();
try
{
System.out.println(Thread.currentThread().getName()+"\t"+"---正在读取");
String result = map.get(key);
//后续开启注释修改为2000,演示一体两面,读写互斥,读读共享,读没有完成时候写锁无法获得
//try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"---完成读取result:"+result);
}finally {
rwLock.readLock().unlock();
}
}
}
/**
* @auther zzyy
* @create 2021-03-28 11:04
*/
public class ReentrantReadWriteLockDemo
{
public static void main(String[] args)
{
MyResource myResource = new MyResource();
for (int i = 1; i <=10; i++) {
int finalI = i;
new Thread(() -> {
myResource.write(finalI +"", finalI +"");
},String.valueOf(i)).start();
}
for (int i = 1; i <=10; i++) {
int finalI = i;
new Thread(() -> {
myResource.read(finalI +"");
},String.valueOf(i)).start();
}
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
//读全部over才可以继续写
for (int i = 1; i <=3; i++) {
int finalI = i;
new Thread(() -> {
myResource.write(finalI +"", finalI +"");
},"newWriteThread==="+String.valueOf(i)).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# 降级
从写锁→读锁,ReentrantReadWriteLock 可以降级
《Java 并发编程的艺术》中关于锁降级的说明:
锁的严苛程度变强叫做升级,反之叫做降级

锁降级:将写入锁降级为读锁 (类似 Linux 文件读写权限理解,就像写权限要高于读权限一样)
可以降级,锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。

Java8 官网说明

重入还允许通过获取写入锁定,然后读取锁然后释放写锁从写锁到读取锁,但是,从读锁定升级到写锁是不可能的。
锁降级是为了让当前线程感知到数据的变化,目的是保证数据可见性
演示
package com.atguigu.juc.rwlock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @auther zzyy
* @create 2021-03-28 10:18
* 锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。
*
* 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。
*/
public class LockDownGradingDemo
{
public static void main(String[] args)
{
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
System.out.println("-------正在写入");
readLock.lock();
System.out.println("-------正在读取");
writeLock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
如果有线程在读,那么写线程是无法获取写锁的,是悲观锁的策略
不可锁升级
线程获取读锁是不能直接升级为写入锁的。

在 ReentrantReadWriteLock 中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会被阻塞。所以,需要释放所有读锁,才可获取写锁。

# 写锁和读锁是互斥的
写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁),这是因为读写锁要保持写操作的可见性。 因为,如果允许读锁在被获取的情况下对写锁的获取,那么正在运行的其他读线程无法感知到当前写线程的操作。
因此,分析读写锁 ReentrantReadWriteLock,会发现它有个潜在的问题:读锁全完,写锁有望;写锁独占,读写全堵;
如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,见前面演示中的 LockDownGradingDemo,即 ReadWriteLock 读的过程中不允许写,只有等待线程都释放了读锁,当前线程才能获取写锁,也就是写入必须等待,这是一种悲观的读锁,人家还在读着那,你先别去写,省的数据乱。
分析 StampedLock,会发现它改进之处在于:
读的过程中也允许获取写锁介入 (读和写两个操作也让你 “共享”(注意引号)),这样会导致我们读的数据就可能不一致! 所以,需要额外的方法来判断读的过程中是否有写入,这是一种乐观的读锁
显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
# 源码总结
读写锁之读写规矩,再说降级
锁降级 下面的示例代码摘自 ReentrantWriteReadLock 源码中: ReentrantWriteReadLock 支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级。

- 代码中声明了一个 volatile 类型的 cacheValid 变量,保证其可见性。
- 首先获取读锁,如果 cache 不可用,则释放读锁,获取写锁,在更改数据之前,再检查一次 cacheValid 的值,然后修改数据,将 cacheValid 置为 true,然后在释放写锁前获取读锁;此时,cache 中数据可用,处理 cache 中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性。
如果违背锁降级的步骤 如果当前的线程 C 在修改完 cache 中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程 D 获取了写锁并修改了数据,那么 C 线程无法感知到数据已被修改,则数据出现错误。
如果遵循锁降级的步骤 线程 C 在释放写锁之前获取读锁,那么线程 D 在获取写锁时将被阻塞,直到线程 C 完成数据处理过程,释放读锁。这样可以保证返回的数据是这次更新的数据,该机制是专门为了缓存设计的。
# 邮戳锁 StampedLock
无锁→独占锁→读写锁→邮戳锁
StampedLock 是 JDK1.8 中新增的一个读写锁,也是对 JDK1.5 中的读写锁 ReentrantReadWriteLock 的优化。
邮戳锁也叫票据锁,stamp(戳记,long 类型),代表了锁的状态。当 stamp 返回零时,表示线程获取锁失败。 并且,当释放锁或者转换锁的时候,都要传入最初获取的 stamp 值。
# 锁饥饿问题
它是由锁饥饿问题引出
锁饥饿问题:
ReentrantReadWriteLock 实现了读写分离,但是一旦读操作比较多的时候,想要获取写锁就变得比较困难了, 假如当前 1000 个线程,999 个读,1 个写,有可能 999 个读取线程长时间抢到了锁,那 1 个写线程就悲剧了 因为当前有可能会一直存在读锁,而无法获得写锁,根本没机会写。
# 如何缓解锁饥饿问题?
- 使用 “公平” 策略可以一定程度上缓解这个问题
new ReentrantReadWriteLock(true); - 但是 “公平” 策略是以牺牲系统吞吐量为代价的
# StampedLock 类
StampedLock 类的乐观读锁闪亮登场
ReentrantReadWriteLock 允许多个线程同时读,但是只允许一个线程写,在线程获取到写锁的时候,其他写操作和读操作都会处于阻塞状态, 读锁和写锁也是互斥的,所以在读的时候是不允许写的,读写锁比传统的 synchronized 速度要快很多, 原因就是在于 ReentrantReadWriteLock 支持读并发
StampedLock 横空出世 ReentrantReadWriteLock 的读锁被占用的时候,其他线程尝试获取写锁的时候会被阻塞。 但是,StampedLock 采取乐观获取锁后,其他线程尝试获取写锁时不会被阻塞,这其实是对读锁的优化, 所以,在获取乐观读锁后,还需要对结果进行校验。
# StampedLock 的特点
- 所有获取锁的方法,都返回一个邮戳(Stamp),Stamp 为零表示获取失败,其余都表示成功;
- 所有释放锁的方法,都需要一个邮戳(Stamp),这个 Stamp 必须是和成功获取锁时得到的 Stamp 一致;
- StampedLock 是不可重入的,危险 (如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
StampedLock 有三种访问模式
- Reading(读模式):功能和 ReentrantReadWriteLock 的读锁类似
- Writing(写模式):功能和 ReentrantReadWriteLock 的写锁类似
- Optimistic reading(乐观读模式):无锁机制,类似于数据库中的乐观锁,支持读写并发,很乐观认为读取时没人修改,假如被修改再实现升级为悲观读模式
乐观读模式演示,读的过程中也允许获取写锁介入
package com.atguigu.itdachang;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
/**
* @auther zzyy
* @create 2020-07-22 16:03
*/
public class StampedLockDemo
{
static int number = 37;
static StampedLock stampedLock = new StampedLock();
public void write()
{
long stamp = stampedLock.writeLock();
System.out.println(Thread.currentThread().getName()+"\t"+"=====写线程准备修改");
try
{
number = number + 13;
}catch (Exception e){
e.printStackTrace();
}finally {
stampedLock.unlockWrite(stamp);
}
System.out.println(Thread.currentThread().getName()+"\t"+"=====写线程结束修改");
}
//悲观读
public void read()
{
long stamp = stampedLock.readLock();
System.out.println(Thread.currentThread().getName()+"\t come in readlock block,4 seconds continue...");
//暂停几秒钟线程
for (int i = 0; i <4 ; i++) {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t 正在读取中......");
}
try
{
int result = number;
System.out.println(Thread.currentThread().getName()+"\t"+" 获得成员变量值result:" + result);
System.out.println("写线程没有修改值,因为 stampedLock.readLock()读的时候,不可以写,读写互斥");
}catch (Exception e){
e.printStackTrace();
}finally {
stampedLock.unlockRead(stamp);
}
}
//乐观读
public void tryOptimisticRead()
{
long stamp = stampedLock.tryOptimisticRead();
int result = number;
//间隔4秒钟,我们很乐观的认为没有其他线程修改过number值,实际靠判断。
System.out.println("4秒前stampedLock.validate值(true无修改,false有修改)"+"\t"+stampedLock.validate(stamp));
for (int i = 1; i <=4 ; i++) {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t 正在读取中......"+i+
"秒后stampedLock.validate值(true无修改,false有修改)"+"\t"
+stampedLock.validate(stamp));
}
if(!stampedLock.validate(stamp)) {
System.out.println("有人动过--------存在写操作!");
stamp = stampedLock.readLock();
try {
System.out.println("从乐观读 升级为 悲观读");
result = number;
System.out.println("重新悲观读锁通过获取到的成员变量值result:" + result);
}catch (Exception e){
e.printStackTrace();
}finally {
stampedLock.unlockRead(stamp);
}
}
System.out.println(Thread.currentThread().getName()+"\t finally value: "+result);
}
public static void main(String[] args)
{
StampedLockDemo resource = new StampedLockDemo();
new Thread(() -> {
resource.read();
//resource.tryOptimisticRead();
},"readThread").start();
// 2秒钟时乐观读失败,6秒钟乐观读取成功resource.tryOptimisticRead();,修改切换演示
//try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); }
new Thread(() -> {
resource.write();
},"writeThread").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# StampedLock 的缺点
- StampedLock 不支持重入,没有 Re 开头
- StampedLock 的悲观读锁和写锁都不支持条件变量(Condition),这个也需要注意。
- 使用 StampedLock 一定不要调用中断操作,即不要调用 interrupt () 方法 如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly () 和写锁 writeLockInterruptibly ()