一、概述

AbstractQueuedSynchronizer(以下简称AQS),又称队列同步器,作为java.util.concurrent包的基础,它提供了一套完整的同步编程框架,用来构建锁或者其他同步组件的基础框架,开发人员只需要实现其中几个简单的方法就能自由的使用诸如独占,共享,条件队列等多种同步模式。我们常用的比如ReentrantLock,CountDownLatch等等基础类库都是基于AQS实现的,足以说明这套框架的强大之处。鉴于此,我们开发人员更应该了解它的实现原理,这样才能在使用过程中得心应手。

二、独占锁

执行过程概述

获取锁的过程:

  1. 当线程调用acquire()申请获取锁资源,如果成功,则进入临界区。
  2. 当获取锁失败时,则进入一个FIFO等待队列,然后被挂起等待唤醒。
  3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则进入临界区,否则继续挂起等待。

释放锁过程:

  1. 当线程调用release()进行锁资源释放时,如果没有其他线程在等待锁资源,则释放完成。
  2. 如果队列中有其他等待锁资源的线程需要唤醒,则唤醒队列中的第一个等待节点(先入先出)。

源码深入分析

首先来看下获取锁的方法acquire()

public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

代码虽然短,但包含的逻辑却很多,一步一步看下:

  1. 首先是调用开发人员自己实现的tryAcquire() 方法尝试获取锁资源,如果成功则整个acquire()方法执行完毕,即当前线程获得锁资源,可以进入临界区。
  2. 如果获取锁失败,则开始进入后面的逻辑,首先是addWaiter(Node.EXCLUSIVE)方法。来看下这个方法的源码实现:
//注意:该入队方法的返回值就是新创建的节点
private Node addWaiter(Node mode) {
    //基于当前线程,节点类型(Node.EXCLUSIVE)创建新的节点
    //由于这里是独占模式,因此节点类型就是Node.EXCLUSIVE
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //这里为了提搞性能,首先执行一次快速入队操作,即直接尝试将新节点加入队尾
    if (pred != null) {
        node.prev = pred;
        //这里根据CAS的逻辑,即使并发操作也只能有一个线程成功并返回,其余的都要执行后面的入队操作。即enq()方法
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
//完整的入队操作
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //如果队列还没有初始化,则进行初始化,即创建一个空的头节点
        if (t == null) {
            //同样是CAS,只有一个线程可以初始化头结点成功,其余的都要重复执行循环体
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //新创建的节点指向队列尾节点,毫无疑问并发情况下这里会有多个新创建的节点指向队列尾节点
            node.prev = t;
            //基于这一步的CAS,不管前一步有多少新节点都指向了尾节点,这一步只有一个能真正入队成功,其他的都必须重新执行循环体
            if (compareAndSetTail(t, node)) {
                t.next = node;
                //该循环体唯一退出的操作,就是入队成功(否则就要无限重试)
                return t;
            }
        }
    }
}

上面的入队操作有两点需要说明:

一、初始化队列的触发条件就是当前已经有线程占有了锁资源,因此上面创建的空的头节点可以认为就是当前占有锁资源的节点(虽然它并没有设置任何属性)。

二、注意整个代码是处在一个死循环中,知道入队成功。如果失败了就会不断进行重试。

经过上面的操作,我们申请获取锁的线程已经成功加入了等待队列,通过文章最一开始说的独占锁获取流程,那么节点现在要做的就是挂起当前线程,等待被唤醒,这个逻辑是怎么实现的呢?来看下源码:

通过上面的分析,该方法入参node就是刚入队的包含当前线程信息的节点

