php中文网 | cnphp.com

 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 371|回复: 0

什么是AbstractQueuedSynchronizer(AQS)

[复制链接]

2871

主题

2881

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

UID
1
威望
0
积分
7285
贡献
0
注册时间
2021-4-14
最后登录
2024-9-20
在线时间
716 小时
QQ
发表于 2022-7-13 20:18:25 | 显示全部楼层 |阅读模式
字面意思是抽象队列同步器,使用一个voliate修饰的int类型的同步状态,通过一个FIFO队列完成资源获取的排队工作,把每个参与资源竞争的线程封装成一个Node节点来实现锁的分配。

AbstractQueuedSynchronizer源码
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private transient volatile Node head;//链表头
    private transient volatile Node tail;//链表尾
    private transient Thread exclusiveOwnerThread;//持有锁的线程
    private volatile int state;//同步状态,0表示当前没有线程获取到锁
    static final class Node {//链表的Node节点类
      volatile int waitStatus;//当前节点在队列中的状态
      volatile Node prev;//前置节点
      volatile Node next;//后置节点
      volatile Thread thread;//当前线程
    }
}

AQS同步队列的基本结构

9de90fd19e3c21bc362b33c4a31d6145_1336600-20220713163322354-449426838.png
Node.waitStatus的说明
image.png
state为什么要用volatile修饰?
可见性,一个线程对变量的修改可以立即被别的线程感知到
有序性,禁止指令重排
AQS获取锁步骤
  public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
  }

当一个线程获取锁时,首先判断state状态值是否为0
如果state==0,则通过CAS的方式修改为非0状态
修改成功,则表明获取锁成功,执行业务代码
修改失败,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程
如果state!=0,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程
AQS获取锁过程
首先调用tryAcquire去修state的状态值,成功就获取当前锁;失败则加入当前等待队列中,然后挂起线程。

tryAcquire
在AQS的源码中tryAcquire是一空实现,需要它的子类去实现这个空方法。因为在AQS中虽然公平锁和非公平锁的都是基于一个CLH去实现,但是在获取锁的过程中略有不同。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

公平锁FairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            int c = getState();//获取同步器的状态
            if (c == 0) {//当前没有线程获取到锁
                //首先判断祖宗节点的线程是否当前线程一样
                if (!hasQueuedPredecessors() &&
                    //更改state的状态值为非0
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果锁持有者的线程是当前线程,则可放行,锁的重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
/**
* 判断祖宗节点的线程是否当前线程一样
* 傀儡节点的下个节点
*/
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    //头节点的下个节点所持有的线程是否与当前线程相同
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
非公平锁NonfairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //通过CAS更改state的状态值
        if (compareAndSetState(0, acquires)) {
            //把当前线程设置为锁的持有者
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果锁持有者的线程是当前线程,则可放行,锁的重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
6888c9b2fa2dee222a680eb886f4f9e9_1336600-20220713163322314-138546421.png
对比后发现,公平锁先判断是否有老祖宗节点,如果有则返回false;如果当前线程对应的node就是老祖宗节点,则直接去修改state状态,把state改为非0。

addWaiter
获取锁成功的线程去执行业务逻辑了,获取锁失败的线程则会在队列中排队等候,每个等候的线程也都不安分的。
private Node addWaiter(Node mode) {
    //把当前线程封装为一个Node节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //加入到队列的尾部
    enq(node);
    return node;
}
把当前线程封装为一个Node节点
当第一次执行这个方法时,由于head和tail都还没有赋值,则pred指向的tail也是空,所以直接直到enq(node)
当pred指向的tail不为空时,则通过CAS的方式加入到尾部,如果成功直接返回;如果失败,则进入enq(node)通过自旋的方式加入。
//通过自旋的方式将节点加入到节点的尾部
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
为了操作链表的方便,一般都要在链表的头前加入一个傀儡节点,AQS的链表也不例外。
先创建一个傀儡节点,并把head、tail均指向它,然后再把node节点加入到尾部后面,移动tail的指向。

acquireQueued
当节点成功加入到链表的尾部后,等待被唤醒,然后通过自旋的方式去获取锁
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //当前节点的前置节点
            final Node p = node.predecessor();
            //如果前置节点是傀儡节点(head指向傀儡节点),则再次尝试去获取锁
            if (p == head && tryAcquire(arg)) {
                //获取成功后,则移除之前的傀儡节点,head指向当前node,
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //获取锁失败后,设置node节点的状态,并挂起当前节点
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
获取node节点的前置节点,如果前置节点是head,则再次尝试去获取锁
设置当前node节点的前置节点状态为-1(表示后续节点正在等待状态,默认是0),然后通过自旋的后会进行到parkAndCheckInterrupt挂起当前节点
LockSupport.park(this)执行完事,当前线程会一直阻塞到这个地方
当前唤醒时再次从1开始执行
2d00937cf142ea74232e6eec6ba39af4_1336600-20220713163322270-639899469.png
AQS释放锁过程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
主要是恢复state的值、重置锁持有都线程,然后唤醒挂起的线程。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //当前线程与锁持有者线程不一样会报错
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {//重入的次数为0时,则当前线程已经没有重入了,可以清空锁的持有者
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
恢复state状态的值,如果重入次数为0时,则清空锁的持有都为null

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)//唤醒下个node对应的线程
            LockSupport.unpark(s.thread);
    }

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|php中文网 | cnphp.com ( 赣ICP备2021002321号-2 )

GMT+8, 2024-9-20 12:28 , Processed in 0.191676 second(s), 33 queries , Gzip On.

Powered by Discuz! X3.4 Licensed

Copyright © 2001-2020, Tencent Cloud.

申明:本站所有资源皆搜集自网络,相关版权归版权持有人所有,如有侵权,请电邮(fiorkn@foxmail.com)告之,本站会尽快删除。

快速回复 返回顶部 返回列表