Java 并发笔记

Java 并发笔记

封面来源:https://whvn.cc/y8622k

学习资源:

深入理解 Java 虚拟机 第三版

黑马程序员深入学习Java并发编程,JUC并发编程全套教程

JavaNote/Prog.md at main · Seazean/JavaNote (github.com)

java面试题:java中断,synchronized和ReentrantLock能否中断_我是方小磊的博客-CSDN博客

ps: 本文章只是上文相关节选,用作 Javaguide 等八股内容外的补充

系统学习请直接参考以上学习资料或者学习完 Javaguide 等八股内容再看本节选。

杂记 (对应视频 1p-77p)

说说线程的生命周期和状态?

Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态:

  • NEW: 初始状态,线程被创建出来但没有被调用 start()
  • RUNNABLE: 运行状态,线程被调用了 start()等待运行的状态。
  • BLOCKED :阻塞状态,需要等待锁释放。
  • WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
  • TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
  • TERMINATED:终止状态,表示该线程已经运行完毕。

创建线程的方式

通过继承 Thread 类

实现 Runnable 接口来创建线程

FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "Hello, FutureTask!";
}
};

FutureTask<String> futureTask = new FutureTask<>(callable);

Thread thread = new Thread(futureTask);
thread.start();

String result = futureTask.get();
System.out.println(result);
}
}

这两种方式都可以用来创建线程,但是一般情况下推荐使用实现 Runnable 接口的方式来创建线程,原因如下:

  1. 代码复用性更好。使用实现 Runnable 接口的方式,可以将线程的任务和线程本身分离开来,这样可以使得代码更加清晰和易于维护。
  2. 避免单继承的限制。Java 中的类只能继承一个父类,因此如果使用继承 Thread 类的方式来创建线程,就无法继承其他的类了,而使用实现 Runnable 接口的方式则可以避免这个问题。
  3. 支持线程池。使用实现 Runnable 接口的方式可以更好地支持线程池,因为线程池需要的是一个实现了 Runnable 接口的任务。

线程上下文切换的时间

  1. 垃圾回收
  2. cpu 时间片用完
  3. 有更高级的线程需要运行
  4. 线程自己调用了 lock,wait,sleep,synchronized 等方法

任务优先级

只是一个提示,繁忙的时候,调度器会让优先级高的,获得更多时间片,但是cpu闲时,任务优先级没有作用

isInterrupt

join:等待线程运行结束

  1. join,wait,sleep,打断标记为 false,抛出异常
  2. 根据打断标记判断是否继续运行
  3. 打断正常运行的程序,不会干扰正常运行,只是有打断标记

image-20230227154427692

线程终止正确的方式—-两阶段终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ProducerConsumer{
Thread monitor;
public void start() {
monitor = new Thread(() -> {
while (true) {
Thread now = Thread.currentThread();
if (now.isInterrupted()) {
break;
}
try {
now.sleep(1000);
} catch(InterruptedException e) {
stop();
}
}
});
}

public void stop(){
monitor.interrupt();
}
}

守护线程

主线程结束,守护线程无论如何都会结束

1
2
3
4
5
6
7
8
Thread t = new Thread(new Runnable() {
@Override
public void run() {
// do something
}
});
t.setDaemon(true); // 将线程设置为守护线程
t.start();

通常,守护线程被用于执行一些后台任务,例如JVM的垃圾回收线程、定时任务线程等。它们并不需要与用户交互,而且在程序退出时也不需要执行清理工作,因此设置为守护线程可以避免无谓的等待和资源浪费。

synchronized 锁机制

Monitor

image-20230227195506766

组成

  • owner: 拥有 Monitor 的线程,可以执行 Monitor 所保护的临界区代码。
  • entryList: Monitor 的入口队列,用于存放请求锁的线程。
  • waitSet: 等待队列,用于存放因调用 wait() 方法而被阻塞的线程。
  • signal 相关的组件: 用于实现线程间的协作,包括 notify() 方法、notifyAll() 方法和 wait() 方法等。其中 notify() 方法和 notifyAll() 方法用于唤醒 waitSet 中的线程,而 wait() 方法则会将当前线程加入到 waitSet 中。

轻量级锁

  • 创建锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,存储锁定对象的 Mark Word

    img

  • 让锁记录中 Object reference 指向锁住的对象,并尝试用 CAS 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录

  • 如果 CAS 替换成功,对象头中存储了锁记录地址和状态 00(轻量级锁) ,表示由该线程给对象加锁 img

  • 如果 CAS 失败,有两种情况:

    • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
    • 如果是线程自己执行了 synchronized 锁重入,就添加一条 Lock Record 作为重入的计数

    img

  • 当退出 synchronized 代码块(解锁时)

    • 如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减 1

    • 如果锁记录的值不为 null,这时使用 CAS

      将 Mark Word 的值恢复给对象头

      • 成功,则解锁成功
      • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

