AbstractQueuedSynchronizer详解

starlin 1,050 2018-06-05

AQS简介

AbstractQueuedSynchronizer简称AQS,即抽象的队列同步器,它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。
整个 AQS 分为以下几部分:

  • Node 节点, 用于存放获取线程的节点, 存在于 Sync Queue, Condition Queue, 这些节点主要的区分在于 waitStatus 的值
  • Condition Queue, 这个队列是用于独占模式中, 只有用到 Condition.awaitXX 时才会将 node加到 tail 上
  • Sync Queue, 独占 共享的模式中均会使用到的存放 Node 的 CLH queue
  • ConditionObject, 用于独占的模式, 主要是线程释放lock, 加入 Condition Queue, 并进行相应的 signal 操作
  • 独占的获取lock (acquire, release), 例如 ReentrantLock
  • 共享的获取lock (acquireShared, releaseShared), 例如 ReeantrantReadWriteLock, Semaphore, CountDownLatch

![AQS特点](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/java%E5%B9%B6%E5%8F%91%E4%B9%8Babstractqueuedsynchronizer%E8%AF%A6%E8%A7%A3.md/AQS%E7%89%B9%E7%82%B9.png =899x)

AQS原理概览

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。
这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:(Craig、Landin and Hagersten)队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

主要原理图如下:
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
![CLH变体队列](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/java%E5%B9%B6%E5%8F%91%E4%B9%8Babstractqueuedsynchronizer%E8%AF%A6%E8%A7%A3.md/CLH%E5%8F%98%E4%BD%93%E9%98%9F%E5%88%97.png =672x)

CAS

CAS(Compare and Swap),比较并交换,通过利用底层硬件平台的特性,实现原子性操作。CAS 操作涉及到3个操作数,内存值 V,旧的期望值 A,需要修改的新值 B。当且仅当预期值 A 和 内存值 V 相同时,才将内存值 V 修改为 B,否则什么都不做。CAS 操作类似于执行了下面流程

if(oldValue == memory[valueAddress]) {
    memory[valueAddress] = newValue;
}

在上面的流程中,其实涉及到了两个操作,比较以及替换,为了确保程序正确,需要确保这两个操作的原子性(也就是说确保这两个操作同时进行,中间不会有其他线程干扰)。现在的 CPU 中,提供了相关的底层 CAS 指令,即 CPU 底层指令确保了比较和交换两个操作作为一个原子操作进行(其实在这一点上还是有排他锁的. 只是比起用synchronized, 这里的排他时间要短的多.),Java 中的 CAS 函数是借助于底层的 CAS 指令来实现的.我们来看下 Java 中对于 CAS 函数的定义:

/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

上面三个函数定义在 sun.misc.Unsafe 类中,使用该类可以进行一些底层的操作,例如直接操作原生内存,更多关于 Unsafe 类的文章可以参考 这篇

这里简单介绍一下CAS,详情请点击这里

同步队列

AQS 依赖内部的同步队列(一个 FIFO的双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把队列中第一个等待节点线程唤醒(下图中的 Node1),使其再次尝试获取同步状态。同步队列的结构如下所示

Head 节点本身不保存等待线程的信息,它通过 next 变量指向第一个保存线程等待信息的节点(Node1)。当线程被唤醒之后,会删除 Head 节点,而唤醒线程所在的节点会设置为 Head 节点(Node1 被唤醒之后,Node1会被置为 Head 节点)。

内部类Node

Node 节点是代表获取lock的线程, 存在于 Condition Queue, Sync Queue 里面, 而其主要就是 nextWaiter (标记共享还是独占),waitStatus 标记node的状态

Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已,源码如下:

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        // 标记当前节点在共享模式下
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        // 标记当前节点在独占模式下
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
         // 代码此线程取消了争抢这个锁,是唯一一个大于0的状态
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        // 表示当前node的后继节点(即next节点)对应的线程需要被唤醒
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        // 表明线程正在等待一个条件
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
         // 用于acquireShared中向后传播
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
         // 等待状态
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
         //  前驱节点的引用
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
         //  后继节点的引用
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
         // 获取同步状态的线程
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        // 作用分2种
        // 1.在 Sync Queue 里面, nextWaiter用来判断节点是 共享模式, 还是独占模式
        // 2.在 Condition queue 里面, 节点主要是链接且后继节点 (Condition queue是一个单向的, 不支持并发的 list)
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        // 当前节点是否是共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        // 获取node的前继节点 
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }
        // 初始化 Node 用于 Sync Queue 里面
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        // 初始化 Node 用于 Condition Queue 里面
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

