Java并发编程系列:深入分析AQS原理_aqs 队列获得锁之后为什么执行thread.currentthread().interrupt()-程序员宅基地

技术标签: Java杂货铺  


AQS又称为队列同步器,它是用来构建锁或其他同步组件的基础框架,它是实现ReentrangLock、Semaphore等同步工具的基础。本文将会详细的阐述AQS实现的细节问题。

数据结构定义

AQS内部通过int类型的state控制锁的状态,当state=0时,则说明没有任何线程占有共享资源的锁,当state>0时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待。同步队列为FIFO的双向队列,竞争失败的线程会被添加至队尾。

// 同步队列的头部
private transient volatile Node head;
// 同步队列的尾部
private transient volatile Node tail;
// 同步状态
private volatile int state;

Node节点的定义:

//标识线程的状态
volatile int waitStatus;
//等待队列的前驱节点
volatile Node prev;
//等待队列的后继节点
volatile Node next;
//当前节点的线程
volatile Thread thread;
//条件队列的等待节点
Node nextWaiter;
//判断当前节点是否是共享节点
final boolean isShared() {
    
 return nextWaiter == SHARED;
}
  • 1 CANCELLED:该节点的线程可能由于超时或被中断而处于被取消(作废)状态,一旦处于这个状态,节点状态将一直 处于CANCELLED,因此应该从队列中移除
  • -1 SIGNAL:表示该节点处于等待唤醒状态,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须唤醒其后继结点
  • -2 CONDITION:该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为0,重新进入阻塞状态
  • 0:新加入的节点

在锁的获取时,并不一定只有一个线程才能持有这个锁,所以此时有了独占模式和共享模式的区别,通过nextWaiter来区分。
还有一个点是公平锁和非公平锁,它是由子类来实现的。在ReentrantLock中有FairSync和NonFairSync来实现。

下面以ReentrantLock为例,解释锁的获取和释放流程。

获取锁

获取锁的流程为Lock.lock -> Sync.lock -> AQS.acquire -> Sync.tryAcquire -> AQS.addWaiter ->AQS.acquireQueued,我们按照这个流程逐步分析。TODO 流程图

# Lock.lock -> Sync.lock

获取锁可以通过ReentrantLock中的lock、lockInterruptibly、tryLock,此三个方法的意义在ReentrantLock的文章中已经详细阐述。

