还在加班吗?记得吃晚饭哦!

Java

大佬们在说的AQS,到底啥是个AQS(AbstractQueuedSynchronizer)同步队列

2022年04月07日 22:59:43 · 本文共 4,617 字阅读时间约 16分钟 · 3,259 次浏览
大佬们在说的AQS,到底啥是个AQS(AbstractQueuedSynchronizer)同步队列

各位大佬应该听过很多大佬讲过AQS,到底啥是个AQS?根据名称 AbstractQueuedSynchronizer 我们可以猜到,这是一个抽象的排队同步器,每个汉字都认识,连到一起就不认识了,我们分开理解。

Abstract抽象

抽象排队同步器,中的Abstract抽象含义是它是个 Abstract 抽象类,所以 AbstractQueuedSynchronizer 并不直接提供服务,我个人的理解哈,Abstract抽象类就是写了一半的类,所以AQS其实是一个框架、一种解决方案,JDK作者根据他的思路写了一个解决方案,然后咱们可以继续补全他的类,实现自己的同步器。

Queued队列

抽象排队同步器,中的Queued队列含义是它依赖一个队列数据结构来实现,是一个先进先出(FIFO)的队列数据结构,当然也会配合相关同步器(信号量、事件等)来实现,内容有点多后面再说。

Synchronizer同步器

抽象排队同步器,中的Synchronizer同步器含义就是它是个同步器,嗯~跟没说一样,这里我写我自己的理解,不一定对哈,如果有错误欢迎指出,在多线程环境中各个线程被多个CPU随机挑选执行,所以几乎是乱序的执行,在某些场景我们需要让线程按顺序逐个执行,这个时候就用到同步器,让各个线程之间有顺序的执行,如果你是单线程的程序,那么这个AQS对单线程没有任何意义。

有啥用

首先,可以装X,哦不,是理解JDK大佬们的思想,加以学习,用虔诚的心膜拜各位大佬。基本在各个 Lock 锁中都能看到内部实现了一个 AQS 同步器,作为小白的我认为,在多线程中协调线程运行顺序,就可以使用同步器,虽然小白基本不会直接使用AQS,但是知道大佬们的使用姿势也是好的。

如果你想自定义同步器在实现AQS时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

AQS内部构成

上面我大致解释了一下AQS是啥,那接下来再看看内部都有啥,满世界的 volatile,咱们下期文章再谈,先看AQS。

volatile int state

这个注释上写的是:The synchronization state,同步状态,具体是啥含义?得看实现类的定义了,例如ReentrantLock可重入锁的实现,state是用来表示加锁次数,可重入就可多次加锁。

final class Node 内部类

这个 Node 是等待队列中的节点类,这里还需要提一下 CLH (Craig, Landin, and Hagersten) 锁,因为这是 CLH 锁的变体,CLH锁是一种是基于逻辑队列非线程饥饿的一种自旋公平锁,由于是 Craig、Landin 和 Hagersten三位大佬的发明,因此命名为CLH锁。AQS作为JUC的核心,而CLH则是AQS的基础。先继续看里面有什么:

volatile int waitStatus,节点等待状态,有5种值:

  • 0 当一个Node被初始化的时候的默认值

  • CANCELLED 为1,表示线程获取锁的请求已经取消了

  • CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒

  • PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用

  • SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

volatile Thread thread,线程引用,这个就是装载的等待的线程了。

volatile Node prev,next,这个就是节点的前驱和后继的节点引用,这样就可以形成双向队列。

Node nextWaiter,链接到下一个等待条件的节点,或共享的特殊值。这个是Condition条件队列,先暂时放一放,后面再写条件队列和同步队列的转化。

volatile Node head

注意这里咱们已经从 Node 里出来了,这里是 AQS 里的头节点

volatile Node tail

这里是 AQS 里的尾节点

到这里基本介绍了AQS的结构,那到底怎么运行的呢,首先AQS只是个抽象类,只实现了等待队列的维护,所以这里只先说队列的运行过程,共享资源state的操作需要看具体实现类。

