目 录CONTENT

文章目录

AQS - 并发与锁

FatFish1
2024-10-23 / 0 评论 / 0 点赞 / 67 阅读 / 0 字 / 正在检测是否收录...

AQS概述

AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法。AQS框架包括以下部分:

  • countDownLatch:计数器

  • reentrantLock:锁

  • reentrantReadWriteLock:读写锁

  • cyclleBarrier:计数器

  • semaphore:信号量

AQS的实现原共享资源状态(volatile int state)+FIFO线程等待队列。这里volatile能够保证多线程下的可见性当state=1则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

AbstractQueuedSynchronizer

AQS的核心设计思路就是一个volatile的共享变量state和一个volatile修饰的FIFO队列,这两个都是线程安全的,能够在释放和获取锁时被正确改变。

// 阻塞队列的头节点
private transient volatile Node head;
// 阻塞队列的尾节点
private transient volatile Node tail;
// 锁状态
private volatile int state;

AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer,继承了持锁对象成员变量

 private transient Thread exclusiveOwnerThread;

但由于AQS本身提供的是锁竞争的逻辑,不是真正的锁,所以此对象少有体现。

AQS有一个内部类Node,观察Node的成员变量:

volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;

可以看到node的设计是一个双向的链表,同时保有其加锁线程,以及状态。同时还有一个直接指向头尾的head、tail。

这里的waitStatus是FIFO队列里面等待加锁的元素的状态,有以下几种:

  • SIGNAL:-1,表示待加锁任务的线程处于park状态

  • CANCELLED:1,表示待加锁线程已取消

  • CONDITION:-2,表示待加锁线程条件等待

  • PROPAGATE:-3

独占锁的申请流程和阻塞队列元素存入

在锁的申请流程中,acquire通过调用子类(真正的锁)的tryAcquire方法获取锁,具体怎么获取,看子类怎么实现的。如果获取不到,则把当前的任务塞到阻塞队列中做队尾。同时还要拿到这个node的前驱,如果前驱是head,尝试再次加锁。如果加锁加不上,或前驱不是head,则将前驱prev的状态修改为SIGNAL,同时线程挂起。

acquire - 获取锁方法架构

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire方法调用tryAcquire尝试获取锁,如果获取不到,则调用加入阻塞队列。tryAcquire就是父类的抽象方法,留给子类去实现具体怎么获取锁。

如果获取锁失败了,先执行addWaiter添加阻塞队列。添加队列完成后执行acquireQueued方法让这个新节点以死循环的方式获取锁,获取不到进入阻塞状态

addWaiter - 把当前节点添加到阻塞队列尾部

Node pred = tail;
if (pred != null) {
   ……
}
enq(node);

这里比较简单就是拿到tail,然后让新加入的node做tail,新node的prev是前tail。如果tail是null,执行enq插入,enq方法执行的是死循环的形式插入,确保Node一定能正确添加到尾部。

for (;;) {
    ……
    compareAndSetTail(t, node)
}

acquireQueued - 自旋申请锁

for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
}

node是addWaiter添加到队列里面的那个没拿到锁的新任务

node.predecessor是为了获取node的前驱p。这里判断,如果当前节点的前驱p是头结点head,然后让当前线程执行tryAcquire不阻塞申请锁。

如果申请到了,让node自己变成head,同时执行p.next=null是为了让p引用释放,便于垃圾回收

如果没申请到,执行shouldParkAfterFailedAcquire方法修改状态。然后暂停线程。

这里是个永不停的for循环,一旦par掉,for循环暂停。直到后面release流程把head的后继激活,如果就是它,则继续推动它的head进行获取锁,如果不是它,那它还是park在这,继续等着

这里为什么前驱是头节点才能获取锁?答:

  • 头节点是已经成功获取到同步状态的节点,而头节点释放同步状态后,会唤醒后继节点,每个被唤醒的节点都检查自己的前驱是否是头

  • 维护同步队列的FIFO原则,因为是先进先出。

shouldParkAfterFailedAcquire - 修改队列元素状态并判断是否满足park条件

    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;

这里拿pred的waitStatus,如果以及是signal了,就不再执行了直接return true。

如果waitStatus > 0,意味着前驱已经被取消了,则循环找前驱的前驱,把前面所有状态>0的全部丢弃,直到非取消状态的,配置为node的前驱,并且修改waitStatus为signal

parkAndCheckInterrupt暂停线程

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

这里就是把当前线程挂起了。park方法内部执行了线程的暂停。

独占锁的释放流程和阻塞队列元素的清理

执行release操作也是调用到子类锁中的tryRelease,判断锁释放后,执行唤醒当前线程的方法,被唤醒的等锁线程会继续其park的地方继续执行,即又回到了acquireQueued方法的无限for循环中

release - 锁释放骨架

  public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

这里的tryRelease还是留给子类锁去实现的。如果释放锁成功了,则拿阻塞队列中的head,如果head的状态非0(大概率为SIGNAL -1)执行unparkSuccessor方法激活它。

unparkSuccessor - 激活后继节点

Node s = node.next;
if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}
if (s != null)
    LockSupport.unpark(s.thread);