final boolean acquireQueued(final Node node, int arg) {
    //锁资源获取失败标记位
    boolean failed = true;
    try {
        //等待线程被中断标记位
        boolean interrupted = false;
        //这个循环体执行的时机包括新节点入队和队列中等待节点被唤醒两个地方
        for (;;) {
            //获取当前节点的前置节点
            final Node p = node.predecessor();
            //如果前置节点就是头结点,则尝试获取锁资源
            if (p == head && tryAcquire(arg)) {
                //当前节点获得锁资源以后设置为头节点,这里继续理解我上面说的那句话
                //头结点就表示当前正占有锁资源的节点
                setHead(node);
                p.next = null; //帮助GC
                //表示锁资源成功获取,因此把failed置为false
                failed = false;
                //返回中断标记,表示当前节点是被正常唤醒还是被中断唤醒
                return interrupted;
            }
            如果没有获取锁成功,则进入挂起逻辑
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //最后会分析获取锁失败处理逻辑
        if (failed)
            cancelAcquire(node);
    }
}

挂起逻辑是很重要的逻辑,这里拿出来单独分析一下,首先要注意目前为止,我们只是根据当前线程,节点类型创建了一个节点并加入队列中,其他属性都是默认值

//首先说明一下参数,node是当前线程的节点,pred是它的前置节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取前置节点的waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起
        return true;
    if (ws > 0) {
        //由waitStatus的几个取值可以判断这里表示前置节点被取消
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点
        //注,由于头结点head是通过new Node()创建,它的waitStatus为0,因此这里不会出现空指针问题,也就是说最多就是找到头节点上面的循环就退出了
        pred.next = node;
    } else {
        //根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

上面这个方法逻辑比较复杂,它是用来判断当前节点是否可以被挂起,也就是唤醒条件是否已经具备,即如果挂起了,那一定是可以由其他线程来唤醒的。该方法如果返回false,即挂起条件没有完备,那就会重新执行acquireQueued方法的循环体,进行重新判断,如果返回true,那就表示万事俱备,可以挂起了,就会进入parkAndCheckInterrupt()方法看下源码:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    //被唤醒之后,返回中断标记,即如果是正常唤醒则返回false,如果是由于中断醒来,就返回true
    return Thread.interrupted();
}

看acquireQueued方法中的源码,如果是因为中断醒来,那么就把中断标记置为true。不管是正常被唤醒还是由与中断醒来,都会去尝试获取锁资源。如果成功则返回中断标记,否则继续挂起等待。

注:Thread.interrupted()方法在返回中断标记的同时会清除中断标记,也就是说当由于中断醒来然后获取锁成功,那么整个acquireQueued方法就会返回true表示是因为中断醒来,但如果中断醒来以后没有获取到锁,继续挂起,由于这次的中断已经被清除了,下次如果是被正常唤醒,那么acquireQueued方法就会返回false,表示没有中断。

最后我们回到acquireQueued方法的最后一步,finally模块。这里是针对锁资源获取失败以后做的一些善后工作,翻看上面的代码,其实能进入这里的就是tryAcquire()方法抛出异常,也就是说AQS框架针对开发人员自己实现的获取锁操作如果抛出异常,也做了妥善的处理,一起来看下源码:

//传入的方法参数是当前获取锁资源失败的节点
private void cancelAcquire(Node node) {
    // 如果节点不存在则直接忽略
    if (node == null)
        return;
    node.thread = null;
    // 跳过所有已经取消的前置节点,跟上面的那段跳转逻辑类似
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    //这个是前置节点的后继节点,由于上面可能的跳节点的操作,所以这里可不一定就是当前节点,仔细想一下。^_^
    Node predNext = pred.next;
    //把当前节点waitStatus置为取消,这样别的节点在处理时就会跳过该节点
    node.waitStatus = Node.CANCELLED;
    //如果当前是尾节点,则直接删除,即出队
    //注:这里不用关心CAS失败,因为即使并发导致失败,该节点也已经被成功删除
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                //这里的判断逻辑很绕,具体就是如果当前节点的前置节点不是头节点且它后面的节点等待它唤醒(waitStatus小于0),
                //再加上如果当前节点的后继节点没有被取消就把前置节点跟后置节点进行连接,相当于删除了当前节点
                compareAndSetNext(pred, predNext, next);
        } else {
            //进入这里,要么当前节点的前置节点是头结点,要么前置节点的waitStatus是PROPAGATE,直接唤醒当前节点的后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

上面就是独占模式获取锁的核心源码,确实非常难懂,很绕,就这几个方法需要反反复复看很多遍,才能慢慢理解。

接下来看下释放锁的过程:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease()方法是用户自定义的释放锁逻辑,如果成功,就判断等待队列中有没有需要被唤醒的节点(waitStatus为0表示没有需要被唤醒的节点),一起看下唤醒操作:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        //把标记为设置为0,表示唤醒操作已经开始进行,提高并发环境下性能
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    //如果当前节点的后继节点为null,或者已经被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        //注意这个循环没有break,也就是说它是从后往前找,一直找到离当前节点最近的一个等待唤醒的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //执行唤醒操作
    if (s != null)
        LockSupport.unpark(s.thread);
}

相比而言,锁的释放操作就简单很多了,代码也比较少。

总结

以上就是AQS独占锁的获取与释放过程,大致思想很简单,就是尝试去获取锁,如果失败就加入一个队列中挂起。释放锁时,如果队列中有等待的线程就进行唤醒。但如果一步一步看源码,会发现细节非常多,很多地方很难搞明白,我自己也是反反复复学习很久才有点心得,但也不敢说已经研究通了AQS,甚至不敢说我上面的研究成果就是对的,只是写篇文章总结一下,跟同行交流交流心得。

除了独占锁,后面还会产出AQS一系列的文章,包括共享锁,条件队列的实现原理等。

三、共享锁

执行过程概述

获取锁的过程:

  1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
  2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
  3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

释放锁过程:

当线程调用releaseShared()进行锁资源释放时,如果释放成功,则唤醒队列中等待的节点,如果有的话。

源码深入分析

基于上面所说的共享锁执行流程,我们接下来看下源码实现逻辑:

首先来看下获取锁的方法acquireShared(),如下

public final void acquireShared(int arg) {
    //尝试获取共享锁,返回值小于0表示获取失败
    if (tryAcquireShared(arg) < 0)
        //执行获取锁失败以后的方法
        doAcquireShared(arg);
}

这里tryAcquireShared()方法是留给用户去实现具体的获取锁逻辑的。关于该方法的实现有两点需要特别说明:

一、该方法必须自己检查当前上下文是否支持获取共享锁,如果支持再进行获取。

二、该方法返回值是个重点。其一、由上面的源码片段可以看出返回值小于0表示获取锁失败,需要进入等待队列。其二、如果返回值等于0表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的,也就是不需要把它后面等待的节点唤醒。最后、如果返回值大于0,表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功,也就是说此时需要把后续节点唤醒让它们去尝试获取共享锁。

有了上面的约定,我们再来看下doAcquireShared方法的实现:

//参数不多说,就是传给acquireShared()的参数
private void doAcquireShared(int arg) {
    //添加等待节点的方法跟独占锁一样,唯一区别就是节点类型变为了共享型,不再赘述
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //表示前面的节点已经获取到锁,自己会尝试获取锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                //注意上面说的, 等于0表示不用唤醒后继节点,大于0需要
                if (r >= 0) {
                    //这里是重点,获取到锁以后的唤醒操作,后面详细说
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    //如果是因为中断醒来则设置中断标记位
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //挂起逻辑跟独占锁一样,不再赘述
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //获取失败的取消逻辑跟独占锁一样,不再赘述
        if (failed)
            cancelAcquire(node);
    }
}

独占锁模式获取成功以后设置头结点然后返回中断状态,结束流程。而共享锁模式获取成功以后,调用了setHeadAndPropagate方法,从方法名就可以看出除了设置新的头结点以外还有一个传递动作,一起看下代码:

//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; //记录当前头节点
    //设置新的头节点,即把当前获取到锁的节点设置为头节点
    //注:这里是获取到锁之后的操作,不需要并发控制
    setHead(node);
    //这里意思有两种情况是需要执行唤醒操作
    //1.propagate > 0 表示调用方指明了后继节点需要被唤醒
    //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒
        //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
        if (s == null || s.isShared())
            //后面详细说
            doReleaseShared();
    }
}
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