锁膨胀

在尝试加轻量级锁的过程中,CAS 操作无法成功,可能是其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁

  • 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

    img

  • Thread-1 加轻量级锁失败,进入锁膨胀流程:为 Object 对象申请 Monitor 锁,通过 Object 对象头获取到持锁线程,将 Monitor 的 Owner 置为 Thread-0,将 Object 的对象头指向重量级锁地址,然后自己进入 Monitor 的 EntryList BLOCKED

    img

  • 当 Thread-0 退出同步块解锁时,使用 CAS 将 Mark Word 的值恢复给对象头失败,这时进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程

自旋优化

偏向锁

偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程之后重新获取该锁不再需要同步操作:

  • 当锁对象第一次被线程获得的时候进入偏向状态,标记为 101,同时使用 CAS 操作将线程 ID 记录到 Mark Word。如果 CAS 操作成功,这个线程以后进入这个锁相关的同步块,查看这个线程 ID 是自己的就表示没有竞争,就不需要再进行任何同步操作
  • 当有另外一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定或轻量级锁状态
  • 默认开启偏向锁
  • 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数 -XX:BiasedLockingStartupDelay=0 来禁用延迟。JDK 8 延迟 4s 开启偏向锁原因:在刚开始执行代码时,会有好多线程来抢锁,如果开偏向锁效率反而降低
  • 添加 VM 参数 -XX:-UseBiasedLocking 禁用偏向锁

偏向锁撤销情况

  • 调用对象的 hashCode:偏向锁的对象 MarkWord 中存储的是线程 id,调用 hashCode 导致偏向锁被撤销
  • 当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁
  • 调用 wait/notify,需要申请 Monitor,进入 WaitSet

批量撤销

如果对象被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID

  • 批量重偏向:当撤销偏向锁阈值超过 20 次后,JVM 会觉得是不是偏向错了,于是在给这些对象加锁时重新偏向至加锁线程
  • 批量撤销:当撤销偏向锁阈值超过 40 次后,JVM 会觉得自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除,这是 JVM 即时编译器的优化(JIT)

锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除

锁粗化

对相同对象多次加锁,导致线程发生多次重入,频繁的加锁操作就会导致性能损耗,可以使用锁粗化方式优化

如果虚拟机探测到一串的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部

  • 一些看起来没有加锁的代码,其实隐式的加了很多锁:

    1
    2
    3
    public static String concatString(String s1, String s2, String s3) {
    return s1 + s2 + s3;
    }
  • String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 JDK 1.5 之前,转化为 StringBuffer 对象的连续 append() 操作,每个 append() 方法中都有一个同步块

    1
    2
    3
    4
    5
    6
    7
    public static String concatString(String s1, String s2, String s3) {
    StringBuffer sb = new StringBuffer();
    sb.append(s1);
    sb.append(s2);
    sb.append(s3);
    return sb.toString();
    }

扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,只需要加锁一次就可以

同步模式

保护性暂停

保护性暂停(Guarded Suspension)是一种并发编程模式,用于解决生产者-消费者问题,即生产者线程在生产数据时,如果消费者线程尚未准备好接收数据,那么生产者线程就需要等待消费者线程准备好再生产数据,以避免浪费资源或者产生错误结果。

join 源码

VvExN.png

单任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class GuardedObject {
private Object response;
private final Object lock = new Object();

//获取结果
//timeout :最大等待时间
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
//经历时间超过最大等待时间退出循环
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response;
}

//产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}

多任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class  Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
//产生唯一的id
private static synchronized int generateId() {
return id++;
}

public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}

public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}

public static Set<Integer> getIds() {
return boxes.keySet();
}
}
class GuardedObject {
//标识,Guarded Object
private int id;//添加get set方法
}

异步模式

传统版

异步模式之生产者/消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void increment() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断 防止虚假唤醒
while(number != 0) {
// 等待不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断 防止虚假唤醒
while(number == 0) {
// 等待不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public class TraditionalProducerConsumer {
public static void main(String[] args) {
ShareData shareData = new ShareData();
// t1线程,生产
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.increment();
}
}, "t1").start();

// t2线程,消费
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.decrement();
}
}, "t2").start();
}
}

改进版

异步模式之生产者/消费者:

  • 消费队列可以用来平衡生产和消费的线程资源,不需要产生结果和消费结果的线程一一对应
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class demo {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id,"值"+id));
}, "生产者" + i).start();
}

new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
Message message = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者").start();
}
}