传进来的node是head,如果head的waitStatus是<0的,可以直接set为0,然后往后找。

取head的后继s,如果s为null或s的waitStatus>0,证明为cancel状态,则从tail往前推,直到推到t为head,或t为null停止,找到的最后一个t就是head需要指向的后继。

此时调用unpark方法激活s的线程,从前面挂起的地方(acquireQueued)继续执行

共享锁的获取和释放操作

共享锁获取与独占锁的区别在于,同一时刻是否能有多个线程同时获取到同步状态

acquireShared - 共享锁获取锁骨架

if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);

这里也是先调用到锁自行实现的tryAcquireShared方法,区别就在于try方法的返回,共享锁返回>0说明是获取到,如果<0说明获取不到,然后调用doAcquireShared尝试自旋阻塞获取。

doAcquireShared - 共享锁自旋阻塞获取锁

for (;;) {
    ……
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
        setHeadAndPropagate(node, r);
        p.next = null; // help GC
        ……
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())

共享锁的自旋阻塞流程,可以看到,如果p是头节点,就可以去非阻塞获取锁,如果获取到了,这里调用setHeadAndPropagate方法把它设置为头节点,但这里会判断后面是否都是共享,如果是,立刻释放锁,给下一个获取

如果p不是头,才进入Park逻辑

setHeadAndPropagate - 共享锁设置node状态

Node s = node.next;
if (s == null || s.isShared())
    doReleaseShared();

这里与独占锁的区别在于,独占锁只是做setHead操作,但是共享锁还需要判断下一个节点也是共享,就直接通过doReleaseShared方法通知后面的节点继续抢锁

releaseShared - 共享锁释放锁骨架

if (tryReleaseShared(arg)) {
    doReleaseShared();
……

非阻塞释放,如果成功,调用doReleaseShared方法唤醒后续节点,在doReleaseShared方法中也是获取头节点,通过无限自旋和CAS操作做状态修改,调用unparkSuccessor唤醒后置节点

独占式超时锁获取和释放

doAcquireNanos - 阻塞获取独占式超时锁

final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);

addWaiter的流程都一样,但是这里生成了一个deadline,用于判断超时

获取锁调用tryAcquire都一样,到获取不到判断超时的逻辑

nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
    return false;
if (shouldParkAfterFailedAcquire(p, node) &&
    nanosTimeout > spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
    throw new InterruptedException();

首先如果deadline - 当前时间是负的,说明已经超时了,不用自旋直接返回了

否则,判断是否阻塞,跟独占锁一样,如果需要阻塞,还需要判断一下超时时间和超时时间阈值,如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。

因此超时时间很短的场景下,同步器会进入无条件的快速自旋

AQS阻塞队列的其他操作

hasQueuedPredecessors - 判断队列是否有节点

判断思路如下:

第一个if判断head是否为null,为null直接返回fale没有节点,如果head非null判断第二个if

第二个if判断head.next(s)为null,或head.next的waitStatus是取消状态,如果第二个if判断失败,则判断第三个if。如果判断成功,从tail往前找,直到找到head或找到一个不为null的节点给s,再判断第三个if。

第三个if判断s非null,且s的所属线程不是当前线程,说明是等待状态的,证明队列有节点。这里s要么是第二个if中的head.next,要么是从tail往前找到的非null的节点。

    if (s != null && s.thread != Thread.currentThread())

这里第三个if再次判断s非空,是为了防止enq存在并发操作。因为前面的入对操作,第一个线程先让tail = node,再让head.next = node,执行完第一步,第二步还没执行的时候,第二个线程来了正好判断head.next=null

transferAfterCancelledWait - 判断中断状态

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

final boolean transferAfterCancelledWait(Node node) {
	//使用cas修改节点状态,如果还能修改成功,说明线程被中断时,signal还没有被调用。 
	// 这里要注意,就是线程被唤醒,并不一定是在java层面执行了locksupport.unpark
    // 也可能是调用了线程的interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。 
	if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 
		enq(node); //如果cas成功,则把
		node添加到AQS队列 
		return true; 
	} 
 
	//如果cas失败,则判断当前node是否已经在AQS队列上,如果不在,则让给其他线程执行 
 
	//当node被触发了signal方法时,node就会被加到 aqs队列上 while (!isOnSyncQueue(node))
	//循环检测node是否已经成功添加到AQS队列中。
 
	// 如果没有,则通过yield, 释放资源
	Thread.yield(); 
	return false; 
}

check方法是判断线程是否中断,中断的话执行transfer方法判断抛异常还是继续中断

isHeldExclusively - 判断持锁线程是不是当前线程

是一个留给锁实现类自己去实现的判断方法。它用于判断持锁线程是不是当前的线程。

acquireInterruptibly - 监测当前线程是否中断的加锁方法

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

该方法会先判断当前线程是否是中断状态,如果是直接抛出中断异常,放弃取锁。如果拿不到锁,这里的入队和等待方法同样也是支持线程中断监测的。

doAcquireInterruptibly - 监测当前线程是否中断的入队方法

    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();

与acquire方法不同的点在于,该方法在park时会检查线程的Interrupt状态,如果是interrupt的,同样抛出中断异常。

0

评论区