在 Node 类中定义了四种等待状态:

  • CANCELED: 1,因为等待超时 (timeout)或者中断(interrupt),节点会被置为取消状态。处于取消状态的节点不会再去竞争锁,也就是说不会再被阻塞。节点会一直保持取消状态,而不会转换为其他状态。处于 CANCELED 的节点会被移出队列,被 GC 回收。
  • SIGNAL: -1,表明当前的后继结点正在或者将要被阻塞(通过使用 LockSupport.pack 方法),因此当前的节点被释放(release)或者被取消时(cancel)时,要唤醒它的后继结点(通过 LockSupport.unpark 方法)。
  • CONDITION: -2,表明当前节点在条件队列中,因为等待某个条件而被阻塞。
  • PROPAGATE: -3,在共享模式下,可以认为资源有多个,因此当前线程被唤醒之后,可能还有剩余的资源可以唤醒其他线程。该状态用来表明后续节点会传播唤醒的操作。需要注意的是只有头节点才可以设置为该状态(This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.)。
    0:新创建的节点会处于这种状态

waitStatus的状态变化:

  1. 线程刚入Sync Queue 里面, 发现独占锁被其他人获取, 则将其前继节点标记为SIGNAL, 然后再尝试获取一下锁(调用tryAcquire方法)
  2. 若调用 tryAcquire方法获取失败, 则判断一下是否前继节点被标记为 SIGNAL, 若是的话直接block(block前会确保前继节点被标记为SIGNAL, 因为前继节点在进行释放锁时根据是否标记为 SIGNAL 来决定唤醒后继节点与否 <- 这是独占的情况下)
  3. 前继节点使用完lock, 进行释放, 因为自己被标记为 SIGNAL, 所以唤醒其后继节点

waitStatus 变化过程:

  1. 独占模式下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0)
  2. 独占模式 + 使用 Condition情况下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0)
    其上可能涉及 中断与超时, 只是多了一个 CANCELLED, 当节点变成 CANCELLED, 后就等着被清除
  3. 共享模式下: 0(初始) -> PROPAGATE(获取 lock 或release lock 时) (获取 lock 时会调用 setHeadAndPropagate 来进行 传递式的唤醒后继节点, 直到碰到 独占模式的节点)
  4. 共享模式 + 独占模式下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0)
    其上的这些状态变化主要在: doReleaseShared , shouldParkAfterFailedAcquire 里面

独占锁的获取和释放

独占锁的获取

下面是获取独占锁的流程图:

我们通过 acquire 方法来获取独占锁,下面是方法定义

public final void acquire(int arg) {
    // 首先尝试获取锁,如果获取失败,会先调用 addWaiter 方法创建节点并追加到队列尾部
    // 然后调用 acquireQueued 阻塞或者循环尝试获取锁
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        // 在 acquireQueued 中,如果线程是因为中断而退出的阻塞状态会返回 true
        // 这里的 selfInterrupt 主要是为了恢复线程的中断状态
        selfInterrupt();
    }
}

acquire 会首先调用 tryAcquire 方法来获得锁,该方法需要我们来实现,这个在前面已经提过了。如果没有获取锁,会调用 addWaiter 方法创建一个和当前线程关联的节点追加到同步队列的尾部,我们调用 addWaiter 时传入的是 Node.EXCLUSIVE,表明当前是独占模式。下面是 addWaiter 的具体实现

    private Node addWaiter(Node mode) {
        //新建Node
        Node node = new Node(Thread.currentThread(), mode);
        //快速尝试添加尾节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS设置尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //多次尝试
        enq(node);
        return node;
    }