//消息队列类,Java间线程之间通信
class MessageQueue {
private LinkedList<Message> list = new LinkedList<>();//消息的队列集合
private int capacity;//队列容量
public MessageQueue(int capacity) {
this.capacity = capacity;
}

//获取消息
public Message take() {
//检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
sout(Thread.currentThread().getName() + ":队列为空,消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从队列的头部获取消息返回
Message message = list.removeFirst();
sout(Thread.currentThread().getName() + ":已消费消息--" + message);
list.notifyAll();
return message;
}
}

//存入消息
public void put(Message message) {
synchronized (list) {
//检查队列是否满
while (list.size() == capacity) {
try {
sout(Thread.currentThread().getName()+":队列为已满,生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//将消息加入队列尾部
list.addLast(message);
sout(Thread.currentThread().getName() + ":已生产消息--" + message);
list.notifyAll();
}
}
}

final class Message {
private int id;
private Object value;
//get set
}

阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
ExecutorService consumer = Executors.newFixedThreadPool(1);
ExecutorService producer = Executors.newFixedThreadPool(1);
BlockingQueue<Integer> queue = new SynchronousQueue<>();
producer.submit(() -> {
try {
System.out.println("生产...");
Thread.sleep(1000);
queue.put(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consumer.submit(() -> {
try {
System.out.println("等待消费...");
Integer result = queue.take();
System.out.println("结果为:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

park&&unpark

由三部分组成:_cond, _mutex, _counter

park:如果 counter 已经为零,获取mutex,进入cond ,线程挂起,否则 counter - 1。

unpark: 设置 counter 为1, 如果已经在 _cond 中,线程继续运行,_counter - 1。

LockSupport 出现就是为了增强 wait & notify 的功能:

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park、unpark 不需要
  • park & unpark 以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify。类比生产消费,先消费发现有产品就消费,没有就等待;先生产就直接产生商品,然后线程直接消费
  • wait 会释放锁资源进入等待队列,park 不会释放锁资源,只负责阻塞当前线程,会释放 CPU

死锁定位

定位死锁的方法:

  • 使用 jps 定位进程 id,再用 jstack id 定位死锁,找到死锁的线程去查看源码,解决优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    "Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting formonitor entry [0x000000001f54f000]
    java.lang.Thread.State: BLOCKED (on object monitor)
    #省略
    "Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting for monitor entry [0x000000001f54f000]
    java.lang.Thread.State: BLOCKED (on object monitor)
    #省略

    Found one Java-level deadlock:
    ===================================================
    "Thread-1":
    waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
    which is held by "Thread-0"
    "Thread-0":
    waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
    which is held by "Thread-1"

    Java stack information for the threads listed above:
    ===================================================
    "Thread-1":
    at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
    - waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
    - locked <0x000000076b5bf1d0> (a java.lang.Object)
    at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)
    "Thread-0":
    at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
    - waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
    - locked <0x000000076b5bf1c0> (a java.lang.Object)
    at thread.TestDeadLock$$Lambda$1/495053715
  • Linux 下可以通过 top 先定位到 CPU 占用高的 Java 进程,再利用 top -Hp 进程id 来定位是哪个线程,最后再用 jstack 的输出来看各个线程栈

  • 可以使用 jconsole 工具,在 jdk\bin 目录下

    VvVMx.png

点击对应线程检测死锁

活锁

两个线程互相改变对方的结束条件,最后谁也无法结束

饥饿

饥饿:一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束

ReentrantLock

条件变量

基本使用

synchronized 的条件变量,是当条件不满足时进入 WaitSet 等待;ReentrantLock 的条件变量比 synchronized 强大之处在于支持多个条件变量

ReentrantLock 类获取 Condition 对象:public Condition newCondition()

Condition 类 API:

  • void await():当前线程从运行状态进入等待状态,释放锁
  • void signal():唤醒一个等待在 Condition 上的线程,但是必须获得与该 Condition 相关的锁

使用流程:

  • await / signal 前需要获得锁

  • await 执行后,会释放锁进入 ConditionObject 等待

  • await 的线程被唤醒去重新竞争 lock 锁

  • 线程在条件队列被打断会抛出中断异常

  • 竞争 lock 锁成功后,从 await 后继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws InterruptedException {    
ReentrantLock lock = new ReentrantLock();
//创建一个新的条件变量
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
new Thread(() -> {
try {
lock.lock();
System.out.println("进入等待");
//进入休息室等待
condition1.await();
System.out.println("被唤醒了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
Thread.sleep(1000);
//叫醒
new Thread(() -> {
try {
lock.lock();
//唤醒
condition2.signal();
} finally {
lock.unlock();
}
}).start();
}

Volatile

性能:volatile 修饰的变量进行读操作与普通变量几乎没什么差别,但是写操作相对慢一些,因为需要在本地代码中插入很多内存屏障来保证指令不会发生乱序执行,但是开销比锁要小

synchronized 无法禁止指令重排和处理器优化,为什么可以保证有序性可见性

  • 加了锁之后,只能有一个线程获得到了锁,获得不到锁的线程就要阻塞,所以同一时间只有一个线程执行,相当于单线程,由于数据依赖性的存在,单线程的指令重排是没有问题的
  • 线程加锁前,将清空工作内存中共享变量的值,使用共享变量时需要从主内存中重新读取最新的值;线程解锁前,必须把共享变量的最新值刷新到主内存中(JMM 内存交互章节有讲)

内存屏障

  • 对 volatile 变量的写指令后会加入写屏障
  • 对 volatile 变量的读指令前会加入读屏障

写屏障(sfence,Store Barrier)保证在该屏障之前的,对共享变量的改动,都同步到主存当中

读屏障(lfence,Load Barrier)保证在该屏障之后的,对共享变量的读取,从主存刷新变量值,加载的是主存中最新数据

全能屏障:mfence(modify/mix Barrier),兼具 sfence 和 lfence 的功能

Atomic

原理:自旋锁 + CAS 算法

循环比较预期值和当前值,相等则修改,否则继续

常用API

常见原子类:AtomicInteger、AtomicBoolean、AtomicLong

构造方法:

  • public AtomicInteger():初始化一个默认值为 0 的原子型 Integer
  • public AtomicInteger(int initialValue):初始化一个指定值的原子型 Integer

常用API:

方法 作用
public final int get() 获取 AtomicInteger 的值
public final int getAndIncrement() 以原子方式将当前值加 1,返回的是自增前的值
public final int incrementAndGet() 以原子方式将当前值加 1,返回的是自增后的值
public final int getAndSet(int value) 以原子方式设置为 newValue 的值,返回旧值
public final int addAndGet(int data) 以原子方式将输入的数值与实例中的值相加并返回
实例:AtomicInteger 里的 value

原子引用

原子引用:对 Object 进行原子操作,提供一种读和写都是原子性的对象引用变量

原子引用类:AtomicReference、AtomicStampedReference、AtomicMarkableReference

AtomicReference 类:

  • 构造方法:AtomicReference<T> atomicReference = new AtomicReference<T>()

  • 常用 API:

    • public final boolean compareAndSet(V expectedValue, V newValue):CAS 操作
    • public final void set(V newValue):将值设置为 newValue
    • public final V get():返回当前值

ABA的解决

  1. AtomicStampedReference

  • 构造方法:

    • public AtomicStampedReference(V initialRef, int initialStamp):初始值和初始版本号
  • 常用API:

    • public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)期望引用和期望版本号都一致才进行 CAS 修改数据
    • public void set(V newReference, int newStamp):设置值和版本号
    • public V getReference():返回引用的值
    • public int getStamp():返回当前版本号
  1. AtomicMarkableReference

区别:与AtomicStampedReference的区别在于,AtomicMarkableReference使用boolean类型的标记值,而AtomicStampedReference使用int类型的标记值。


原子更新器

原子更新器类:AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater、AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常 IllegalArgumentException: Must be volatile type

常用 API:

  • static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> c, String fieldName):构造方法
  • abstract boolean compareAndSet(T obj, int expect, int update):CAS
1
2
3
4
5
6
7
8
9
10
11
public class UpdateDemo {
private volatile int field;

public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater
.newUpdater(UpdateDemo.class, "field");
UpdateDemo updateDemo = new UpdateDemo();
fieldUpdater.compareAndSet(updateDemo, 0, 10);
System.out.println(updateDemo.field);//10
}
}

原子累加器

原子累加器类:LongAdder、DoubleAdder、LongAccumulator、DoubleAccumulator

LongAdder 和 LongAccumulator 区别:

相同点:

  • LongAddr 与 LongAccumulator 类都是使用非阻塞算法 CAS 实现的
  • LongAddr 类是 LongAccumulator 类的一个特例,只是 LongAccumulator 提供了更强大的功能,可以自定义累加规则,当accumulatorFunction 为 null 时就等价于 LongAddr

不同点:

  • 调用 casBase 时,LongAccumulator 使用 function.applyAsLong(b = base, x) 来计算,LongAddr 使用 casBase(b = base, b + x)

  • LongAccumulator 类功能更加强大,构造方法参数中

    • accumulatorFunction 是一个双目运算器接口,可以指定累加规则,比如累加或者相乘,其根据输入的两个参数返回一个计算值,LongAdder 内置累加规则
    • identity 则是 LongAccumulator 累加器的初始值,LongAccumulator 可以为累加器提供非0的初始值,而 LongAdder 只能提供默认的 0

伪共享

待续

LongAddr 源码解析

待续

Unsafe

Unsafe 是 CAS 的核心类,由于 Java 无法直接访问底层系统,需要通过本地(Native)方法来访问

Unsafe 类存在 sun.misc 包,其中所有方法都是 native 修饰的,都是直接调用操作系统底层资源执行相应的任务,基于该类可以直接操作特定的内存数据,其内部方法操作类似 C 的指针

模拟实现原子整数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) {
MyAtomicInteger atomicInteger = new MyAtomicInteger(10);
if (atomicInteger.compareAndSwap(20)) {
System.out.println(atomicInteger.getValue());
}
}

class MyAtomicInteger {
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
private volatile int value;

static {
try {
//Unsafe unsafe = Unsafe.getUnsafe()这样会报错,需要反射获取
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
// 获取 value 属性的内存地址,value 属性指向该地址,直接设置该地址的值可以修改 value 的值
VALUE_OFFSET = UNSAFE.objectFieldOffset(
MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
throw new RuntimeException();
}
}

public MyAtomicInteger(int value) {
this.value = value;
}
public int getValue() {
return value;
}

public boolean compareAndSwap(int update) {
while (true) {
int prev = this.value;
int next = update;
// 当前对象 内存偏移量 期望值 更新值
if (UNSAFE.compareAndSwapInt(this, VALUE_OFFSET, prev, update)) {
System.out.println("CAS成功");
return true;
}
}
}
}

final

原理

1
2
3
public class TestFinal {
final int a = 20;
}

字节码:

1
2
3
4
5
6
7
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: bipush 20 // 将值直接放入栈中
7: putfield #2 // Field a:I
<-- 写屏障
10: return

final 变量的赋值通过 putfield 指令来完成,在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况

其他线程访问 final 修饰的变量会复制一份放入栈中,效率更高

当其大于SHORT.MAX_VALUE 时放入常量池

保护性拷贝

创建副本对象来避免共享的方式称之为保护性拷贝

State

无状态:成员变量保存的数据也可以称为状态信息,无状态就是没有成员变量

线程池

线程池设计模式—-享元模式

享元模式(Flyweight pattern): 用于减少创建对象的数量,以减少内存占用和提高性能,这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式

状态信息

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值。

状态 高3位 接收新任务 处理阻塞任务队列 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不接收新任务,但处理阻塞队列剩余任务
STOP 001 N N 中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 - - 终止状态

第一位符号位,从小到大

Executors

Executors 提供了四种线程池的创建:newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool

  • newFixedThreadPool:创建一个拥有 n 个线程的线程池

    1
    2
    3
    4
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    • LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为 Integer.MAX_VALUE,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出)
    • 适用于任务量已知,相对耗时的长期任务
  • newCachedThreadPool:创建一个可扩容的线程池

    1
    2
    3
    4
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
    • 核心线程数是 0 ,全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM

    • SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)

    • 适合任务数比较密集,但每个任务执行时间较短的情况

  • newSingleThreadExecutor:创建一个只有 1 个线程的单线程池

    1
    2
    3
    4
    5
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    • 保证所有任务按照指定顺序执行,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放

对比:

  • 创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,线程池会新建一个线程,保证池的正常工作

  • Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法

    原因:父类不能直接调用子类中的方法,需要反射或者创建对象的方式,可以调用子类静态方法

  • Executors.newFixedThreadPool(1) 初始时为 1,可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

提交方法

ExecutorService 类 API:

方法 说明
void execute(Runnable command) 执行任务(Executor 类 API)
Future<?> submit(Runnable task) 提交任务 task()
Future submit(Callable task) 提交任务 task,用返回值 Future 获得任务执行结果
List<Future> invokeAll(Collection<? extends Callable> tasks) 提交 tasks 中所有任务
List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) 提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常
T invokeAny(Collection<? extends Callable> tasks) 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

execute 和 submit 都属于线程池的方法,对比:

  • execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是封装成 FutureTask,然后调用 execute 执行

  • execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出


关闭方法

ExecutorService 类 API:

方法 说明
void shutdown() 线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定任务)
List shutdownNow() 线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回
boolean isShutdown() 不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true
boolean isTerminated() 线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true
boolean awaitTermination(long timeout, TimeUnit unit) 调用 shutdown 后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待

处理异常

execute 会直接抛出任务执行时的异常,submit 会吞掉异常,有两种处理方法

方法 1:主动捉异常

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
System.out.println("task1");
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
});

方法 2:使用 Future 对象

1
2
3
4
5
6
7
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<?> future = pool.submit(() -> {
System.out.println("task1");
int i = 1 / 0;
return true;
});
System.out.println(future.get());

任务调度

任务调度

Timer

Timer 实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static void method1() {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task 1");
//int i = 1 / 0;//任务一的出错会导致任务二无法执行
Thread.sleep(2000);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
System.out.println("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此任务1的延时,影响了任务2的执行
timer.schedule(task1, 1000);//17:45:56 c.ThreadPool [Timer-0] - task 1
timer.schedule(task2, 1000);//17:45:58 c.ThreadPool [Timer-0] - task 2
}

Scheduled

任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:

  • 使用内部类 ScheduledFutureTask 封装任务
  • 使用内部类 DelayedWorkQueue 作为线程池队列
  • 重写 onShutdown 方法去处理 shutdown 后的任务
  • 提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展

构造方法:Executors.newScheduledThreadPool(int corePoolSize)

1
2
3
4
5
6
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 最大线程数固定为 Integer.MAX_VALUE,保活时间 keepAliveTime 固定为 0
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// 阻塞队列是 DelayedWorkQueue
new DelayedWorkQueue());
}

常用 API:

  • ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u):延迟执行任务
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit):定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit):定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、单位

基本使用:

  • 延迟任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public static void main(String[] args){
    // 线程池大小为1时也是串行执行
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    // 添加两个任务,都在 1s 后同时执行
    executor.schedule(() -> {
    System.out.println("任务1,执行时间:" + new Date());
    //int i = 1 / 0;
    try { Thread.sleep(2000); } catch (InterruptedException e) { }
    }, 1000, TimeUnit.MILLISECONDS);

    executor.schedule(() -> {
    System.out.println("任务2,执行时间:" + new Date());
    }, 1000, TimeUnit.MILLISECONDS);
    }
  • 定时任务 scheduleAtFixedRate:一次任务的启动到下一次任务的启动之间只要大于等于间隔时间,抢占到 CPU 就会立即执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    System.out.println("start..." + new Date());

    pool.scheduleAtFixedRate(() -> {
    System.out.println("running..." + new Date());
    Thread.sleep(2000);
    }, 1, 1, TimeUnit.SECONDS);
    }

    /*start...Sat Apr 24 18:08:12 CST 2021
    running...Sat Apr 24 18:08:13 CST 2021
    running...Sat Apr 24 18:08:15 CST 2021
    running...Sat Apr 24 18:08:17 CST 2021
  • 定时任务 scheduleWithFixedDelay:一次任务的结束到下一次任务的启动之间等于间隔时间,抢占到 CPU 就会立即执行,这个方法才是真正的设置两个任务之间的间隔

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static void main(String[] args){
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
    System.out.println("start..." + new Date());

    pool.scheduleWithFixedDelay(() -> {
    System.out.println("running..." + new Date());
    Thread.sleep(2000);
    }, 1, 1, TimeUnit.SECONDS);
    }
    /*start...Sat Apr 24 18:11:41 CST 2021
    running...Sat Apr 24 18:11:42 CST 2021
    running...Sat Apr 24 18:11:45 CST 2021
    running...Sat Apr 24 18:11:48 CST 2021

ForkJoin

Fork/Join:线程池的实现,体现是分治思想,适用于能够进行任务拆分的 CPU 密集型运算,用于并行计算

任务拆分:将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列都可以用分治思想进行求解

  • Fork/Join 在分治的基础上加入了多线程,把每个任务的分解和合并交给不同的线程来完成,提升了运算效率

  • ForkJoin 使用 ForkJoinPool 来启动,是一个特殊的线程池,默认会创建与 CPU 核心数大小相同的线程池

  • 任务有返回值继承 RecursiveTask,没有返回值继承 RecursiveAction

ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率:

  • 每个线程都维护了一个双端队列,用来存储需要执行的任务
  • 工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行
  • 窃取的必须是最晚的任务,避免和队列所属线程发生竞争,但是队列中只有一个任务时还是会发生竞争

AQS

AQS 用状态属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

  • 独占模式是只有一个线程能够访问资源,如 ReentrantLock
  • 共享模式允许多个线程访问资源,如 Semaphore,ReentrantReadWriteLock 是组合式

AQS 核心思想:

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置锁定状态

  • 请求的共享资源被占用,AQS 用队列实现线程阻塞等待以及被唤醒时锁分配的机制,将暂时获取不到锁的线程加入到队列中

ReentrantLock

加锁

NonfairSync 继承自 AQS

1
2
3
public void lock() {
sync.lock();
}
  • 没有竞争:ExclusiveOwnerThread 属于 Thread-0,state 设置为 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // ReentrantLock.NonfairSync#lock
    final void lock() {
    // 用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示【获得了独占锁】
    if (compareAndSetState(0, 1))
    // 设置当前线程为独占线程
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);//失败进入
    }
  • 第一个竞争出现:Thread-1 执行,CAS 尝试将 state 由 0 改为 1,结果失败(第一次),进入 acquire 逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    // AbstractQueuedSynchronizer#acquire
    public final void acquire(int arg) {
    // tryAcquire 尝试获取锁失败时, 会调用 addWaiter 将当前线程封装成node入队,acquireQueued 阻塞当前线程,
    // acquireQueued 返回 true 表示挂起过程中线程被中断唤醒过,false 表示未被中断过
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 如果线程被中断了逻辑来到这,完成一次真正的打断效果
    selfInterrupt();
    }
  • 进入 tryAcquire 尝试获取锁逻辑,这时 state 已经是1,结果仍然失败(第二次),加锁成功有两种情况:

    • 当前 AQS 处于无锁状态
    • 加锁线程就是当前线程,说明发生了锁重入
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // ReentrantLock.NonfairSync#tryAcquire
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }
    // 抢占成功返回 true,抢占失败返回 false
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // state 值
    int c = getState();
    // 条件成立说明当前处于【无锁状态】
    if (c == 0) {
    //如果还没有获得锁,尝试用cas获得,这里体现非公平性: 不去检查 AQS 队列是否有阻塞线程直接获取锁
    if (compareAndSetState(0, acquires)) {
    // 获取锁成功设置当前线程为独占锁线程。
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 如果已经有线程获得了锁, 独占锁线程还是当前线程, 表示【发生了锁重入】
    else if (current == getExclusiveOwnerThread()) {
    // 更新锁重入的值
    int nextc = c + acquires;
    // 越界判断,当重入的深度很深时,会导致 nextc < 0,int值达到最大之后再 + 1 变负数
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    // 更新 state 的值,这里不使用 cas 是因为当前线程正在持有锁,所以这里的操作相当于在一个管程内
    setState(nextc);
    return true;
    }
    // 获取失败
    return false;
    }
  • 接下来进入 addWaiter 逻辑,构造 Node 队列(不是阻塞队列),前置条件是当前线程获取锁失败,说明有线程占用了锁

    • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    • Node 的创建是懒惰的,其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // AbstractQueuedSynchronizer#addWaiter,返回当前线程的 node 节点
    private Node addWaiter(Node mode) {
    // 将当前线程关联到一个 Node 对象上, 模式为独占模式
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 快速入队,如果 tail 不为 null,说明存在队列
    if (pred != null) {
    // 将当前节点的前驱节点指向 尾节点
    node.prev = pred;
    // 通过 cas 将 Node 对象加入 AQS 队列,成为尾节点,【尾插法】
    if (compareAndSetTail(pred, node)) {
    pred.next = node;// 双向链表
    return node;
    }
    }
    // 初始时队列为空,或者 CAS 失败进入这里
    enq(node);
    return node;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // AbstractQueuedSynchronizer#enq
    private Node enq(final Node node) {
    // 自旋入队,必须入队成功才结束循环
    for (;;) {
    Node t = tail;
    // 说明当前锁被占用,且当前线程可能是【第一个获取锁失败】的线程,【还没有建立队列】
    if (t == null) {
    // 设置一个【哑元节点】,头尾指针都指向该节点
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    // 自旋到这,普通入队方式,首先赋值尾节点的前驱节点【尾插法】
    node.prev = t;
    // 【在设置完尾节点后,才更新的原始尾节点的后继节点,所以此时从前往后遍历会丢失尾节点】
    if (compareAndSetTail(t, node)) {
    //【此时 t.next = null,并且这里已经 CAS 结束,线程并不是安全的】
    t.next = node;
    return t; // 返回当前 node 的前驱节点
    }
    }
    }
    }
  • 线程节点加入队列成功,进入 AbstractQueuedSynchronizer#acquireQueued 逻辑阻塞线程

    • acquireQueued 会在一个自旋中不断尝试获得锁,失败后进入 park 阻塞

    • 如果当前线程是在 head 节点后,会再次 tryAcquire 尝试获取锁,state 仍为 1 则失败(第三次)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    final boolean acquireQueued(final Node node, int arg) {
    // true 表示当前线程抢占锁失败,false 表示成功
    boolean failed = true;
    try {
    // 中断标记,表示当前线程是否被中断
    boolean interrupted = false;
    for (;;) {
    // 获得当前线程节点的前驱节点
    final Node p = node.predecessor();
    // 前驱节点是 head, FIFO 队列的特性表示轮到当前线程可以去获取锁
    if (p == head && tryAcquire(arg)) {
    // 获取成功, 设置当前线程自己的 node 为 head
    setHead(node);
    p.next = null; // help GC
    // 表示抢占锁成功
    failed = false;
    // 返回当前线程是否被中断
    return interrupted;
    }
    // 判断是否应当 park,返回 false 后需要新一轮的循环,返回 true 进入条件二阻塞线程
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    // 条件二返回结果是当前线程是否被打断,没有被打断返回 false 不进入这里的逻辑
    // 【就算被打断了,也会继续循环,并不会返回】
    interrupted = true;
    }
    } finally {
    // 【可打断模式下才会进入该逻辑】
    if (failed)
    cancelAcquire(node);
    }
    }
    • 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node 的 waitStatus 改为 -1,返回 false;waitStatus 为 -1 的节点用来唤醒下一个节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 表示前置节点是个可以唤醒当前节点的节点,返回 true
    if (ws == Node.SIGNAL)
    return true;
    // 前置节点的状态处于取消状态,需要【删除前面所有取消的节点】, 返回到外层循环重试
    if (ws > 0) {
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    // 获取到非取消的节点,连接上当前节点
    pred.next = node;
    // 默认情况下 node 的 waitStatus 是 0,进入这里的逻辑
    } else {
    // 【设置上一个节点状态为 Node.SIGNAL】,返回外层循环重试
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 返回不应该 park,再次尝试一次
    return false;
    }
    • shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,这时 state 仍为 1 获取失败(第四次)
    • 当再次进入 shouldParkAfterFailedAcquire 时,这时其前驱 node 的 waitStatus 已经是 -1 了,返回 true
    • 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
    1
    2
    3
    4
    5
    6
    private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程,如果打断标记已经是 true, 则 park 会失效
    LockSupport.park(this);
    // 判断当前线程是否被打断,清除打断标记
    return Thread.interrupted();
    }
  • 再有多个线程经历竞争失败后:


解锁

ReentrantLock#unlock:释放锁

1
2
3
public void unlock() {
sync.release(1);
}

Thread-0 释放锁,进入 release 流程

  • 进入 tryRelease,设置 exclusiveOwnerThread 为 null,state = 0

  • 当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // AbstractQueuedSynchronizer#release
    public final boolean release(int arg) {
    // 尝试释放锁,tryRelease 返回 true 表示当前线程已经【完全释放锁,重入的释放了】
    if (tryRelease(arg)) {
    // 队列头节点
    Node h = head;
    // 头节点什么时候是空?没有发生锁竞争,没有竞争线程创建哑元节点
    // 条件成立说明阻塞队列有等待线程,需要唤醒 head 节点后面的线程
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // ReentrantLock.Sync#tryRelease
    protected final boolean tryRelease(int releases) {
    // 减去释放的值,可能重入
    int c = getState() - releases;
    // 如果当前线程不是持有锁的线程直接报错
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    // 是否已经完全释放锁
    boolean free = false;
    // 支持锁重入, 只有 state 减为 0, 才完全释放锁成功
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    // 当前线程就是持有锁线程,所以可以直接更新锁,不需要使用 CAS
    setState(c);
    return free;
    }
  • 进入 AbstractQueuedSynchronizer#unparkSuccessor 方法,唤醒当前节点的后继节点

    • 找到队列中距离 head 最近的一个没取消的 Node,unpark 恢复其运行,本例中即为 Thread-1
    • 回到 Thread-1 的 acquireQueued 流程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private void unparkSuccessor(Node node) {
    // 当前节点的状态
    int ws = node.waitStatus;
    if (ws < 0)
    // 【尝试重置状态为 0】,因为当前节点要完成对后续节点的唤醒任务了,不需要 -1 了
    compareAndSetWaitStatus(node, ws, 0);
    // 找到需要 unpark 的节点,当前节点的下一个
    Node s = node.next;
    // 已取消的节点不能唤醒,需要找到距离头节点最近的非取消的节点
    if (s == null || s.waitStatus > 0) {
    s = null;
    // AQS 队列【从后至前】找需要 unpark 的节点,直到 t == 当前的 node 为止,找不到就不唤醒了
    for (Node t = tail; t != null && t != node; t = t.prev)
    // 说明当前线程状态需要被唤醒
    if (t.waitStatus <= 0)
    // 置换引用
    s = t;
    }
    // 【找到合适的可以被唤醒的 node,则唤醒线程】
    if (s != null)
    LockSupport.unpark(s.thread);
    }

    从后向前的唤醒的原因:enq 方法中,节点是尾插法,首先赋值的是尾节点的前驱节点,此时前驱节点的 next 并没有指向尾节点,从前遍历会丢失尾节点

  • 唤醒的线程会从 park 位置开始执行,如果加锁成功(没有竞争),会设置

    • exclusiveOwnerThread 为 Thread-1,state = 1
    • head 指向刚刚 Thread-1 所在的 Node,该 Node 会清空 Thread
    • 原本的 head 因为从链表断开,而可被垃圾回收(图中有错误,原来的头节点的 waitStatus 被改为 0 了)

  • 如果这时有其它线程来竞争(非公平),例如这时有 Thread-4 来了并抢占了锁

    • Thread-4 被设置为 exclusiveOwnerThread,state = 1
    • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

公平原理

与非公平锁主要区别在于 tryAcquire 方法:先检查 AQS 队列中是否有前驱节点,没有才去 CAS 竞争

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入
return false;
}
}
1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {    
Node t = tail;
Node h = head;
Node s;
// 头尾指向一个节点,链表为空,返回false
return h != t &&
// 头尾之间有节点,判断头节点的下一个是不是空
// 不是空进入最后的判断,第二个节点的线程是否是本线程,不是返回 true,表示当前节点有前驱节点
((s = h.next) == null || s.thread != Thread.currentThread());
}

ConcurrentHashMap


Java 并发笔记
http://lanfunoe.site/2023/02/27/Java并发学习笔记/
作者
John Doe
发布于
2023年2月27日
许可协议