以下文章来源于程序猿阿星 ,作者点击关注 👉
一起成长进阶!专注技术原理、源码,通过图解方式输出技术,这里将会分享操作系统、计算机网络、Java、分布式、数据库等精品原创文章
(给ImportNew加星标,提高Java技能)
转自: 程序猿阿星
AbstractQueuedSynchronizer 抽象同步队列简称 AQS,它是实现同步器的基础组件,如常用的 ReentrantLock、Semaphore、CountDownLatch 等。
AQS 定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作。
虽然大多数开发者可能永远不会使用 AQS 实现自己的同步器(JUC 包下提供的同步器基本足够应对日常开发),但是知道 AQS 的原理对于架构设计还是很有帮助的,面试还可以吹吹牛。
下面是 AQS 的组成结构:
AQS 由三部分组成:state 同步状态、Node 组成的 CLH 队列、ConditionObject 条件变量(包含 Node 组成的条件单向队列),下面会分别对这三部分做介绍。
先贴下 AbstractQueuedSynchronizer 提供的核心函数,混个脸熟就够了,后面会讲解。
状态
独占资源(不响应线程中断)
共享资源(不响应线程中断)
这里补充下,获取独占、共享资源操作还提供超时与响应中断的扩展函数,有兴趣的读者可以去AbstractQueuedSynchronizer源码了解。
在 AQS 中维护了一个同步状态变量 state,getState 函数获取同步状态,setState、compareAndSetState 函数修改同步状态。
对于 AQS来说,线程同步的关键是对 state 的操作。可以说获取、释放资源是否成功都是由 state 决定的。比如 state>0 代表可获取资源,否则无法获取。所以 state 的具体语义由实现者去定义。
现有的 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 定义的 state 语义都不一样。
CLH 是 AQS 内部维护的 FIFO(先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构。当一个线程竞争资源失败,就会将等待资源的线程封装成一个 Node 节点,通过 CAS 原子操作插入队列尾部。最终,不同的 Node 节点连接组成了一个 CLH 队列。所以说 AQS 通过 CLH 队列管理竞争资源的线程。
个人总结 CLH 队列具有如下几个优点:
Node 是 AQS 的内部类。每个等待资源的线程都会封装成 Node 节点组成 CLH 队列、等待队列。所以说 Node 是非常重要的部分,理解它是理解 AQS的第一步。
队列 Node 类中的变量都很好理解,只有 waitStatus、nextWaiter 没有细说,下面做个补充说明。
waitStatus 等待状态如下:
nextWaiter 特殊标记:
线程获取资源失败,封装成 Node 节点从 CLH 队列尾部入队并阻塞线程。某线程释放资源时会把 CLH 队列首部 Node 节点关联的线程唤醒(此处的首部是指第二个节点,后面会细说)再次获取资源。
获取资源失败的线程需要封装成 Node 节点,接着尾部入队。在 AQS 中提供 addWaiter 函数完成 Node 节点的创建与入队。
/*** @author: 程序猿阿星* @description: Node节点入队-CLH队列* @param mode 标记 Node.EXCLUSIVE独占式 or Node.SHARED共享式*/private Node addWaiter(Node mode) {//根据当前线程创建节点,等待状态为0Node node = new Node(Thread.currentThread(), mode);// 获取尾节点Node pred = tail;if (pred != null) {//如果尾节点不等于null,把当前节点的前驱节点指向尾节点node.prev = pred;//通过cas把尾节点指向当前节点if (compareAndSetTail(pred, node)) {//之前尾节点的下个节点指向当前节点pred.next = node;return node;}}//如果添加失败或队列不存在,执行end函数enq(node);return node;}
添加节点的时候,如果从 CLH队列已经存在,通过 CAS 快速将当前节点添加到队列尾部;如果添加失败或队列不存在,则指向 enq 函数自旋入队。
/*** @author: 程序猿阿星* @description: 自旋cas入队* @param node 节点*/private Node enq(final Node node) {for (;;) { //循环//获取尾节点Node t = tail;if (t == null) {//如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点if (compareAndSetHead(new Node()))//cas成功,尾节点指向哨兵节点tail = head;} else {//当前节点的前驱节点设指向之前尾节点node.prev = t;//cas设置把尾节点指向当前节点if (compareAndSetTail(t, node)) {//cas成功,之前尾节点的下个节点指向当前节点t.next = node;return t;}}}}
通过自旋 CAS 尝试往队列尾部插入节点,直到成功。自旋过程如果发现 CLH 队列不存在时,会初始化 CLH 队列。
入队过程流程如下图:
第一次循环:
第二次循环
3.当前线程节点的前驱节点指向尾部节点(哨兵节点);
4.设置当前线程节点为尾部,tail 指向当前线程节点;
5.前尾部节点的后驱节点指向当前线程节点(当前尾部节点)。
最后结合 addWaiter 与 enq 函数的入队流程图如下:
CLH 队列中的节点都是获取资源失败的线程节点。当持有资源的线程释放资源时,会将 head.next 指向的线程节点唤醒(CLH 队列的第二个节点)。如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(新哨兵节点),原头部节点出队(原哨兵节点)。
acquireQueued 函数中的部分代码:
//1.获取前驱节点final Node p = node.predecessor();//如果前驱节点是首节点,获取资源(子类实现)if (p == head && tryAcquire(arg)) {//2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点setHead(node);//3.原来首节点下个节点指向为nullp.next = null; // help GC//4.非异常状态,防止指向finally逻辑failed = false;//5.返回线程中断状态return interrupted;}private void setHead(Node node) {//节点设置为头部head = node;//清空线程node.thread = null;//清空前驱节点node.prev = null;}
只需要关注1~3步骤即可,过程非常简单:假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点。
注意,这个过程是不需要使用 CAS来保证,因为只有一个线程能够成功获取到资源。
Object 的 wait、notify 函数是配合 Synchronized 锁实现线程间同步协作的功能,AQS 的 ConditionObject 条件变量也提供这样的功能。通过 ConditionObject 的 await 和 signal 两类函数完成。
不同于 Synchronized 锁,一个 AQS 可以对应多个条件变量,而Synchronized只有一个。
如上图所示,ConditionObject 内部维护着一个单向条件队列。不同于 CHL 队列,条件队列只入队执行 await 的线程节点。并且加入条件队列的节点,不能在 CHL 队列, 条件队列出队的节点,会入队到 CHL 队列。
当某个线程执行了 ConditionObject 的 await 函数,阻塞当前线程。线程会被封装成 Node 节点添加到条件队列的末端,其他线程执行 ConditionObject 的 signal 函数,会将条件队列头部线程节点转移到 CHL 队列参与竞争资源,具体流程如下图:
最后补充下,条件队列Node类是使用nextWaiter变量指向下个节点,并且因为是单向队列,所以prev与next变量都是null
AQS 采用了模板方法设计模式,提供了两类模板:一类是独占式模板,另一类是共享形模式,对应的模板函数如下:
acquire 是个模板函数,模板流程就是线程获取共享资源。如果获取资源成功,线程直接返回;否则进入 CLH 队列,直到获取资源成功为止,且整个过程忽略中断的影响。
acquire 函数代码如下:
acquire 函数的大致流程都清楚了,下面来分析下 acquireQueued 函数。线程封装成节点后,是如何自旋阻塞等待获取资源的,代码如下:
/*** @author: 程序猿阿星* @description: 自旋机制等待获取资源* @param node* @param arg* @return: boolean*/final boolean acquireQueued(final Node node, int arg) {//异常状态,默认是boolean failed = true;try {//该线程是否中断过,默认否boolean interrupted = false;for (;;) {//自旋//获取前驱节点final Node p = node.predecessor();//如果前驱节点是首节点,获取资源(子类实现)if (p == head && tryAcquire(arg)) {//获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点setHead(node);//原来首节点下个节点指向为nullp.next = null; // help GC//非异常状态,防止指向finally逻辑failed = false;//返回线程中断状态return interrupted;}/*** 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事* 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false* 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)* 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false** 使用LockSupport类的静态方法park挂起当前线程,直到被唤醒* 唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态*/if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {//该线程被中断过interrupted = true;}} finally {// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除if (failed) {cancelAcquire(node);}}}
一图胜千言,核心流程图如下:
有获取资源,自然就少不了释放资源。AQS 中提供了 release 模板函数来释放资源,模板流程就是线程释放资源成功,唤醒 CLH队列的第二个线程节点(首节点的下个节点),代码如下:
/*** @author: 程序猿阿星* @description: 独占式-释放资源模板函数* @param arg* @return: boolean*/public final boolean release(int arg) {//释放资源成功,tryRelease子类实现if (tryRelease(arg)) {//获取头部线程节点Node h = head;//头部线程节点不为null,并且等待状态不为0if (h != null && h.waitStatus != 0) {//唤醒CHL队列第二个线程节点unparkSuccessor(h);}return true;}return false;}private void unparkSuccessor(Node node) {//获取节点等待状态int ws = node.waitStatus;if (ws < 0) {//cas更新节点状态为0compareAndSetWaitStatus(node, ws, 0);}//获取下个线程节点Node s = node.next;//如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev) {if (t.waitStatus <= 0) {s = t;}}}if (s != null) {//唤醒线程节点LockSupport.unpark(s.thread);}}
release 逻辑非常简单,流程图如下:
acquireShared 是个模板函数,模板流程就是线程获取共享资源。如果获取到资源,线程直接返回;否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响。acquireShared 函数代码如下:
/*** @author: 程序猿阿星* @description: 共享式-获取资源模板函数* @param arg* @return: void*/public final void acquireShared(int arg) {/*** 1.负数表示失败* 2.0表示成功,但没有剩余可用资源* 3.正数表示成功且有剩余资源*/if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现//自旋阻塞等待获取资源doAcquireShared(arg);}
doAcquireShared 函数与独占式的 acquireQueued 函数逻辑基本一致,唯一的区别就是下图红框部分:
AQS 中提供了 releaseShared 模板函数来释放资源,模板流程就是线程释放资源成功,唤醒 CHL 队列的第二个线程节点(首节点的下个节点),代码如下:
/*** @author: 程序猿阿星* @description: 共享式-释放资源模板函数* @param arg* @return: boolean*/public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现//唤醒后继节点doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {//获取头节点Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNALif (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0continue; // loop to recheck cases//唤醒头节点下个线程节点unparkSuccessor(h);}//如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATEelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head)break;}}
与独占式释放资源区别不大,都是唤醒头节点的下个节点,就不做过多描述了。
说了这么多理论,现在到实战环节了。正如前文所述,AQS 定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作。现在我们基于 AQS 实现一个不可重入的独占锁,直接使用 AQS 提供的独占式模板,只需明确 state 的语义与实现 tryAcquire 与 tryRelease 函数(获取资源与释放资源),在这里 state 为 0 表示锁没有被线程持有,state 为 1 表示锁已经被某个线程持有。由于是不可重入锁,所以不需要记录持有锁线程的获取锁次数。
不可重入的独占锁代码如下:
/*** @Author 程序猿阿星* @Description 不可重入的独占锁*/public class NonReentrantLock implements Lock {/*** @Author 程序猿阿星* @Description 自定义同步器*/private static class Sync extends AbstractQueuedSynchronizer {/*** 锁是否被线程持有*/protected boolean isHeldExclusively() {//0:未持有 1:已持有return super.getState() == 1;}/*** 获取锁*/protected boolean tryAcquire(int arg) {if (arg != 1) {//获取锁操作,是需要把state更新为1,所以arg必须是1throw new RuntimeException("arg not is 1");}if (compareAndSetState(0, arg)) {//cas 更新state为1成功,代表获取锁成功//设置持有锁线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}/*** 释放锁*/protected boolean tryRelease(int arg) {if (arg != 0) {//释放锁操作,是需要把state更新为0,所以arg必须是0throw new RuntimeException("arg not is 0");}//清空持有锁线程setExclusiveOwnerThread(null);//设置state状态为0,此处不用cas,因为只有获取锁成功的线程才会执行该函数,不需要考虑线程安全问题setState(arg);return true;}/*** 提供创建条件变量入口*/public ConditionObject createConditionObject() {return new ConditionObject();}}private final Sync sync = new Sync();/*** 获取锁*/public void lock() {//Aqs独占式-获取资源模板函数sync.acquire(1);}/*** 获取锁-响应中断*/public void lockInterruptibly() throws InterruptedException {//Aqs独占式-获取资源模板函数(响应线程中断)sync.acquireInterruptibly(1);}/*** 获取锁是否成功-不阻塞*/public boolean tryLock() {//子类实现return sync.tryAcquire(1);}/*** 获取锁-超时机制*/public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {//Aqs独占式-获取资源模板函数(超时机制)return sync.tryAcquireNanos(1,unit.toNanos(time));}/*** 释放锁*/public void unlock() {//Aqs独占式-释放资源模板函数sync.release(0);}/*** 创建条件变量*/public Condition newCondition() {return sync.createConditionObject();}}
NonReentrantLock 定义了一个内部类 Sync,Sync 用来实现具体的锁操作,它继承了 AQS,因为使用的是独占式模板,所以重写 tryAcquire 与 tryRelease 函数,另外提供了一个创建条件变量的入口,下面使用自定义的独占锁来同步两个线程对 j++。
private static int j = 0;public static void main(String[] agrs) throws InterruptedException {NonReentrantLock nonReentrantLock = new NonReentrantLock();Runnable runnable = () -> {//获取锁nonReentrantLock.lock();for (int i = 0; i < 100000; i++) {j++;}//释放锁nonReentrantLock.unlock();};Thread thread = new Thread(runnable);Thread threadTwo = new Thread(runnable);thread.start();threadTwo.start();thread.join();threadTwo.join();System.out.println(j);}
无论执行多少次输出内容都是:200000。
- EOF -
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️