acquire获锁

在代码里看看获取锁的逻辑是啥,代码是:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

执行顺序依次是:tryAcquire(arg)、addWaiter(Node.EXCLUSIVE)、acquireQueued,如果进 if 就执行 selfInterrupt(),不要着急挨个看看大佬们的操作。

尝试直接去获取资源

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

看第一个 tryAcquire 就懵逼了,大佬的操作呢?直接抛异常是什么鬼?还记得我上面说的吗,AQS只是个框架,实现了一半的类,这个就需要实现AQS的子类来实现,是否能重入,是否能加塞,子类自己去实现,所以没啥看的,继续下一个。

将该线程加入等待队列的尾部

addWaiter(Node)方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点,代码是:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

意思是把当前线程封装到Node里,然后判断尾节点不为空就将这个节点设置为尾节点,如果失败了就执行enq(node)入队。

enq(node)入队

通过CAS自旋加入队尾,CAS自旋后面的文章再讲,先看AQS的逻辑,代码是:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued排队

经过上面tryAcquire尝试获锁addWaiter添加到队尾,现在该干嘛了?已经到队尾了,乖乖排队,等着被叫号重新唤起,我大概描述一下流程:

首先用try来处理线程中断,然后用 for 自旋不断循环尝试,拿到节点的前驱节点;

判断前驱节点是不是头节点,如果是的话就去tryAcquire尝试获取,成功的话通过setHead讲自己设置为头节点,讲前驱节点的后继引用设置为null,方便GC回收;

如果前驱节点不是头节点,自己可以去休息了,通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,自旋,发现拿不到资源,继续进入park()等待。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire检查状态,拿到前驱节点的waitStatus状态,看看前面排队的人有没有放弃,如果前驱节点放弃了就插队到前面,然后自己去休息吧。

parkAndCheckInterrupt,通过LockSupport.park()去休息,LockSupport.park()也放到后面文章再说,不要着急。

到这里获锁的AQS排队流程就走完了,现在开始释放流程。

release放锁

先看看代码里的逻辑:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

先执行tryRelease,然后unparkSuccessor唤醒等待队列中下一个节点。

tryRelease,跟上面的获取锁一样,需要子类去实现。

unparkSuccessor,用于唤醒等待队列中下一个线程,主要是寻找下一个节点,如果下一个节点已经放弃了,就从后往前找队列里最前面的,然后唤醒节点。

这里为啥是从后往前找?而不是从前往后找?因为可能会有问题,设想这样的情况:

有一个节点正在调用addWaiter入队,将自己设置到队尾,执行完 compareAndSetTail(pred, node) 被踢出 CPU 挂起,这个时候你从前往后找,到最后的时候会发现 next 是 null,因为 pred.next = node; 这句还没执行!

以上是独占模式,还有共享模式,但我写不动了,在这里只写字吧,就不逐一去找代码了。

独占模式只有一个线程在工作,共享模式跟独占差不多,就多了一步操作,获得资源的线程会判断是否还有剩余的资源,如果还有剩余的资源会唤醒自己后面的兄弟一起来工作,这个资源数是 tryAcquireShared 返回的,也是子类自己实现的,我在这里就简单的写一下调用流程链:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared,需要子类自己实现,但AQS已经把其返回值的语义定好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源

doAcquireShared,将当前线程加入等待队列尾休息

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里跟独占式很像,主要不一样的是调用了 setHeadAndPropagate 这个方法,我们再看看这个方法:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

里面又调用了 doReleaseShared 方法,什么意思呢?如果还有剩余量,继续唤醒下一个邻居线程,让自己的兄弟一起工作起来,这就是跟独占式最大的区别。

AQS就先写到这里吧,一旦扩展开到处是知识,我这个小白控制不住,下个文章写CAS吧,请关注我哦。

商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://www.renfei.net/posts/1003520
评论与留言

以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。

微信搜一搜:任霏博客