Java并发包(JUC)中提供了很多并发工具,比如:ReentrantLock,Semaphore,CountDownLatch,FutureTask等,它们的实现都用到了一个共同的基类-AbstractQueuedSynchronizer,简称AQS。AQS是一个用来构建锁和同步其他组件的基础框架,使用AQS能简单且高效地构建出应用广泛的同步器。
基本实现原理
AQS使用一个int成员变量state来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。
private volatile int state;//共享变量,使用volatile修饰保证内存可见性
状态信息通过protected类型的getState,setState,compareAndSetState进行操作。
AQS支持两种同步方式:
- 独占式
- 共享式
这样方便使用者实现不同类型的同步组件,独占式有ReentrantLock,共享式有Semaphore,CountDownLatch,组合式有ReentrantReadWriteLock,总之,AQS为使用者提供了底层支持,如何组装实现,使用者可以自由发挥。
同步器的设计是基于模板方法模式的,一般的使用方式如下:
- 使用者继承AbstractQueuedSynchronizer并重写指定的方法,这些重写方法就是对共享资源state的释放和获取;
- 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
我们来看看AQS定义的这些可重写的方法:
//独占式获取同步状态,试着获取,成功返回true,反之为false
protected boolean tryAcquire(int arg) ;
//独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;
protected boolean tryRelease(int arg) ;
//共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;
protected int tryAcquireShared(int arg) ;
//共享式释放同步状态,成功为true,失败为false
protected boolean tryReleaseShared(int arg) ;
//是否在独占模式下被线程占用
protected boolean isHeldExclusively() ;
如何使用
首先,我们需要继承AbstractQueuedSynchronizer
这个类,然后根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写tryAcquire,tryRelease方法,要实现共享锁,就去重写tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法就会调用我们之前重写的那些方法,也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源state的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是AQS帮我们完成了。
设计思想
对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在AQS中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源state。AQS为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现即可。
自定义同步器
同步器代码实现
public class Mutex implements Serializable {
/** 静态内部类,继承AQS */
private static class Sync extends AbstractQueuedSynchronizer{
/** 当状态为0的时候获取锁,CAS操作成功,则state状态为1 */
@Override
protected boolean tryAcquire(int acquires) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/** 释放锁,将同步状态置为0 */
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/** 是否处于占用状态 */
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
/** 同步对象完成一系列复杂的操作,我们仅需指向它即可 */
private final Sync sync = new Sync();
/** 加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法 */
public void lock(){
sync.acquire(1);
}
public boolean tryLock(){
return sync.tryAcquire(1);
}
public void unlock(){
sync.release(1);
}
public boolean isLocked(){
return sync.isHeldExclusively();
}
}
同步器代码测试
public class MutexTest {
private static CyclicBarrier barrier = new CyclicBarrier(31);
private static int a = 0;
private static Mutex mutex = new Mutex();
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
//说明:我们启用30个线程,每个线程对i自加10000次,同步正常的话,最终结果应为30000;
//未加锁
for (int i = 0; i < 30; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
increment1();
}
try {
barrier.await();//等30个线程累加结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
t.start();
}
barrier.await();
System.out.println("未加锁:a="+a);
//加锁后
//重置CyclicBarrier
barrier.reset();
a = 0;
for (int i = 0; i < 30; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
increment2();
}
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
barrier.await();
System.out.println("加锁后:a="+a);
}
/** 使用自定义的Mutex进行同步处理的a++ */
private static void increment2() {
mutex.lock();
a++;
mutex.unlock();
}
private static void increment1() {
a++;
}
}
源码分析
AQS维护着一个共享资源state,通过内置的FIFO队列(CLH队列)来完成获取资源线程的排队工作,该队列由一个个的Node
节点组成,每个Node节点维护一个prev引用和next引用,分别指向自己的前驱和后继节点,AQS维护两个指针,分别指向队列头部head和尾部tail。
当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个节点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态),当持有同步状态的线程释放同步状态时,会唤醒后继节点,然后此节点线程继续加入到对同步状态的争夺中。
Node节点
Node节点是AbstractQueuedSynchronizer中的一个静态内部类,其部分源码如下:
static final class Node {
/** waitStatus值,表示线程已被取消(等待超时或者被中断)*/
static final int CANCELLED = 1;
/** waitStatus值,表示后继线程需要被唤醒(unpaking)*/
static final int SIGNAL = -1;
/**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */
static final int CONDITION = -2;
/** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去
static final int PROPAGATE = -3;
/** 等待状态,初始为0 */
volatile int waitStatus;
/**当前结点的前驱结点 */
volatile Node prev;
/** 当前结点的后继结点 */
volatile Node next;
/** 与当前结点关联的排队中的线程 */
volatile Thread thread;
/** ...... */
}
独占式
获取同步状态:acquire()
lock方法一般会直接代理到acquire()上。
public final void acquire(int arg){
if(!tryAcquire(arg) &&
acuqireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
代码逻辑:
a. 首先,调用使用者重写的tryAcquire()方法,若返回true,意味着获取同步状态成功,后面的逻辑不再执行;若返回false,也就是获取同步状态失败,进入b步骤;
b. 此时,获取同步状态失败,构造独占式同步结点,通过addWatiter方法将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方式添加);
c. 该结点在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。
addWaiter
为获取同步状态失败的线程,构造一个Node节点,添加到同步队列尾部。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//构造结点
//指向尾结点tail
Node pred = tail;
//如果尾结点不为空,CAS快速尝试在尾部添加,若CAS设置成功,返回;否则,enq。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
先CAS快速设置,若失败,进入enq()
方法。
将节点添加到同步队列尾部这个操作,同时可能会有多个线程尝试添加到尾部,是非线程安全的操作,以上代码可以看出,使用了compareAndSetTail这个CAS操作保证安全添加至尾节点。
enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //如果队列为空,创建结点,同时被head和tail引用
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//cas设置尾结点,不成功就一直重试
t.next = node;
return t;
}
}
}
}
enq内部是个死循环,通过CAS设置尾节点,不成功就一直重试。
最后看下acquireQueued方法。
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//死循环
final Node p = node.predecessor();//找到当前结点的前驱结点
if (p == head && tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
setHead(node);//获取同步状态成功,将当前结点设置为头结点。
p.next = null; // 方便GC
failed = false;
return interrupted;
}
// 如果没有获取到同步状态,通过shouldParkAfterFailedAcquire判断是否应该阻塞,parkAndCheckInterrupt用来阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued内部也是一个死循环,只有前驱节点是头节点的节点,也就是第二个节点才有机会tryAcquire;若tryAcquire成功,表示获取同步状态成功,将此节点设置为头节点,若不是第二个节点,或者tryAcquire失败,则进入shouldParkAfterFailedAccquire去判断当前线程是否应该阻塞,若可以,调用parkAndCheckInterrupt阻塞当前线程,直到被中断或者被前驱节点唤醒,若还不能,继续循环。
shouldParkAfterFailedAcquire
用来判断当前节点线程是否能休息
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱结点的wait值
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
return true;
if (ws > 0) {
// ws>0,只有CANCEL状态ws才大于0。若前驱结点处于CANCEL状态,也就是此结点线程已经无效,从后往前遍历,找到一个非CANCEL状态的结点,将自己设置为它的后继结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 若前驱结点为其他状态,将其设置为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
若shouldParkAfterFailedAcquire返回true,也就是当前结点的前驱结点为SIGNAL状态,则意味着当前结点可以放心休息,进入parking状态了。parkAndCheckInterrupt阻塞线程并处理中断。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//使用LockSupport使线程进入阻塞状态
return Thread.interrupted();// 线程是否被中断过
}
至此,关于acquire的方法源码已经分析完毕,我们来简单总结下:
- 首先tryAcquire获取同步状态,成功则直接返回;否则,进入下一环节;
- 线程获取同步状态失败,就构造一个结点,加入同步队列中,这个过程要保证线程安全;
- 加入队列中的结点线程进入自旋状态,若是老二结点(即前驱结点为头结点),才有机会尝试去获取同步状态;否则,当其前驱结点的状态为SIGNAL,线程便可安心休息,进入阻塞状态,直到被中断或者被前驱结点唤醒。
释放同步状态:release()
当前线程执行完自己的逻辑之后,需要释放同步状态。
public final boolean release(int arg) {
if (tryRelease(arg)) {//调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒后继结点
return true;
}
return false;
}
UnparkSuccessor:唤醒后继节点
private void unparkSuccessor(Node node) {
//获取wait状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);// 将等待状态waitStatus设置为初始值0
Node s = node.next;//后继结点
//若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到一个处于正常阻塞状态的结点进行唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//使用LockSupprot唤醒结点对应的线程
}
release的同步状态相对简单,需要找到头节点的后继节点进行唤醒,若后继节点为空或处于CANNEL状态,从后往前遍历找寻一个正常的节点,唤醒其对应线程。
共享式
共享式地获取同步状态,对于独占式同步组件来说,同一时刻只有一个线程能够获取到同步状态,其他线程都得排队等待,而对于共享式同步组件来说,同一时刻可以有很多线程同时获取到同步状态,其待重写的尝试获取同步状态的方法为tryAcquireShared
。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
- 当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
- 当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了;
- 当返回值小于0时,表示获取同步状态失败。
获取同步状态:acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
doAcquireShared(arg);
}
doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//构造一个共享结点,添加到同步队列尾部。若队列初始为空,先添加一个无意义的傀儡结点,再将新节点添加到队列尾部。
boolean failed = true;//是否获取成功
try {
boolean interrupted = false;//线程parking过程中是否被中断过
for (;;) {//死循环
final Node p = node.predecessor();//找到前驱结点
if (p == head) {//头结点持有同步状态,只有前驱是头结点,才有机会尝试获取同步状态
int r = tryAcquireShared(arg);//尝试获取同步装填
if (r >= 0) {//r>=0,获取成功
setHeadAndPropagate(node, r);//获取成功就将当前结点设置为头结点,若还有可用资源,传播下去,也就是继续唤醒后继结点
p.next = null; // 方便GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心进入parking状态
parkAndCheckInterrupt())//阻塞线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大体逻辑与独占式的acquireQueued差距不大,只不过由于是共享式,会有多个线程同时获取到同步状态,所以当排队中的老二获取到同步状态,如果还有可用资源,会继续传播下去。
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
释放同步状态:releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();//释放同步状态
return true;
}
return false;
}
doReleaseShared
private void doReleaseShared() {
for (;;) {//死循环,共享模式,持有同步状态的线程可能有多个,采用循环CAS保证线程安全
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//唤醒后继结点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
共享模式下,释放同步状态也是多线程的,此处采用了CAS自旋来保证。
总结
AQS是JUC中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量state
和一个FIFO队列
来完成,同步队列的头节点是当前获取到同步状态的节点,获取同步状态state失败的线程,会被构造成一个节点(共享式或独占式)加入到同步队列尾部(采用自旋CAS来保证此操作的线程安全),随后线程会被阻塞;释放时唤醒头节点的后继节点,使其加入对同步状态的争夺中。
AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需要重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared等几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!