Java中的线程即是工作单元也是执行机制,从JDK 5后,工作单元与执行机制被分离。工作单元包括Runnable和Callable,执行机制由JDK 5中增加的java.util.concurrent包中Executor框架提供。

1、Executor 框架简介

1.1、Executor 框架的两级调度模型

在 HotSpot VM 的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定的数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这两级的调度模型的示意图如下所示:

image-20210530202212035

从图中可以看出,应用程序通过 Executor 框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。

1.2、Executor 框架的结构与成员

1.2.1、Executor 框架的结构

Executor 框架主要由3大部分组成如下:

1、任务:包括被执行任务需要实现的接口:Runnable 接口或 Callable 接口;

2、任务的执行:包括任务执行机制的核心接口Executor,以及继承自 Executor 的 ExecutorService 接口。Executor 框架有两个关键类实现了 ExecutorService 接口,即:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor;

3、异步计算的结果:包括接口 Future 和实现 Future 接口的 FutureTask 类。

Executor 框架包含的主要类和接口如下图所示:

image-20210530202253095

1、Executor:是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来;

2、ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务;

3、ScheduledThreadPoolExecutor:是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更加灵活,功能更强大;

4、Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果;

5、Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

Executor 框架的使用示意图如下图所示:

image-20210530202322127

  • 主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。
  • 然后可以把Runnbale对象直接交给 ExecutorService 执行(ExecutorService.execute(Runnable command));或者也可以把Runnable或者Callable对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task) 或 ExcutorService.submit(Callable task))。
  • 如果执行ExecutorService.submit( … ),ExecutorService 将返回一个实现 Future 接口的对象。由于 FutureTask 实现了 Runnable,程序员也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
  • 最后,主线程可以执行 FutureTask.get() 方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning) 来取消此任务的执行。

1.2.2、Executor 框架的成员

Executor 框架的主要成员包括:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors。

1、ThreadPoolExecutor

ThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors 可以创建3种类型的 ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。

2、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors可以创建2种类型的 ScheduledThreadPoolExecutor,如下:

  • ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor;
  • SingleThreadScheduledExecutor:包含一个线程的ScheduledThreadPoolExecutor。

3、Future接口

Future 接口和实现 Future 接口的 FutureTask 类用来表示异步计算的结果。当我们把 Runnable 接口或 Callable 接口的实现类提交(submit)给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 时,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 会向我们返回一个 FutureTask 对象。

4、Runnable 接口和 Callable 接口

Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。它们之间的区别是 Runnable 不会返回结果,而 Callable 可以返回结果。

2、ThreadPoolExecutor 详解

其实关于 ThreadPoolExecutor 在上一篇文章:Java中的线程池详解已经进行了源码讲解,这里仅介绍下它的3种类型:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。

2.1、FixedThreadPool

创建固定长度的线程池,每次提交任务创建一个线程,直到达到线程池的最大数量,线程池的大小不再变化。

这个线程池可以创建固定线程数的线程池。特点就是可以重用固定数量线程的线程池。它的构造源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads,nThreads,
                                  0L,
                                  TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool的corePoolSize和maxiumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads;

0L则表示当线程池中的线程数量操作核心线程的数量时,多余的线程将被立即停止;

最后一个参数表示FixedThreadPool使用了无界队列LinkedBlockingQueue作为线程池的做工队列,由于是无界的,当线程池的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池的线程数量不会超过corePoolSize,同时maxiumPoolSize也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。

FixedThreadPool运行图如下:

image-20210530202413593

执行过程如下:

1.如果当前工作中的线程数量少于corePool的数量,就创建新的线程来执行任务。

2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。

3.线程执行完1中的任务后会从队列中去任务。

注意:LinkedBlockingQueue是无界队列,所以可以一直添加新任务到线程池。

2.2、SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor。特点是使用单个工作线程执行任务。它的构造源码如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1,1,
                                    0L,
                                    TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor 的 corePoolSize 和 maxiumPoolSize 都被设置1。其他参数均与 FixedThreadPool 相同,其运行图如下:

image-20210530202529591

执行过程如下:

1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。

2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。

3.线程执行完1中的任务后会从队列中去任务。

注意:由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。

2. 3、CachedThreadPool