addWaiter 方法会首先调用 if 方法,来判断能否成功将节点添加到队列尾部,如果添加失败,再调用 enq 方法(使用循环不断重试)进行添加,下面是 enq 方法的实现:

private Node enq(final Node node) {
    //通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。
    for (;;) {
        Node t = tail;
        // 同步队列采用的懒初始化(lazily initialized)的方式,
        // 初始时 head 和 tail 都会被设置为 null,当一次被访问时
        // 才会创建 head 对象,并把尾指针指向 head。
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //设置为尾节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter 仅仅是将节点加到了同步队列的末尾,并没有阻塞线程,线程阻塞的操作是在 acquireQueued 方法中完成的,下面是 acquireQueued 的实现:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        //中断标志
        boolean interrupted = false;
        //自旋过程,其实就是一个死循环而已
        for (;;) {
            final Node p = node.predecessor();
            // 如果当前节点的前继节点是 head,就使用自旋(循环)的方式不断请求锁
            if (p == head && tryAcquire(arg)) {
                // 成功获得锁,将当前节点置为 head 节点,同时删除原 head 节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // shouldParkAfterFailedAcquire 检查是否可以挂起线程,
            // 如果可以挂起进程,会调用 parkAndCheckInterrupt 挂起线程,
            // 如果 parkAndCheckInterrupt 返回 true,表明当前线程是因为中断而退出挂起状态的,
            // 所以要将 interrupted 设为 true,表明当前线程被中断过
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued 会首先检查当前节点的前继节点是否为 head,如果为 head,将使用自旋的方式不断的请求锁,如果不是 head,则调用 shouldParkAfterFailedAcquire 查看是否应该挂起当前节点关联的线程,下面是 shouldParkAfterFailedAcquire 的实现

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 当前节点的前继节点的等待状态
    int ws = pred.waitStatus;
    // 如果前继节点的等待状态为 SIGNAL 我们就可以将当前节点对应的线程挂起
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // ws 大于 0,表明当前线程的前继节点处于 CANCELED 的状态,
        // 所以我们需要从当前节点开始往前查找,直到找到第一个不为
        // CAECELED  状态的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire 会检查前继节点的等待状态,如果前继节点状态为 SIGNAL,则可以将当前节点关联的线程挂起,如果不是 SIGNAL,会做一些其他的操作,在当前循环中不会挂起线程。如果确定了可以挂起线程,就调用 parkAndCheckInterrupt 方法对线程进行阻塞:

private final boolean parkAndCheckInterrupt() {
    // 挂起当前线程
    LockSupport.park(this);
    // 可以通过调用 interrupt 方法使线程退出 park 状态,
    // 为了使线程在后面的循环中还可以响应中断,会重置线程的中断状态。
    // 这里使用 interrupted 会先返回线程当前的中断状态,然后将中断状态重置为 false,
    // 线程的中断状态会返回给上层调用函数,在线程获得锁后,
    // 如果发现线程曾被中断过,会将中断状态重新设为 true
    return Thread.interrupted();
}

独占锁的释放

下面是获取独占锁的流程图:

通过 release 方法,我们可以释放互斥锁。下面是 release 方法的实现:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // waitStatus 为 0,证明是初始化的空队列或者后继结点已经被唤醒了
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在独占模式下释放锁时,是没有其他线程竞争的,所以处理会简单一些。首先尝试释放锁,如果失败就直接返回(失败不是因为多线程竞争,而是线程本身就不拥有锁)。如果成功的话,会检查 h 的状态,然后调用 unparkSuccessor 方法来唤醒后续线程。
下面是unparkSuccessor的实现

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 将 head 节点的状态置为 0,表明当前节点的后续节点已经被唤醒了,
    // 不需要再次唤醒,修改 ws 状态主要作用于 release 的判断
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享锁获取和释放

独占锁的流程和原理比较容易理解,因为只有一个锁,但是共享锁的处理就相对复杂一些了。在独占锁中,只有在释放锁之后,才能唤醒等待的线程,而在共享模式中,获取锁和释放锁之后,都有可能唤醒等待的线程。如果想要理清共享锁的工作过程,必须将共享锁的获取和释放结合起来看。这里我们先看一下共享锁的释放过程,只有明白了释放过程做了哪些工作,才能更好的理解获取锁的过程。

共享锁释放

下面是释放共享锁的流程:

通过 releaseShared 方法会释放共享锁,下面是具体的实现

public final boolean releaseShared(int releases) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releases 是要释放的共享资源数量,其中 tryReleaseShared 的方法由我们自己重写,该方法的主要功能就是修改共享资源的数量(state + releases),因为可能会有多个线程同时释放资源,所以实现的时候,一般采用循环加 CAS 操作的方式,如下面的形式:

protected boolean tryReleaseShared(int releases) {
    // 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
    for (;;) {
        int lastCount = getState();
        int newCount = lastCount + releases;
        if (compareAndSetState(lastCount, newCount)) {
            return true;
        }
    }
}

当共享资源数量修改了之后,会调用 doReleaseShared 方法,该方法主要唤醒同步队列中的第一个等待节点(head.next),下面是具体实现:

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            // head = null 说明没有初始化,head = tail 说明同步队列中没有等待节点
            if (h != null && h != tail) {
            // 查看当前节点的等待状态
                int ws = h.waitStatus;
                // SIGNAL说明有后续节点需要唤醒
                if (ws == Node.SIGNAL) {
                  /*
                   * 将当前节点的值设为 0,表明已经唤醒了后继节点
                   * 可能会有多个线程同时执行到这一步,所以使用 CAS 保证只有一个线程能修改成功,
                   * 从而执行 unparkSuccessor,其他的线程会执行 continue 操作
                   */
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                /*
                 * ws 等于 0,说明无需唤醒后继结点(后续节点已经被唤醒或者当前节点没有被阻塞的后继结点),
                 * 也就是这一次的调用其实并没有执行唤醒后继结点的操作。就类似于我只需要一张优惠券,
                 * 但是我的两个朋友,他们分别给我了一张,因此我就剩余了一张。然后我就将这张剩余的优惠券
                 * 送(传播)给其他人使用,因此这里将节点置为可传播的状态(PROPAGATE)
                 */
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

从上面的实现中,doReleaseShared 的主要作用是用来唤醒阻塞的节点并且一次只唤醒一个,让该节点关联的线程去重新竞争锁,它既不修改同步队列,也不修改共享资源。
当多个线程同时释放资源时,可以确保两件事:

  • 共享资源的数量能正确的累加
  • 至少有一个线程被唤醒,其实只要确保有一个线程被唤醒就可以了,即便唤醒了多个线程,在同一时刻,也只能有一个线程能得到竞争锁的资格,在下面我们会看到。

所以释放锁做的主要工作还是修改共享资源的数量。而有了多个共享资源后,如何确保同步队列中的多个节点可以获取锁,是由获取锁的逻辑完成的。下面看下共享锁的获取。

共享锁的获取

下面是获取共享锁的流程

通过 acquireShared 方法,我们可以申请共享锁,下面是具体的实现:

public final void acquireShared(int arg) {
    // 如果返回结果小于 0,证明没有获取到共享资源
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

如果没有获取到共享资源,就会执行 doAcquireShared 方法,下面是该方法的具体实现:

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

从上面的代码中可以看到,只有前置节点为 head 的节点才有可能去竞争锁,这点和独占模式的处理是一样的,所以即便唤醒了多个线程,也只有一个线程能进入竞争锁的逻辑,其余线程会再次进入 park 状态,当线程获取到共享锁之后,会执行 setHeadAndPropagate 方法,下面是具体的实现:

private void setHeadAndPropagate(Node node, long propagate) {
    // 备份一下头节点
    Node h = head; // Record old head for check below
    /*
     * 移除头节点,并将当前节点置为头节点
     * 当执行完这一步之后,其实队列的头节点已经发生改变,
     * 其他被唤醒的线程就有机会去获取锁,从而并发的执行该方法,
     * 所以上面备份头节点,以便下面的代码可以正确运行
     */
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
     /*
      * 判断是否需要唤醒后继结点,propagate > 0 说明共享资源有剩余,
      * h.waitStatus < 0,表明当前节点状态可能为 SIGNAL,CONDITION,PROPAGATE
      */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 只有 s 不处于独占模式时,才去唤醒后继结点
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

判断后继结点是否需要唤醒的条件是十分宽松的,也就是一定包含必要的唤醒,但是也有可能会包含不必要的唤醒。从前面我们可以知道 doReleaseShared 函数的主要作用是唤醒后继结点,它既不修改共享资源,也不修改同步队列,所以即便有不必要的唤醒也是不影响程序正确性的。如果没有共享资源,节点会再次进入等待状态。

到了这里,脉络就比较清晰了,当一个节点获取到共享锁之后,它除了将自身设为 head 节点之外,还会判断一下是否满足唤醒后继结点的条件,如果满足,就唤醒后继结点,后继结点获取到锁之后,会重复这个过程,直到判断条件不成立。就类似于考试时从第一排往最后一排传卷子,第一排先留下一份,然后将剩余的传给后一排,后一排会重复这个过程。如果传到某一排卷子没了,那么位于这排的人就要等待,直到老师又给了他新的卷子。

中断

在获取锁时还可以设置响应中断,独占锁和共享锁的处理逻辑类似,这里我们以独占锁为例。使用 acquireInterruptibly 方法,在获取独占锁时可以响应中断,下面是具体的实现:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

doAcquireInterruptibly方法实现:

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                // 这里会抛出异常
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

从上面的代码中我们可以看出,acquireInterruptibly 和 acquire 的逻辑类似,只是在下面的代码处有所不同:当线程因为中断而退出阻塞状态时,会直接抛出 InterruptedException 异常。

我们知道,不管是抛出异常还是方法返回,程序都会执行 finally 代码,而 failed 肯定为 true,所以抛出异常之后会执行 cancelAcquire 方法,cancelAcquire 方法主要将节点从同步队列中移除。下面是具体的实现:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // 跳过前面的已经取消的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 保存下 pred 的后继结点,以便 CAS 操作使用
    // 因为可能存在已经取消的节点,所以 pred.next 不一等于 node
    Node predNext = pred.next;
    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 将节点状态设为 CANCELED
    node.waitStatus = Node.CANCELLED;
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

从上面的代码可以看出,节点的删除分为三种情况:

  • 删除节点为尾节点,直接将该节点的第一个有效前置节点置为尾节点
  • 删除节点的前置节点为头节点,则对该节点执行 unparkSuccessor 操作
  • 删除节点为中间节点,结果如下图所示。下图中(1)表示同步队列的初始状态,假设删除 node2, node1 是正常节点(非 CANCELED),(2)就是删除 node2 后同步队列的状态,此时 node1 节点的后继已经变为 node3,也就是说当 node1 变为 head 之后,会直接唤醒 node3。当另外的一个节点中断之后再次执行 cancelAcquire,在执行下面的代码时,会使同步队列的状态由(2)变为(3),此时 node2 已经没有外界指针了,可以被回收了。如果一直没有另外一个节点中断,也就是同步队列一直处于(2)状态,那么需要等 node3 被回收之后,node2 才可以被回收

超时

超时是在中断的基础上加了一层时间的判断,这里我们还是以独占锁为例。 tryAcquireNanos 支持获取锁的超时处理,下面是具体实现:

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

当获取锁失败之后,会执行 doAcquireNanos 方法,下面是具体实现:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0 L)
        return false;
    // 线程最晚结束时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 判断是否超时,如果超时就返回
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0 L)
                return false;
            // 这里如果设定了一个阈值,如果超时的时间比阈值小,就认为
            // 当前线程没必要阻塞,再执行几次 for 循环估计就超时了
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

当线程超时返回时,还是会执行 cancelAcquire 方法,cancelAcquire 的逻辑已经在前面说过了,这里不再赘述。

Condition Queue

Condition Queue 是一个并发不安全的, 只用于独占模式的队列(PS: 为什么是并发不安全的呢? 主要是在操作 Condition 时, 线程必需获取 独占的 lock, 所以不需要考虑并发的安全问题)

而当Node存在于 Condition Queue 里面, 则其只有 waitStatus, thread, nextWaiter 有值, 其他的都是null(其中的 waitStatus 只能是 CONDITION, 0(0 代表node进行转移到 Sync Queue里面, 或被中断/timeout)); 这里有个注意点, 就是 当线程被中断或获取 lock 超时, 则一瞬间 node 会存在于 Condition Queue, Sync Queue 两个队列中.

节点 Node4, Node5, Node6, Node7 都是调用 Condition.awaitXX 方法 加入 Condition Queue(PS: 加入后会将原来的 lock 释放)

Condition Queue 入队列方法 addConditionWaiter

源码如下:

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         * 将当前线程封装成一个 Node 节点 放入大 Condition Queue 里面
         */
        private Node addConditionWaiter() {
            // Condition queue 的尾节点
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            // 尾节点已经Cancel, 直接进行清除
            
            // 这里有个问题, 何时出现t.waitStatus != Node.CONDITION -> 在对线程进行中断时 ConditionObject -> await -> checkInterruptWhileWaiting ->
            //transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 导致这种情况一般是 线程中断或 await 超时
            
            // 一个注意点: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用 
            //addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤醒时
            //node.nextWaiter 会被置空, 而中断和超时时不会)
            if (t != null && t.waitStatus != Node.CONDITION) {
            //调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 
            //要么就是 0 (signal/timeout/interrupt))
                unlinkCancelledWaiters();
                // 获取最新的lastWaiter
                t = lastWaiter;
            }
            // 将线程封装成 node 准备放入 Condition Queue 里面
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                //加到queue尾部
                t.nextWaiter = node;
            // 重新复制lastWaiter   
            lastWaiter = node;
            return node;
        }

Condition Queue 删除Cancelled节点的方法 unlinkCancelledWaiters

当Node在Condition Queue 中, 若状态不是 CONDITION, 则一定是 被中断或超时

        /**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
            // 初始化next节点
                Node next = t.nextWaiter;
            // 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)    
                if (t.waitStatus != Node.CONDITION) {
                // Node.nextWaiter 置空
                    t.nextWaiter = null;
                    // 一次都没有遇到有效的节点
                    if (trail == null)
                    // 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
                        firstWaiter = next;
                    else
                    // next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
                        trail.nextWaiter = next;
                    // next == null 说明 已经 traverse 完了 Condition Queue    
                    if (next == null)
                    // 将有效节点赋值给 trail
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

Condition Queue 转移节点的方法 transferForSignal

transferForSignal只有在节点被正常唤醒才调用的正常转移的方法,源码如下:

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    // 将node从 condition queue转移到sync queue
    // 在调用transferForSignal之前, 会 first.nextWaiter = null;
    // 若节点是因为timeout / interrupt 进行转移, 则不会进行这步操作; 两种情况的转移都会把 wautStatus 置为 0
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        // 若node已经cancell,则失败 
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //加入Sync queue 
        Node p = enq(node);
        int ws = p.waitStatus;
        //这里的 ws > 0 指Sync Queue中node的前继节点cancelled了, 所以, 唤醒一下node ;
        //compareAndSetWaitStatus(p, ws, Node.SIGNAL)失败, 则说明前继节点已经变成SIGNAL或 cancelled, 所以也要唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

Condition Queue 转移节点的方法 transferAfterCancelledWait

transferAfterCancelledWait 在节点获取lock时被中断或获取超时才调用的转移方法,其源码如下:

    /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    //将Condition Queue 中因 timeout/interrupt 而唤醒的节点进行转移 
    final boolean transferAfterCancelledWait(Node node) {
    //  没有 node 没有 cancelled , 直接进行转移 (转移后, Sync Queue , Condition Queue 都会存在 node)
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        //这时是其他的线程发送signal,将本线程转移到 Sync Queue 里面的工程中(转移的过程中 waitStatus = 0了, 所以上面的 CAS 操作失败) 
        while (!isOnSyncQueue(node))
        // 这里调用 isOnSyncQueue判断是否已经 入Sync Queue 了
            Thread.yield();
        return false;
    }

参考

The java.util.concurrent Synchronizer Framework
Java并发详解AbstractQueuedSynchronizer
abstractqueuedsynchronizer
CAS