public void lock() {
    
    sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
    
    sync.acquireInterruptibly(1);
}
public boolean tryLock() {
    
    return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

可以看出内部都是通过sync来实现,抽象类Sync继承了AQS,并且Sync的实现类为FairSync和NonFairSync。因此调用根据构造方法实例化出的FairSync或着NonFairSync的lock方法:

// Fair
final void lock() {
    
    acquire(1);
}
// NonFair
final void lock() {
    
    // 以cas方式尝试将AQS中的state从0更新为1
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

公平锁和非公平锁的lock方法这里就有区别,非公共锁先通过CAS操作去竞争锁,然后再去执行AQS实现的acquire方法。
exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用锁的线程。

# AQS.acquire -> Sync.tryAcquire

继续跟进AQS

public final void acquire(int arg) {
    
    if (tryAcquire(arg){
    
        return;
    }
    if(acquireQueued(addWaiter(Node.EXCLUSIVE), arg))) {
    
        selfInterrupt();
    }
}

首先执行tryAcquire方法,由具体的子类实现,不同的子类有不同的实现方式,如果失败,表示该线程获取锁失败,就调用addWaiter方法,将当前线程加入到等待队列中,然后返回当前线程的node节点。将node节点传递给acquireQueued方法,如果node节点的前驱节点是头结点,就再次尝试获取到锁,如果获取锁成功(成功返回的是false不会执行selfInterrupt方法),就将该节点设置为头结点,如果获取失败,就将当前节点的线程挂起。

下面看非公平锁的tryAcquire实现:

// NonFair
protected final boolean tryAcquire(int acquires) {
    
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    
        // state为0,说明当前锁未被任何线程持有
        if (compareAndSetState(0, acquires)) {
    // CAS设置state,如果成功,则设置锁的拥有者为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果是重入得情况
    else if (current == getExclusiveOwnerThread()) {
    
        int nextc = c + acquires;
        setState(nextc);// 更改进入的次数
        return true;
    }
    return false;
}

这里公平锁和非公平锁的实现几乎相同,只是多了一个!hasQueuedPredecessors()判断条件,意思是当前同步队列中如果没有正在排队的线程,才会进行后续的步骤。

# addWaiter

如果执行到addWaiter,则说明前面的tryAcquire没有抢到锁,那么会将将节点加入到等待队列。这里需要注意前面提到独享锁和共享锁,
ReentrantLock属于独享锁,并且AQS通过Node节点也就是线程的封装来表示独享/共享,因此这里传入的Mode的参数为Node.EXCLUSIVE。

private Node addWaiter(Node mode) {
    
    // 将当前线程构造为等待节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
    // 如果尾节点部为空
        node.prev = pred;// 将当前节点添加至队列尾部
        if (compareAndSetTail(pred, node)) {
    
            pred.next = node;
            return node;
        }
    }
    // 如果尾节点为空,或者CAS操作失败,则通过死循环更新尾节点
    enq(node);
    return node;
}

enq方法没什么好说的,死循环+CAS,返回node的前驱节点

private Node enq(final Node node) {
    
    for (;;) {
    
        Node t = tail;
        if (t == null) {
     // 如果尾节点为空,那么初始化尾和头,头节点是一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
     // 
            node.prev = t;
            if (compareAndSetTail(t, node)) {
    
                t.next = node;
                return t;
            }
        }
    }
}
# acquireQueued

在把node插入队列末尾后,它并不立即挂起该节点中线程,因为在插入它的过程中,前面的线程可能已经执行完成,
所以它会先进行自旋操作,尝试让该线程重新获取锁。代码如下:

final boolean acquireQueued(final Node node, int arg) {
    
    boolean failed = true;
    try {
    
        boolean interrupted = false;
        for (;;) {
    
            // 得到前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是head节点并且tryAcquire获取到锁
            if (p == head && tryAcquire(arg)) {
    
                setHead(node); // 设置Head为节点当前节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果当前节点前驱节点不是head或者CAS设置失败,去挂起线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() )
                interrupted = true;
        }
    } finally {
    
    	// 正常情况下failed = false,cancelAcquire的作用是删除节点,
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire()方法的作用是判断当前结点的前驱结点是否为SIGNAL状态,如果是则返回true。
如果为CANCELLED状态(值为1>0),即结束状态,则说明该前驱结点已没有用应该从同步队列移除,直到寻找到非CANCELLED状态的结点。倘若前驱结点的ws值不为CANCELLED,也不为SIGNAL(当从Condition的条件等待队列转移到同步队列时,结点状态为CONDITION因此需要转换为SIGNAL),那么将其转换为SIGNAL状态,以便在下轮循环中将其挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    
    int ws = pred.waitStatus;// 前驱节点的状态
    if (ws == Node.SIGNAL)// 如果是等待唤醒状态,返回true,
        return true;
    if (ws > 0) {
     // >0 则为CNACLE状态,被取消了,需要将前驱节点移除
        do {
    
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
    // 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        return Thread.interrupted();
}

parkAndCheckInterrupt()方法挂起当前线程,需要等待一个unpark()操作来唤醒它,调用interrupte方法可以中断,稳定后列表的状态为

  • 除了头节点,剩余节点都被阻塞,线程处于WAITING状态。
  • 除了尾节点,剩余节点都满足waitStatus==SIGNAL,表示释放后需要唤醒后继节点。

到此ReetrantLock内部间接通过AQS的FIFO的同步队列就完成了lock()操作。一张图总结lock的流程:
lock

释放锁

下面继续看unLock的流程:

public final boolean release(int arg) {
    
   if (tryRelease(arg)) {
    
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
   }
   return false;
}

释放锁其实就两个步骤,1.释放锁,2.如果完全释放则唤醒等待的线程。先看释放锁:

protected final boolean tryRelease(int releases) {
    
    int c = getState() - releases;// 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    // 完全释放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

完全释放表示ownerThread的所有重入操作均已结束,接着是唤醒后面的线程,注意这里并没有将head置为null,只是将ExclusiveOwnerThread和state初始化。

private void unparkSuccessor(Node node) {
           
    int ws = node.waitStatus;
    if (ws < 0) // 正常情况下为-1
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    // 如果下一个节点为空或者被取消,继续从尾节点开始找离头结点最近的
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)// 状态<0 的节点
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);// 唤醒下一个节点
}

当最近可用的节点被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,继续开始自旋。

Condition实现原理

在 https://blog.csdn.net/TheLudlows/article/details/76962006 中介绍了Condition的用法,类似于Object的wait和notify。

# await

Condition接口提供了await、signal方法,它的实现为AQS的内部类ConditionObject,在它的内部也有一个队列,称为等待队列,单向列表实现。AQS内部的队列叫做同步队列。同步队列主要用来保存阻塞的线程,而等待队列用来保存调用了await方法的线程。ConditionObject的成员变量如下:

 public class ConditionObject implements Condition, java.io.Serializable {
    
    //等待队列第一个等待结点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
}

等待队列的元素和同步队列中的元素都是Node类型。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。

等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。

下面分析await方法的逻辑:

public final void await() throws InterruptedException {
    
    if (Thread.interrupted())
        throw new InterruptedException();
    // 构建为Node节点,并加入队尾
    Node node = addConditionWaiter();
    // 释放当前线程锁即释放同步状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
    // 阻塞,直到收到信号或被中断
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 唤醒之后执行,同时判断线程是否被中断
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

添加到等待队列:

private Node addConditionWaiter() {
    
    Node t = lastWaiter;
    // lastWaiter初始化为null
    // 清除被唤醒的node
    if (t != null && t.waitStatus != Node.CONDITION) {
    
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 新建node,状态为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)// 初始化队列
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

尽管此处没有任何线程安全的保护,但实际使用时不会出现任何线程安全问题——因为条件队列的使用要求我们在调用await或signal时持有与该条件队列唯一相关的锁。共享锁中没有实现Lock接口,因此没有newCondition方法。

final int fullyRelease(Node node) {
    
    boolean failed = true;
    try {
    
        int savedState = getState();
        if (release(savedState)) {
    // 上节讲过的release,注意参数
            failed = false;
            return savedState;// 返回状态
        } else {
    
            throw new IllegalMonitorStateException();
        }
    } finally {
    
        if (failed)// 正常情况不会进入此分支
            node.waitStatus = Node.CANCELLED;
    }
}
# signal
public final void signal() {
    
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal()方法做了两件事,一是判断当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。二是将等待队列的头节点从等待队列中删除,同时将它加入到同步队列,意思是次线程可以去竞争锁了。

private void doSignal(Node first) {
    
   do {
    
   	   firstWaiter = first.nextWaiter;// 移除首节点
       if (firstWaiter == null)
           lastWaiter = null;
       first.nextWaiter = null; // 将旧的首节点next属性置为null
       
   } while (!transferForSignal(first) && (first = firstWaiter) != null);
}     

transferForSignal 将 first节点移出等待队列,通过时修改状态,加入同步队列,根据在同步队列中的前驱节点的状态和来决定是否唤醒等待阻塞的线程。

final boolean transferForSignal(Node node) {
    
	// 设置节点状态为初始状态
   if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);// 加入同步队列,得到前驱节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

其实这里不唤醒阻塞的线程也是可以的,因为此线程已经加入到同步队列中,同步队列中等待的线程是通过前驱节点的来唤醒的。但是这里为什么要多次一举?能够进入此分支说明前驱节点是CANCELLED状态,那么说明当前节点距离Head又进了一步,早些将此CANCELLED节点清除,因此将次线程唤醒,去竞争锁,同时删除无效的节点。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/TheLudlows/article/details/88696874

智能推荐

二叉树的层序遍历算法_编写算法,实现二叉树的层序遍历-程序员宅基地

文章浏览阅读291次。二叉树的层序遍历算法是一种按照层级顺序访问节点的方法,它利用队列的先进先出特性来实现。通过遍历每一层的节点,并将其子节点入队,我们可以按照层级顺序获取二叉树中的节点值。在这篇文章中,我们将介绍如何实现二叉树的层序遍历算法,并给出相应的源代码。在每一次迭代中,我们首先取出队首节点,访问它,并将它的左右子节点(如果存在)依次入队。如果该节点存在右子节点,则将右子节点入队。变量记录当前层的节点数量,这是为了在内层循环中准确地遍历当前层的节点。以上示例中,我们创建了一个二叉树,并使用层序遍历算法对其进行遍历。_编写算法,实现二叉树的层序遍历

linux usb虚拟网卡(NCM)_linux usb ncm-程序员宅基地

文章浏览阅读6.6k次。1. kernel config<M>USB Gadget precomposed configurations<M>Ethernet Gadget (with CDC Ethernet support) <M>Network Control Model (NCM) support2. build modulesmake ARCH=arm64 CROSS_COMPILE=aar..._linux usb ncm

Struts 应用转移到 Struts 2-程序员宅基地

文章浏览阅读1.9k次。 翻译:SpringSide团队 转载请注明出处。有很多人都很熟悉 Struts, 无论是从项目中直接获得的实战经验还是从书中了解到的。我们这一系列文章,将通过一个由 Stuts 转移到 Struts2 简单的例子向大家展现Struts2的所有特征。 在我们开始这个例子之前,你需要去知道一点 Struts2的背景知识。 在第一部分的文章中,我们将介绍Struts2与Struts的核心

在Windows平台上安装MRTG流量监控软件_mrtg 下载-程序员宅基地

文章浏览阅读188次。打开MRTG软件包中的"MRTG.cfg"文件,该文件是MRTG的主配置文件。打开MRTG软件包中的"MRTG.cfg"文件,该文件是MRTG的主配置文件。确保将命令中的"C:\MRTG"替换为你的MRTG安装目录和配置文件路径,"community"替换为你的SNMP团体字符串,"device_ip"替换为目标设备的IP地址。确保将命令中的"C:\MRTG"替换为你的MRTG安装目录和配置文件路径,"community"替换为你的SNMP团体字符串,"device_ip"替换为目标设备的IP地址。_mrtg 下载

kaggle简单使用教程(代码查找.下载、项目建立.运行、参加比赛)_kaggle在线写代码-程序员宅基地

文章浏览阅读1w次,点赞7次,收藏35次。Kaggle机器学习竞赛、托管数据库、编写和分享代码_kaggle在线写代码

随便推点

第三十二节 java学习 —— Applet_java applet是在哪学的-程序员宅基地

文章浏览阅读324次。1》Applet的定义applet是java语言编写的,无法独立运行,但是可以镶嵌到网页中执行。它扩展了传统的编程结构和方法,可以通过互联网发布到任何具有java编译环境的浏览的个体计算机上。 2》Applet用途用户可以静态显示Applet,像显示一副图片或一段文本一样;Applet可以是一个动态交互过程,用户输入简单的数据产生相应的响应。3》Applet编写的格式在编写_java applet是在哪学的

zabbix_server 3.0 安装-程序员宅基地

文章浏览阅读57次。2019独角兽企业重金招聘Python工程师标准>>> ..._zabbix server 3.0安装

OpenCV图像梯度_opencv 计算梯度图像-程序员宅基地

文章浏览阅读1.7k次。目标在本章中,我们将学习:寻找图像梯度、边缘等 我们将看到以下职能:cv2.sobel(), cv2.scharr(), cv2.Laplacian()等理论OpenCV提供三种类型的梯度滤波器或高通滤波器,Sobel、Scharr和Laplacian.我们会看到他们中的每一个。1.Sobel和Scharr衍生物¶Sobel算子是一种联合高斯平滑加微分运算,具有更强的..._opencv 计算梯度图像

flutter 聊天界面+表情图片_flutter表情包插件-程序员宅基地

文章浏览阅读2.6k次。网上找了找 零零碎碎有一些文章 没找到一个整体的 自己做完记录一下 防止忘了大体就是这样聊天气泡用的是https://blog.csdn.net/oterminator12/article/details/105790961这个文章看到的然后表情用的是https://blog.csdn.net/qq_36676433/article/details/104756685这个文章看到的整体结构及底部输入/表情选择部分body下的结构主要为最外层Column,然后聊天部分用F..._flutter表情包插件

win10应用:便签 商店 xbox等打不开,报错0x800704cf_xbox0x800704cf错误代码-程序员宅基地

文章浏览阅读2.8k次,点赞3次,收藏2次。登录便签,一直报错:执行此操作需要Internet,0x800704cf。笔者网络是没有问题的,其它程序可以正常访问。解决方法关闭代理1.Win+R打开运行,输入 inetcpl.cpl 打开internet选项界面2.切换到[连接]选项,点击局域网设置。红色框选处的两个勾取消。笔者上述配置后即可解决问题。如若还不能解决,试试下面这个方法设置DNS服务器地址,首选设置为4.2.2.1 备用设置为4.2.2.2..._xbox0x800704cf错误代码

conda命令克隆(复制)环境_conda clone-程序员宅基地

文章浏览阅读8.9w次,点赞55次,收藏138次。在服务器上想要使用别人搭好的环境,但是又怕自己对环境的修改更新会影响他人的使用,这个时候可以使用conda命令进行复制环境。首先假设已经安装了Anaconda。根据已有环境名复制生成新的环境假设已有环境名为A,需要生成的环境名为B:conda create -n B --clone A根据已有环境路径复制生成新的环境假设已有环境路径为D:\A,需要生成的新的环境名为B:conda ..._conda clone