最终的唤醒操作也很复杂,专门拿出来分析一下:

注:这个唤醒操作在releaseShare()方法里也会调用。

private void doReleaseShared() {
    for (;;) {
        //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
        //其实就是唤醒上面新获取到共享锁的节点的后继节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //表示后继节点需要被唤醒
            if (ws == Node.SIGNAL) {
                //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                //执行唤醒操作 
                unparkSuccessor(h);
            }
            //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        //如果头结点没有发生变化,表示设置完成,退出循环
        //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
        if (h == head)
            break;
    }
}

接下来看下释放共享锁的过程:

public final boolean releaseShared(int arg) {
    //尝试释放共享锁
    if (tryReleaseShared(arg)) {
        //唤醒过程,详情见上面分析
        doReleaseShared();
        return true;
    }
    return false;
}

注:上面的setHeadAndPropagate()方法表示等待队列中的线程成功获取到共享锁,这时候它需要唤醒它后面的共享节点(如果有),但是当通过releaseShared()方法去释放一个共享锁的时候,接下来等待独占锁跟共享锁的线程都可以被唤醒进行尝试获取。

总结

跟独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后(它获取到的是共享锁),既然是共享,那它必须要依次唤醒后面所有可以跟它一起共享当前锁资源的节点,毫无疑问,这些节点必须也是在等待共享锁(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。当共享锁被释放的时候,可以用读写锁为例进行思考,当一个读锁被释放,此时不论是读锁还是写锁都是可以竞争资源的。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

IDEA常用配置 Previous
线程池介绍 Next