参考:https://www.cnblogs.com/fengzheng/p/9153720.html
CountDownLatch的使用
作用:当一个或多个线程需要另外一个或多个线程完成后,再开始执行,比如主线程需要等待一个子线程完成环境相关配置的加载工作,主线程才继续执行,就可以利用CountDownLatch来实现。
例如下面这个例子,首先实例化一个CountDownLatch,参数可以理解为一个计数器,这里为1,然后主线程执行,调用worker子线程,接着调用CountDownLatch的await()方法,表示阻塞主线程,当子线程执行完后,finally块调用countDown()方法,表示一个等待已经完成,把计数器减一,直到减为0,主线程又开始执行。
public class CountDownLatchTest {
private static CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
System.out.println("主线程开始。。。");
Thread thread = new Thread(new Worker());
thread.start();
System.out.println("主线程等待。。。");
System.out.println(latch.toString());
latch.await();
System.out.println(latch.toString());
System.out.println("主线程继续。。。");
}
public static class Worker implements Runnable{
@Override
public void run() {
System.out.println("子线程任务正在执行。。。");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
latch.countDown();
}
}
}
}
执行结果:
主线程开始。。。
主线程等待。。。
java.util.concurrent.CountDownLatch@74a14482[Count = 1]
子线程任务正在执行。。。
java.util.concurrent.CountDownLatch@74a14482[Count = 0]
主线程继续。。。
AQS的原理
AQS 全称 AbstractQueuedSynchronizer
,是java.util.concurrent
中提供的一种高效且可扩展的同步机制。它可以用来实现可以依赖 int 状态的同步器,获取和释放参数以及一个内部FIFO等待队列,除了CountDownLatch
,ReentrantLock
、Semaphore
等功能实现都使用了它。
接下来用 CountDownLatch 来分析一下 AQS 的实现。建议看文章的时候先大致看一下源码,有助于理解下面所说的内容。
在我们的方法中调用 awit()
和countDown()
的时候,发生了几个关键的调用关系,我画了一个方法调用图。
首先在CountDownLatch类内部定义了一个Sync
内部类,这个内部类继承自AbstractQueuedSynchronizer,并且重写了方法tryAcquireShared
和tryReleaseShared
,例如当调用await()方法时,CountDownLatch会调用内部类Sync的acquireQueuedInterruptibly()
方法,然后在这个方法中会调用tryAcquireShared方法,这个方法就是CountDownLatch的内部类Sync里重写的AbstractQueuedSynchronizer的方法,调用countDown()方法同理。
这种方法是使用AbstractQueuedSynchronizer的标准化方法,大致分为两步:
- 内部持有继承自AbstractQueuedSynchronizer的对象Sync;
- 并在Sync内重写AbstractQueuedSynchronizer protected的部分或全部方法,这些方法包括如下几个:之所以要求子类重写这些方法,是为了让使用者(这里的使用者指 CountDownLatch 等)可以在其中加入自己的判断逻辑,例如 CountDownLatch 在
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
tryAcquireShared
中加入了判断,判断 state 是否不为0,如果不为0,才符合调用条件。
tryAcquire
和tryRelease
是对应的,前者是独占模式获取,后者是独占模式释放。
tryAcquireShared
和tryReleaseShared
是对应的,前者是共享模式获取,后者是共享模式释放。
我们看到 CountDownLatch 重写的方法 tryAcquireShared 实现如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
判断 state 值是否为0,为0 返回1,否则返回 -1。state 值是 AbstractQueuedSynchronizer 类中的一个 volatile 变量。
private volatile int state;
在 CountDownLatch 中这个 state 值就是计数器,在调用 await 方法的时候,将值赋给 state 。
等待线程入队
根据上面的逻辑,调用 await() 方法时,先去获取 state 的值,当计数器不为0的时候,说明还有需要等待的线程在运行,则调用doAcquireSharedInterruptibly
方法,进来执行的第一个动作就是尝试加入等待队列 ,即调用 addWaiter()
方法。
到这里就走到了 AQS 的核心部分,AQS 用内部的一个Node
类维护一个 CHL Node FIFO
队列。将当前线程加入等待队列,并通过 parkAndCheckInterrupt()
方法实现当前线程的阻塞。下面一大部分都是在说明 CHL 队列的实现,里面用 CAS 实现队列出入不会发生阻塞。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
// 进入 CAS 循环
try {
for (;;) {
//当一个节点(关联一个线程)进入等待队列后, 获取此节点的 prev 节点
final Node p = node.predecessor();
// 如果获取到的 prev 是 head,也就是队列中第一个等待线程
if (p == head) {
// 再次尝试申请 反应到 CountDownLatch 就是查看是否还有线程需要等待(state是否为0)
int r = tryAcquireShared(arg);
// 如果 r >=0 说明没有线程需要等待了 state==0
if (r >= 0) {
//尝试将第一个线程关联的节点设置为 head
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//经过自旋tryAcquireShared后,state还不为0,就会到这里,第一次的时候,waitStatus是0,那么node的waitStatus就会被置为SIGNAL,第二次再走到这里,就会用LockSupport的park方法把当前线程阻塞住
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
addWaiter()方法,就是将当前线程加入等待队列,源码如下:
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队操作,因为大多数时候尾节点不为 null
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾节点为空(也就是队列为空) 或者尝试CAS入队失败(由于并发原因),进入enq方法
enq(node);
return node;
}
上面是向等待队列中添加等待者(waiter)的方法,首先构造一个Node实体,参数为当前线程和一个mode,这个mode有两种形式,一个是SHARED,一个是EXCLUSIVE,然后执行下面的入队操作 addWaiter,和 enq() 方法的 else 分支操作是一样的,这里的操作如果成功了,就不用再进到 enq() 方法的循环中去了,可以提高性能。如果没有成功,再调用 enq() 方法。
private Node enq(final Node node) {
// 死循环+CAS保证所有节点都入队
for (;;) {
Node t = tail;
// 如果队列为空 设置一个空节点作为 head
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//加入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
说明:循环加 CAS 操作是实现乐观锁的标准方式,CAS 是为了实现原子操作而出现的,所谓的原子操作指操作执行期间,不会受其他线程的干扰。Java 实现的 CAS 是调用 unsafe 类提供的方法,底层是调用 c++ 方法,直接操作内存,在 cpu 层面加锁,直接对内存进行操作。
上面是AQS等待队列入队方法,操作在无限循环中进行,如果入队成功则返回新的队尾节点,否则一直自旋,直到入队成功,假设入队的节点为node,上来直接进入循环,在循环中,先拿到尾节点。
- if分支:如果尾节点是null,说明现在队列中还没有等待线程,则尝试CAS操作将头节点初始化,然后将尾节点也设置为头节点,因为初始化的时候头尾是同一个,这和AQS的设计有关,AQS默认要有一个虚拟节点,此时尾节点不再为空,循环继续,进入else分支;
- else分支:如果尾节点不为null,node.prev = t,也就是将当前尾节点设置为待入队节点的前置节点,然后又是利用CAS操作,将待入队的节点设置为队列的尾节点,如果CAS返回false,表示未设置成功,继续循环设置,设置成功后将之前的尾节点(也就是倒数第二个节点)的next属性设置为当前尾节点,然后返回当前尾节点,退出循环。
setHeadAndPropagate
方法负责将自旋等待或被LockSupport阻塞的线程唤醒。
private void setHeadAndPropagate(Node node, int propagate) {
//备份现在的 head
Node h = head;
//抢到锁的线程被唤醒 将这个节点设置为head
setHead(node)
// propagate 一般都会大于0 或者存在可被唤醒的线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 只有一个节点 或者是共享模式 释放所有等待线程 各自尝试抢占锁
if (s == null || s.isShared())
doReleaseShared();
}
}
Node对象中有一个属性是waitStatus,它有四种状态,分别是:
//线程已被 cancelled ,这种状态的节点将会被忽略,并移出队列
static final int CANCELLED = 1;
// 表示当前线程已被挂起,并且后继节点可以尝试抢占锁
static final int SIGNAL = -1;
//线程正在等待某些条件
static final int CONDITION = -2;
//共享模式下 无条件所有等待线程尝试抢占锁
static final int PROPAGATE = -3;
等待线程被唤醒
当执行CountDownLatch的countDown()方法,将计数器减一,也就是state减一,当减到0的时候,等待队列中的线程被释放,是调用AQS的releaseShared方法来实现的。
// AQS类
public final boolean releaseShared(int arg) {
// arg 为固定值 1
// 如果计数器state 为0 返回true,前提是调用 countDown() 之前不能已经为0
if (tryReleaseShared(arg)) {
// 唤醒等待队列的线程
doReleaseShared();
return true;
}
return false;
}
// CountDownLatch 重写的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 依然是循环+CAS配合 实现计数器减1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/// AQS类
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果节点状态为SIGNAL,则他的next节点也可以尝试被唤醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 将节点状态设置为PROPAGATE,表示要向下传播,依次唤醒
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
因为是共享型的,当计数器为0后,会唤醒等待队列中的所有线程,所有调用了await()方法的线程都被唤醒,并发执行,这种情况对应到的场景是:有多个线程都需等待一些动作完成,比如一个线程完成初始化动作,其他五个线程都需要用到初始化的结果,那么在初始化线程调用countDown之前,其他5个线程都处在等待状态,一旦初始化线程调用了countDown,其他5个线程都被唤醒,开始执行。
总结
1、AQS 分为独占模式和共享模式,CountDownLatch 使用了它的共享模式。
2、AQS 当第一个等待线程(被包装为 Node)要入队的时候,要保证存在一个 head 节点,这个 head 节点不关联线程,也就是一个虚节点。
3、当队列中的等待节点(关联线程的,非 head 节点)抢到锁,将这个节点设置为 head 节点。
4、第一次自旋抢锁失败后,waitStatus 会被设置为 -1(SIGNAL),第二次再失败,就会被 LockSupport 阻塞挂起。
5、如果一个节点的前置节点为 SIGNAL 状态,则这个节点可以尝试抢占锁。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!