本文主要内容:

  • 线程模型概述
  • 事件循环的概念和实现
  • 任务调度
  • 实现细节

线程模型概述

基本的线程池化模式可以描述为:

  • 从池的空闲列表中选择一个Thread,并且指派它去运行一个已提交的任务(一个Runnable的实现);

  • 当任务完成时,将该Thread返回给该列表,使其可被重用。

在这里插入图片描述

(1)要执行的任务;

(2)任务递交给了线程池;

(3)从线程池中拉取一个可用的Thread,并执行任务,当任务完成时,将该Thread返回给空闲列表,使其可被重用

EventLoop接口

EventLoop是协同设计的一部分,采用了两个基本的API:并发和网络编程。

在这里插入图片描述

一个EventLoop将由一个永远都不会改变的Thread驱动,同时任务(Runnable或者Callable)可以直接提交给EventLoop实现,以立即执行或者调度执行。根据配置和可用核心不同,可能会创建多个EventLoop实例用以优化资源的使用,并且单个EventLoop可能会被指派用于服务多个Channel。

Netty4中的I/O和事件处理

所有的I/O操作和事件都由已经被分配给了EventLoop的那个Thread来处理。

任务调度

常见的用例是:发送心跳消息到远程节点,以检查连接是否还活着。

JDK的任务调度API

java.util.concurrent.Executors类的工厂方法

方法 描述
newScheduledThreadPool(int corePoolSize)

newScheduledThreadPool(int corePoolSize,
ThreadFactory threadFactory)
创建一个ScheduledThreadExecutorService,用于调度命令在指定延迟之后运行或者周期性地执行。它使用corePoolSize参数来计算线程数
newSingleThreadScheduledExecutor()

newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
创建一个ScheduledThreadExecutorService,用于调度命令在指定延迟之后运行或者周期性地执行。它使用一个线程来执行被调度的任务。

下面代码展示了如何使用ScheduledExecutorService来在60s的延迟之后执行一个任务:

public static void schedule() {
        //创建一个其线程池具有 10 个线程的ScheduledExecutorService
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(10);

        ScheduledFuture<?> future = executor.schedule(
            //创建一个 Runnable,以供调度稍后执行
            new Runnable() {
            @Override
            public void run() {
                //该任务要打印的消息
                System.out.println("Now it is 60 seconds later");
            }
        //调度任务在从现在开始的 60 秒之后执行
        }, 60, TimeUnit.SECONDS);
        //...
        //一旦调度任务执行完成,就关闭 ScheduledExecutorService 以释放资源
        executor.shutdown();
    }

高负载下性能上不佳

使用EventLoop调度任务

public static void scheduleViaEventLoop() {
     Channel ch = CHANNEL_FROM_SOMEWHERE; // get reference from somewhere
     ScheduledFuture<?> future = ch.eventLoop().schedule(
         //创建一个 Runnable以供调度稍后执行
         new Runnable() {
         @Override
         public void run() {
             //要执行的代码
             System.out.println("60 seconds later");
         }
         //调度任务在从现在开始的 60 秒之后执行
     }, 60, TimeUnit.SECONDS);
 }

如果要调度任务以每隔60s执行一次,则使用ScheduleAtFixedRate()方法。

要想取消或者检查(被调度任务的)执行状态,可以使用每个异步操作所返回的ScheduledFuture。

public static void cancelingTaskUsingScheduledFuture(){
       Channel ch = CHANNEL_FROM_SOMEWHERE; // get reference from somewhere
       //...
       //调度任务,并获得所返回的ScheduledFuture
       ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
               new Runnable() {
                   @Override
                   public void run() {
                       System.out.println("Run every 60 seconds");
                   }
               }, 60, 60, TimeUnit.SECONDS);
       // Some other code that runs...
       boolean mayInterruptIfRunning = false;
       //取消该任务,防止它再次运行
       future.cancel(mayInterruptIfRunning);
   }

实现细节

线程管理

Netty线程模型的卓越性能取决于对于当前执行的Thread的身份的确定(通过调用EventLoop的inEventLoop(Thread)方法实现),也就是说,确定它是否是分配给当前Channel以及它的EventLoop的那一个线程。

如果当前调用线程正是支撑EventLoop的线程,那么所提交的代码块将会被直接执行,否则,EventLoop将调度该任务以便稍后执行,并将它放入到内部队列中,当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。

注意:每个EventLoop都有他自己的任务队列,独立于任何其他的EventLoop。

在这里插入图片描述

EventLoop/线程的分配

异步传输

异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个Channel所共享。这使得可以通过尽可能少的Thread来支撑大量的Channel,而不是每个Channel分配一个Thread。

在这里插入图片描述

EventLoopGroup负责为每个新创建的Channel分配一个EventLoop,并且相同的Event Loop可能会被分配给多个Channel。

一旦一个Channel被分配给要给EventLoop,它将在它的整个生命周期中都使用整个EventLoop(以及相关联的Thread)。

阻塞传输

每个Channel都将会被分配给一个EventLoop(以及它的Thread)

在这里插入图片描述