本文主要内容:
- 线程模型概述
- 事件循环的概念和实现
- 任务调度
- 实现细节
线程模型概述
基本的线程池化模式可以描述为:
从池的空闲列表中选择一个Thread,并且指派它去运行一个已提交的任务(一个Runnable的实现);
当任务完成时,将该Thread返回给该列表,使其可被重用。
(1)要执行的任务;
(2)任务递交给了线程池;
(3)从线程池中拉取一个可用的Thread,并执行任务,当任务完成时,将该Thread返回给空闲列表,使其可被重用
EventLoop接口
EventLoop是协同设计的一部分,采用了两个基本的API:并发和网络编程。
一个EventLoop将由一个永远都不会改变的Thread驱动,同时任务(Runnable或者Callable)可以直接提交给EventLoop实现,以立即执行或者调度执行。根据配置和可用核心不同,可能会创建多个EventLoop实例用以优化资源的使用,并且单个EventLoop可能会被指派用于服务多个Channel。
Netty4中的I/O和事件处理
所有的I/O操作和事件都由已经被分配给了EventLoop的那个Thread来处理。
任务调度
常见的用例是:发送心跳消息到远程节点,以检查连接是否还活着。
JDK的任务调度API
java.util.concurrent.Executors类的工厂方法
方法 | 描述 |
---|---|
newScheduledThreadPool(int corePoolSize) newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) |
创建一个ScheduledThreadExecutorService,用于调度命令在指定延迟之后运行或者周期性地执行。它使用corePoolSize参数来计算线程数 |
newSingleThreadScheduledExecutor() newSingleThreadScheduledExecutor(ThreadFactory threadFactory) |
创建一个ScheduledThreadExecutorService,用于调度命令在指定延迟之后运行或者周期性地执行。它使用一个线程来执行被调度的任务。 |
下面代码展示了如何使用ScheduledExecutorService来在60s的延迟之后执行一个任务:
public static void schedule() {
//创建一个其线程池具有 10 个线程的ScheduledExecutorService
ScheduledExecutorService executor =
Executors.newScheduledThreadPool(10);
ScheduledFuture<?> future = executor.schedule(
//创建一个 Runnable,以供调度稍后执行
new Runnable() {
@Override
public void run() {
//该任务要打印的消息
System.out.println("Now it is 60 seconds later");
}
//调度任务在从现在开始的 60 秒之后执行
}, 60, TimeUnit.SECONDS);
//...
//一旦调度任务执行完成,就关闭 ScheduledExecutorService 以释放资源
executor.shutdown();
}
高负载下性能上不佳
使用EventLoop调度任务
public static void scheduleViaEventLoop() {
Channel ch = CHANNEL_FROM_SOMEWHERE; // get reference from somewhere
ScheduledFuture<?> future = ch.eventLoop().schedule(
//创建一个 Runnable以供调度稍后执行
new Runnable() {
@Override
public void run() {
//要执行的代码
System.out.println("60 seconds later");
}
//调度任务在从现在开始的 60 秒之后执行
}, 60, TimeUnit.SECONDS);
}
如果要调度任务以每隔60s执行一次,则使用ScheduleAtFixedRate()方法。
要想取消或者检查(被调度任务的)执行状态,可以使用每个异步操作所返回的ScheduledFuture。
public static void cancelingTaskUsingScheduledFuture(){
Channel ch = CHANNEL_FROM_SOMEWHERE; // get reference from somewhere
//...
//调度任务,并获得所返回的ScheduledFuture
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
System.out.println("Run every 60 seconds");
}
}, 60, 60, TimeUnit.SECONDS);
// Some other code that runs...
boolean mayInterruptIfRunning = false;
//取消该任务,防止它再次运行
future.cancel(mayInterruptIfRunning);
}
实现细节
线程管理
Netty线程模型的卓越性能取决于对于当前执行的Thread的身份的确定(通过调用EventLoop的inEventLoop(Thread)方法实现),也就是说,确定它是否是分配给当前Channel以及它的EventLoop的那一个线程。
如果当前调用线程正是支撑EventLoop的线程,那么所提交的代码块将会被直接执行,否则,EventLoop将调度该任务以便稍后执行,并将它放入到内部队列中,当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。
注意:每个EventLoop都有他自己的任务队列,独立于任何其他的EventLoop。
EventLoop/线程的分配
异步传输
异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个Channel所共享。这使得可以通过尽可能少的Thread来支撑大量的Channel,而不是每个Channel分配一个Thread。
EventLoopGroup负责为每个新创建的Channel分配一个EventLoop,并且相同的Event Loop可能会被分配给多个Channel。
一旦一个Channel被分配给要给EventLoop,它将在它的整个生命周期中都使用整个EventLoop(以及相关联的Thread)。
阻塞传输
每个Channel都将会被分配给一个EventLoop(以及它的Thread)
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!