CachedThreadPool是一个”无限“容量的线程池,它会根据需要创建新线程。特点是可以根据需要来创建新的线程执行任务,没有特定的corePool。下面是它的构造方法:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                                  60L,
                                  TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximum是无界的。这里keepAliveTime设置为60秒,意味着空闲的线程最多可以等待任务60秒,否则将被回收。

CachedThreadPool使用没有容量的SynchronousQueue作为主线程池的工作队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源。其运行图如下:

image-20210530202625591

执行过程如下:

1.首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行。,execute()方法执行成功,否则执行步骤2;

2.当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务;

3.在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。

SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。

2.4、具体应用案例

1、newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

public class Demo1 {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for(int i = 0; i < 10; i++){
            final int index = i;
            try {
                Thread.sleep(index * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable(){
                @Override
                public void run() {
                    System.out.println(index);
                }
            });
        }
    }
}

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

2、newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

public class Demo2 {
    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            fixedThreadPool.execute(new Runnable(){
                @Override
                public void run() {
                    try {
                        System.out.println(index);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache。

3、newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

public class Demo3 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(new Runnable(){
            @Override
            public void run() {
                System.out.println("延迟3秒");
            }
        }, 3, TimeUnit.SECONDS);
    }
}

表示延迟3秒执行。定期执行示例代码如下:

public class Demo4 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.scheduleAtFixedRate(new Runnable(){
            @Override
            public void run() {
                System.out.println("延迟1秒,每3秒执行1次");
            }
        }, 1, 3, TimeUnit.SECONDS);
    }
}

表示延迟1秒后每3秒执行一次。ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。

4、newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

public class Demo5 {
    public static void main(String[] args) {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable(){
                @Override
                public void run() {
                    try {
                        System.out.println(index);
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

结果依次输出,相当于顺序执行各个任务。现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。

3、ScheduledThreadPoolExecutor 详解

我们知道Timer与TimerTask虽然可以实现线程的周期和延迟调度,但是Timer与TimerTask存在一些缺陷,所以对于这种定期、周期执行任务的调度策略,我们一般都是推荐ScheduledThreadPoolExecutor来实现。下面就深入分析ScheduledThreadPoolExecutor是如何来实现线程的周期、延迟调度的。

ScheduledThreadPoolExecutor,继承ThreadPoolExecutor且实现了ScheduledExecutorService接口,它就相当于提供了“延迟”和“周期执行”功能的ThreadPoolExecutor。在JDK API中是这样定义它的:ScheduledThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ScheduledThreadPoolExecutor具有额外的灵活性或功能时,此类要优于 Timer。 一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。

3.1、ScheduledThreadPoolExecutor 的运行机制

ScheduledThreadPoolExecutor 的执行示意图如下图所示:

image-20210530202849399

DelayQueue 是一个无界队列,所以 ThreadPoolExecutor 的 maximumPoolSize 在 ScheduledThreadPoolExecutor 中没有什么意义。

ScheduledThreadPoolExecutor 的执行主要分为两大部分:

1、当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduledWithFixedDelay() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask。

2、线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor 做了如下的修改:

1、使用 DelayQueue 作为任务队列;

2、获取任务的方式不同(后文会讲解到);

3、执行周期任务后,增加了额外的处理(后文会讲解到)。

3.2、ScheduledThreadPoolExecutor 的实现

先来看下ScheduledThreadPoolExecutor类中的主要结构:

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    private static final AtomicLong sequencer = new AtomicLong(0);
    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
    }
    private void delayedExecute(RunnableScheduledFuture<?> task) { ... }
    void reExecutePeriodic(RunnableScheduledFuture<?> task) { ... }
    protected <V> RunnableScheduledFuture<V> decorateTask(...){...}
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {...}
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {...}
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {...}
    public void execute(Runnable command) {...}
    public Future<?> submit(Runnable task) {...}
    public <T> Future<T> submit(Runnable task, T result) {...}
    public <T> Future<T> submit(Callable<T> task) {...}
    public void shutdown() {...}
    public List<Runnable> shutdownNow() {...}
    public BlockingQueue<Runnable> getQueue() {...}
    static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {...}
//......
}

ScheduledThreadPoolExecutor 会把调度的任务(ScheduledFutureTask)放到一个DelayQueue中。下面来看下ScheduledFutureTask主要包含的3个成员变量:

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    private final long sequenceNumber;
    private long time;
    private final long period;
...
}

1、long型成员变量time:表示这个任务将要执行的具体时间;

2、long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号;

3、long型成员变量period,表示任务执行的间隔周期。

DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 ScheduledEutureTask 进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面,也就是说,如果两个任务的执行时间相同,那么先执行提交早的任务。

下图所示的是:ScheduledThreadPoolExecutor 中的线程1执行某个周期任务的4个步骤:

image-20210530203014524

1、线程1从 DelayQueue 中获取已到期的 ScheduledFutureTask(DealyQueue.take())。到其任务是指 ScheduledFutureTask 的 time 大于等于当前时间;

2、线程1执行这个 ScheduledFutureTask;

3、线程1修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;

4、线程1把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。

下面就看下DelayQueue.take()方法的源代码实现:【在源代码中:DelayQueue 就是 DelayedWorkQueue】

static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {
    public RunnableScheduledFuture take() throws InterruptedException {
        // 获取lock
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture first = queue[0]; // 获取任务
                if (first == null)
                    available.await(); // 如果队列为空,则等待
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    else if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
}

DelayQueue.take()的执行示意图如下所示:

image-20210530203108451

如上图所示的过程,大致可以分为3个步骤:

1、获取Lock;

2、获取周期任务;

  • 2.1、如果 PriorityQueue 为空,当前线程到 Condition 中等待,否则执行下面的2.2;
  • 2.2、如果 PriorityQueue 的头元素的 time 时间比当前时间大,到 Condition 中等待到 time 时间,否则执行2.3;
  • 2.3、获取 PriorityQueue 的头元素,如果 PriorityQueue 不为空,则唤醒在 Condition 中等待的所有线程。

3、释放Lock。

ScheduledThreadFutureTask 在一个循环中执行步骤2,直到线程从 PriorityQueue 获取到一个元素之后,才会退出无限循环。

下面看下 ScheduledThreadFutureTask 中的线程把 ScheduledFutureTask 放入 DelayQueue 中的过程。下面是 DelayQueue.add() 的源代码实现:

static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {
    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture e = (RunnableScheduledFuture)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
......
}

下图是 DelayQueue.add() 的执行示意图:

image-20210530203204118

如上图所示,添加任务分为3大步骤:

1、获取 Lock;

2、添加任务;

  • 2.1、向 PriorityQueue 添加任务;
  • 2.2、如果在上面2.1 中添加的任务是 PriorityQueue 的头元素,则唤醒在 Conditon 中等待的所有线程;

3、释放 Lock。

4、FutureTask 详解

Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。

4.1、FutureTask 简介

FutureTask 除了实现了 Future 接口外,还实现了 Runnable接口。那么我们就先看下这两个接口的内部结构。

Future 接口的内部结构

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

Runnable 接口的内部结构

public interface Runnable {  
	public abstract  void run();  
}

1、未启动:FutureTask.run()方法还没有被执行之前,FutureTask 处于未启动状态。当创建一个 FutureTask,且没有执行 FutureTask.run() 方法之前,这个 FutureTask 处于未启动状态;

2、已启动:FutureTask.run()方法被执行的过程中,FutureTask 处于已启动状态;

3、已完成:FutureTask.run()方法执行完成后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask 处于已完成状态。

FutureTask 的状态迁移的示意图如下所示:

image-20210530203308304

FutureTask 的 get 和 cancel 的执行示意图如下所示:

image-20210530203414523

4.2、FutureTask 的实现

先看下 FutureTask 的内部结构:

public class FutureTask<V> implements RunnableFuture<V> {
    private final Sync sync;

    // 构造函数1 Callable
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    // 构造函数2 Runnable
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

    // 调用的是sync中的innerCancel方法
    public boolean cancel(boolean mayInterruptIfRunning) {
        return sync.innerCancel(mayInterruptIfRunning);
    }

    // 调用的是sync中的innerGet方法
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    // 调用的是sync中的innerGet方法
    public V get(long timeout, TimeUnit unit) {
throws InterruptedException, ExecutionException, TimeoutException {
            return sync.innerGet(unit.toNanos(timeout));
        }
        // 调用的是sync中的innerRun方法
        public void run () {
            sync.innerRun();
        }
        private final class Sync extends AbstractQueuedSynchronizer {...
        }
        // .......
    }
}

从 FutureTask 的源码中可以看出来,它的实现是基于 AbstractQueuedSynchronizer 。AQS 是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。基于 AQS 实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch 和 FutureTask。

每一个基于 AQS 实现的同步器都会包含两种类型的操作,如下:

1、至少一个 acquire 操作:这个操作阻塞调用线程,除非 / 直到 AQS 的状态允许这个线程继续执行。 FutureTask 的 acquire 操作为 get() / get(long timeout, TimeUnit unit)方法调用;

2、至少一个 release 操作:这个操作改变 AQS 的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask 的 release 操作包括 run() 方法和 cancel(…) 方法。

基于“复合优先继承”的原则,FutureTask 声明了一个内部私有的继承于 AQS 的子类 Sync,对 FutureTask 所有公有方法的调用都会委托给这个内部子类。

AQS 被作为“模板方法模式”的基础类提供给 FutureTask 的内部子类 Sync,这个内部子类只需要实现状态检测和状态更新的方法即可,这些方法将控制 FutureTask 的获取和释放操作。具体来说,Sync实现了 AQS 的 tryAcquireShared(int)方法和 tryReleaseShared(int)方法,Sync 通过这两个方法来检查和更新同步状态。

FutureTask 的设计示意图如下图所示:

image-20210530203513078

如图所示,Sync 是 FutureTask 的内部私有类,它继承自 AQS。创建 FutureTask 时会创建内部私有的成员对象 Sync,FutureTask 所有的公有方法都直接委托给了内部私有的 Sync。

下面对 FutureTask 中主要的几个方法进行调用过程分析:

4.2.1、FutureTask.get() 方法

第1步:调用 FutureTask 中的 get() 方法

public V get() throws  InterruptedException, ExecutionException {
	return sync.innerGet();
}

从源码中很清楚的看到 get() 方法内部是由 sync 的 innerGet()方法实现的。

第2步:调用 Sync 中的 innerGet()方法

V innerGet() throws InterruptedException, ExecutionException {
    acquireSharedInterruptibly(0);
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}

第3步:调用 AQS.acquireSharedInterruptibly(int args)方法。

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

第4步:调用Sync.tryAcquireShared方法

第5步:调用 AQS.doAcquireSharedIntrruptibly方法

这个方法首先会在子类 Sync 中实现的 tryAcquireShared()方法来判断 acquire 操作是否可以成功,acquire 操作可以成功的条件为:state 为执行完成状态RAN 或取消状态 CANCELLED,且 runner 不为null。

【至于tryAcquireShared和doAcquireSharedIntrruptibly方法,这里不再做源码分析了,前面文章已经分析过多次了】

如果成功则立即返回,如果失败则到线程等待队列中去等待其他线程执行 release 操作。

当其他线程执行 release 操作(比如:FutureTask.run() 或 FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行 tryAcquiredShared() 将返回正值 1,当前线程将离开线程等待队列,并唤醒它的后继节点线程。

最后返回计算的结果或者抛出异常。

4.2.2、FutureTask.run() 方法

第1步:调用了 FutureTask.run() 方法

public void run() {
	sync.innerRun();
}

第2步:调用 Sync.innerRun() 方法

void innerRun() {
    if (!compareAndSetState(READY, RUNNING))
        return;
    runner = Thread.currentThread();
    if (getState() == RUNNING) { // recheck after setting thread
        V result;
        try {
            result = callable.call();
        } catch (Throwable ex) {
            setException(ex);
            return;
        }
        set(result);
    } else {
        releaseShared(0); // cancel
    }
}

第3步:调用AQS.releaseShared(int args)方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

AQS.releaseShared(int args)首先会回调子类 Sync 中实现的 tryReleaseShared(int args)方法来执行 release操作。

第4步:调用 Sync.tryReleaseShared(int args) 方法

protected boolean tryReleaseShared(int ignore) {
	runner = null;
	return true;
}

设置允许任务线程 runner 为 null,然后返回 true。

第5步:调用AQS.doReleaseShared() 方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

唤醒线程等待队列中的第一个线程。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

Fork/Join框架 Previous
并发知识点 Next