Java中的线程即是工作单元也是执行机制,从JDK 5后,工作单元与执行机制被分离。工作单元包括Runnable和Callable,执行机制由JDK 5中增加的java.util.concurrent包中Executor框架提供。
1、Executor 框架简介
1.1、Executor 框架的两级调度模型
在 HotSpot VM 的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定的数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这两级的调度模型的示意图如下所示:
从图中可以看出,应用程序通过 Executor 框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
1.2、Executor 框架的结构与成员
1.2.1、Executor 框架的结构
Executor 框架主要由3大部分组成如下:
1、任务:包括被执行任务需要实现的接口:Runnable 接口或 Callable 接口;
2、任务的执行:包括任务执行机制的核心接口Executor,以及继承自 Executor 的 ExecutorService 接口。Executor 框架有两个关键类实现了 ExecutorService 接口,即:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor;
3、异步计算的结果:包括接口 Future 和实现 Future 接口的 FutureTask 类。
Executor 框架包含的主要类和接口如下图所示:
1、Executor:是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来;
2、ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务;
3、ScheduledThreadPoolExecutor:是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更加灵活,功能更强大;
4、Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果;
5、Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
Executor 框架的使用示意图如下图所示:
- 主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。
- 然后可以把Runnbale对象直接交给 ExecutorService 执行(ExecutorService.execute(Runnable command));或者也可以把Runnable或者Callable对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task) 或 ExcutorService.submit(Callable
task))。 - 如果执行ExecutorService.submit( … ),ExecutorService 将返回一个实现 Future 接口的对象。由于 FutureTask 实现了 Runnable,程序员也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
- 最后,主线程可以执行 FutureTask.get() 方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning) 来取消此任务的执行。
1.2.2、Executor 框架的成员
Executor 框架的主要成员包括:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors。
1、ThreadPoolExecutor
ThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors 可以创建3种类型的 ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。
2、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors可以创建2种类型的 ScheduledThreadPoolExecutor,如下:
- ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor;
- SingleThreadScheduledExecutor:包含一个线程的ScheduledThreadPoolExecutor。
3、Future接口
Future 接口和实现 Future 接口的 FutureTask 类用来表示异步计算的结果。当我们把 Runnable 接口或 Callable 接口的实现类提交(submit)给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 时,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 会向我们返回一个 FutureTask 对象。
4、Runnable 接口和 Callable 接口
Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。它们之间的区别是 Runnable 不会返回结果,而 Callable 可以返回结果。
2、ThreadPoolExecutor 详解
其实关于 ThreadPoolExecutor 在上一篇文章:Java中的线程池详解已经进行了源码讲解,这里仅介绍下它的3种类型:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。
2.1、FixedThreadPool
创建固定长度的线程池,每次提交任务创建一个线程,直到达到线程池的最大数量,线程池的大小不再变化。
这个线程池可以创建固定线程数的线程池。特点就是可以重用固定数量线程的线程池。它的构造源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool的corePoolSize和maxiumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads;
0L则表示当线程池中的线程数量操作核心线程的数量时,多余的线程将被立即停止;
最后一个参数表示FixedThreadPool使用了无界队列LinkedBlockingQueue作为线程池的做工队列,由于是无界的,当线程池的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池的线程数量不会超过corePoolSize,同时maxiumPoolSize也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。
FixedThreadPool运行图如下:
执行过程如下:
1.如果当前工作中的线程数量少于corePool的数量,就创建新的线程来执行任务。
2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
3.线程执行完1中的任务后会从队列中去任务。
注意:LinkedBlockingQueue是无界队列,所以可以一直添加新任务到线程池。
2.2、SingleThreadExecutor
SingleThreadExecutor是使用单个worker线程的Executor。特点是使用单个工作线程执行任务。它的构造源码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 的 corePoolSize 和 maxiumPoolSize 都被设置1。其他参数均与 FixedThreadPool 相同,其运行图如下:
执行过程如下:
1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。
2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
3.线程执行完1中的任务后会从队列中去任务。
注意:由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。
2. 3、CachedThreadPool
CachedThreadPool是一个”无限“容量的线程池,它会根据需要创建新线程。特点是可以根据需要来创建新的线程执行任务,没有特定的corePool。下面是它的构造方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximum是无界的。这里keepAliveTime设置为60秒,意味着空闲的线程最多可以等待任务60秒,否则将被回收。
CachedThreadPool使用没有容量的SynchronousQueue作为主线程池的工作队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源。其运行图如下:
执行过程如下:
1.首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行。,execute()方法执行成功,否则执行步骤2;
2.当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务;
3.在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。
SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。
2.4、具体应用案例
1、newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:
public class Demo1 {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for(int i = 0; i < 10; i++){
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable(){
@Override
public void run() {
System.out.println(index);
}
});
}
}
}
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
2、newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
public class Demo2 {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable(){
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache。
3、newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
public class Demo3 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable(){
@Override
public void run() {
System.out.println("延迟3秒");
}
}, 3, TimeUnit.SECONDS);
}
}
表示延迟3秒执行。定期执行示例代码如下:
public class Demo4 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
System.out.println("延迟1秒,每3秒执行1次");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
表示延迟1秒后每3秒执行一次。ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。
4、newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
public class Demo5 {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable(){
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
结果依次输出,相当于顺序执行各个任务。现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。
3、ScheduledThreadPoolExecutor 详解
我们知道Timer与TimerTask虽然可以实现线程的周期和延迟调度,但是Timer与TimerTask存在一些缺陷,所以对于这种定期、周期执行任务的调度策略,我们一般都是推荐ScheduledThreadPoolExecutor来实现。下面就深入分析ScheduledThreadPoolExecutor是如何来实现线程的周期、延迟调度的。
ScheduledThreadPoolExecutor,继承ThreadPoolExecutor且实现了ScheduledExecutorService接口,它就相当于提供了“延迟”和“周期执行”功能的ThreadPoolExecutor。在JDK API中是这样定义它的:ScheduledThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ScheduledThreadPoolExecutor具有额外的灵活性或功能时,此类要优于 Timer。 一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。
3.1、ScheduledThreadPoolExecutor 的运行机制
ScheduledThreadPoolExecutor 的执行示意图如下图所示:
DelayQueue 是一个无界队列,所以 ThreadPoolExecutor 的 maximumPoolSize 在 ScheduledThreadPoolExecutor 中没有什么意义。
ScheduledThreadPoolExecutor 的执行主要分为两大部分:
1、当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduledWithFixedDelay() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask。
2、线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor 做了如下的修改:
1、使用 DelayQueue 作为任务队列;
2、获取任务的方式不同(后文会讲解到);
3、执行周期任务后,增加了额外的处理(后文会讲解到)。
3.2、ScheduledThreadPoolExecutor 的实现
先来看下ScheduledThreadPoolExecutor类中的主要结构:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
private static final AtomicLong sequencer = new AtomicLong(0);
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
}
private void delayedExecute(RunnableScheduledFuture<?> task) { ... }
void reExecutePeriodic(RunnableScheduledFuture<?> task) { ... }
protected <V> RunnableScheduledFuture<V> decorateTask(...){...}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {...}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {...}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {...}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {...}
public void execute(Runnable command) {...}
public Future<?> submit(Runnable task) {...}
public <T> Future<T> submit(Runnable task, T result) {...}
public <T> Future<T> submit(Callable<T> task) {...}
public void shutdown() {...}
public List<Runnable> shutdownNow() {...}
public BlockingQueue<Runnable> getQueue() {...}
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {...}
//......
}
ScheduledThreadPoolExecutor 会把调度的任务(ScheduledFutureTask)放到一个DelayQueue中。下面来看下ScheduledFutureTask主要包含的3个成员变量:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
private long time;
private final long period;
...
}
1、long型成员变量time:表示这个任务将要执行的具体时间;
2、long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号;
3、long型成员变量period,表示任务执行的间隔周期。
DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 ScheduledEutureTask 进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面,也就是说,如果两个任务的执行时间相同,那么先执行提交早的任务。
下图所示的是:ScheduledThreadPoolExecutor 中的线程1执行某个周期任务的4个步骤:
1、线程1从 DelayQueue 中获取已到期的 ScheduledFutureTask(DealyQueue.take())。到其任务是指 ScheduledFutureTask 的 time 大于等于当前时间;
2、线程1执行这个 ScheduledFutureTask;
3、线程1修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;
4、线程1把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。
下面就看下DelayQueue.take()方法的源代码实现:【在源代码中:DelayQueue 就是 DelayedWorkQueue】
static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {
public RunnableScheduledFuture take() throws InterruptedException {
// 获取lock
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0]; // 获取任务
if (first == null)
available.await(); // 如果队列为空,则等待
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
}
DelayQueue.take()的执行示意图如下所示:
如上图所示的过程,大致可以分为3个步骤:
1、获取Lock;
2、获取周期任务;
- 2.1、如果 PriorityQueue 为空,当前线程到 Condition 中等待,否则执行下面的2.2;
- 2.2、如果 PriorityQueue 的头元素的 time 时间比当前时间大,到 Condition 中等待到 time 时间,否则执行2.3;
- 2.3、获取 PriorityQueue 的头元素,如果 PriorityQueue 不为空,则唤醒在 Condition 中等待的所有线程。
3、释放Lock。
ScheduledThreadFutureTask 在一个循环中执行步骤2,直到线程从 PriorityQueue 获取到一个元素之后,才会退出无限循环。
下面看下 ScheduledThreadFutureTask 中的线程把 ScheduledFutureTask 放入 DelayQueue 中的过程。下面是 DelayQueue.add() 的源代码实现:
static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
......
}
下图是 DelayQueue.add() 的执行示意图:
如上图所示,添加任务分为3大步骤:
1、获取 Lock;
2、添加任务;
- 2.1、向 PriorityQueue 添加任务;
- 2.2、如果在上面2.1 中添加的任务是 PriorityQueue 的头元素,则唤醒在 Conditon 中等待的所有线程;
3、释放 Lock。
4、FutureTask 详解
Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。
4.1、FutureTask 简介
FutureTask 除了实现了 Future 接口外,还实现了 Runnable接口。那么我们就先看下这两个接口的内部结构。
Future 接口的内部结构
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Runnable 接口的内部结构
public interface Runnable {
public abstract void run();
}
1、未启动:FutureTask.run()方法还没有被执行之前,FutureTask 处于未启动状态。当创建一个 FutureTask,且没有执行 FutureTask.run() 方法之前,这个 FutureTask 处于未启动状态;
2、已启动:FutureTask.run()方法被执行的过程中,FutureTask 处于已启动状态;
3、已完成:FutureTask.run()方法执行完成后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask 处于已完成状态。
FutureTask 的状态迁移的示意图如下所示:
FutureTask 的 get 和 cancel 的执行示意图如下所示:
4.2、FutureTask 的实现
先看下 FutureTask 的内部结构:
public class FutureTask<V> implements RunnableFuture<V> {
private final Sync sync;
// 构造函数1 Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
// 构造函数2 Runnable
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
// 调用的是sync中的innerCancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
// 调用的是sync中的innerGet方法
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
// 调用的是sync中的innerGet方法
public V get(long timeout, TimeUnit unit) {
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}
// 调用的是sync中的innerRun方法
public void run () {
sync.innerRun();
}
private final class Sync extends AbstractQueuedSynchronizer {...
}
// .......
}
}
从 FutureTask 的源码中可以看出来,它的实现是基于 AbstractQueuedSynchronizer 。AQS 是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。基于 AQS 实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch 和 FutureTask。
每一个基于 AQS 实现的同步器都会包含两种类型的操作,如下:
1、至少一个 acquire 操作:这个操作阻塞调用线程,除非 / 直到 AQS 的状态允许这个线程继续执行。 FutureTask 的 acquire 操作为 get() / get(long timeout, TimeUnit unit)方法调用;
2、至少一个 release 操作:这个操作改变 AQS 的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask 的 release 操作包括 run() 方法和 cancel(…) 方法。
基于“复合优先继承”的原则,FutureTask 声明了一个内部私有的继承于 AQS 的子类 Sync,对 FutureTask 所有公有方法的调用都会委托给这个内部子类。
AQS 被作为“模板方法模式”的基础类提供给 FutureTask 的内部子类 Sync,这个内部子类只需要实现状态检测和状态更新的方法即可,这些方法将控制 FutureTask 的获取和释放操作。具体来说,Sync实现了 AQS 的 tryAcquireShared(int)方法和 tryReleaseShared(int)方法,Sync 通过这两个方法来检查和更新同步状态。
FutureTask 的设计示意图如下图所示:
如图所示,Sync 是 FutureTask 的内部私有类,它继承自 AQS。创建 FutureTask 时会创建内部私有的成员对象 Sync,FutureTask 所有的公有方法都直接委托给了内部私有的 Sync。
下面对 FutureTask 中主要的几个方法进行调用过程分析:
4.2.1、FutureTask.get() 方法
第1步:调用 FutureTask 中的 get() 方法
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
从源码中很清楚的看到 get() 方法内部是由 sync 的 innerGet()方法实现的。
第2步:调用 Sync 中的 innerGet()方法
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
第3步:调用 AQS.acquireSharedInterruptibly(int args)方法。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
第4步:调用Sync.tryAcquireShared方法
第5步:调用 AQS.doAcquireSharedIntrruptibly方法
这个方法首先会在子类 Sync 中实现的 tryAcquireShared()方法来判断 acquire 操作是否可以成功,acquire 操作可以成功的条件为:state 为执行完成状态RAN 或取消状态 CANCELLED,且 runner 不为null。
【至于tryAcquireShared和doAcquireSharedIntrruptibly方法,这里不再做源码分析了,前面文章已经分析过多次了】
如果成功则立即返回,如果失败则到线程等待队列中去等待其他线程执行 release 操作。
当其他线程执行 release 操作(比如:FutureTask.run() 或 FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行 tryAcquiredShared() 将返回正值 1,当前线程将离开线程等待队列,并唤醒它的后继节点线程。
最后返回计算的结果或者抛出异常。
4.2.2、FutureTask.run() 方法
第1步:调用了 FutureTask.run() 方法
public void run() {
sync.innerRun();
}
第2步:调用 Sync.innerRun() 方法
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
第3步:调用AQS.releaseShared(int args)方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
AQS.releaseShared(int args)首先会回调子类 Sync 中实现的 tryReleaseShared(int args)方法来执行 release操作。
第4步:调用 Sync.tryReleaseShared(int args) 方法
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
设置允许任务线程 runner 为 null,然后返回 true。
第5步:调用AQS.doReleaseShared() 方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
唤醒线程等待队列中的第一个线程。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!