本文主要内容:
- ChannelHandler和ChannelPipeline
- 检测资源泄漏
- 异常处理
ChannelHandler家族
Channel的生命周期
状态 | 描述 |
---|---|
ChannelUnregistered | Channel已经被创建,但还未注册到EventLoop |
ChannelRegistered | Channel已经被注册到了EventLoop |
ChannelActive | Channel处于活动状态(已连接到远程节点),可以接收和发送数据了 |
ChannelInactive | Channel没有连接到远程节点 |
Channel的正常生命周期如下图所示,当这些状态发生改变时,将会生成对应的事件,这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。
ChannelHandler的生命周期
ChannelHandler的生命周期方法主要有下面几种,这些方法都有一个ChannelHandlerContext参数,在ChannelHandler被添加到ChannelPipeline中或者从ChannelPipeline中移除时都会调用这些操作。
类型 | 描述 |
---|---|
handlerAdded | 当把ChannelHandler添加到ChannelPipeline中时被调用 |
handlerRemoved | 当从ChannelPineline中移除ChannelHandler时被调用 |
exceptionCaught | 当处理过程中在ChannelPipeline中有错误产生时被调用 |
Netty定义了两个重要的ChannelHandler子接口:
- ChannelInboundHandler:处理入站数据以及各种状态变化;
- ChannelOutboundHandler:处理出站数据并且允许拦截所有的操作;
ChannelInboundHandler接口
下面这些是ChannelInboundHandler的生命周期方法,将会在数据被接收时或者与其对应的Channel状态发生改变时被调用。
类型 | 描述 |
---|---|
ChannelUnregistered | 当Channel从它的EventLoop注销并且无法处理任何I/O时被调用 |
ChannelRegistered | 当Channel已经注册到它的EventLoop,并且能够处理I/O时被调用 |
ChannelActive | 当Channel处于活动状态(已连接到远程节点)时被调用 |
ChannelInactive | 当Channel离开活动状态并且不再连接它的远程节点时被调用 |
ChannelReadComplete | 当Channel上的一个读操作完成时被调用 |
ChannelRead | 当从Channel读取数据时被调用 |
ChannelWritabilityChanged | 当Channel的可写状态发生改变时被调用 |
userEventTriggered | 当调用ChannelInboundHandler.fireUserEventTriggered()方法时被调用 |
当某个ChannelInboundHandler的实现重写了channelRead()方法时,它将负责显式地释放与池化和ByteBuf实例相关的内存。
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//丢弃已接收的消息
ReferenceCountUtil.release(msg);
}
}
不过我们一般使用SimpleChannelInboundHandler来自动释放资源。
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
//不需要任何显式的资源释放
// No need to do anything special
}
}
由于使用SimpleChannelInboundHandler会自动释放资源,所以不能存储指向任何消息的引用供将来使用。
ChannelOutboundHandler接口
用来处理出站操作,它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。
ChannelOutboundHandler的一个强大的功能就是可以按需推迟操作或者事件,比如到远程节点的写入被暂停了,你可以推迟冲刷操作并在稍后继续。
public interface ChannelOutboundHandler extends ChannelHandler {
//当请求将Channel绑定到本地地址时被调用
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
//当请求将Channel连接到远程节点时被调用
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
//当请求将Channel从远程节点断开时被调用
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
//当请求关闭Channel时被调用
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
//当请求将Channel从它的EventLoop注销时被调用
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
//当请求从Channel读取更多的数据时被调用
void read(ChannelHandlerContext ctx) throws Exception;
//当请求通过Channel将数据写到远程节点时被调用
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
//当请求通过Channel将入队数据冲刷到远程节点时被调用
void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变。
ChannelHandler适配器
我们可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现,通过扩展抽象类ChannelHandlerAdapter,他们获得了ChannelHandler的方法。生成的类的层次结构如下图
在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。
资源管理
每当通过调用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法来处理数据时,你都需要确保没有任何的资源泄露。
Netty提供了4种泄漏检测级别,分别如下:
DISABLED——禁用泄露检测
SIMPLE——使用1%的默认采样率检测并报告任何发现的泄露
ADVANCED——使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置
PARANOID——类似于ADVANCED,但是其将会对每次访问都进行采样,这对性能将会有很大的影响,应该只在调试阶段使用
泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:
java -Dio.netty.leakDetectionLevel = ADVANCED
消费入站可以通过SimpleChannelInboundHandler类来实现,在消息被channelRead0()方法消费之后自动释放消息。
如果是出站消息,丢弃并释放资源的代码参考如下:
public class DiscardOutboundHandler
extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
//通过使用 ReferenceCountUtil.realse(...)方法释放资源
ReferenceCountUtil.release(msg);
//通知 ChannelPromise数据已经被处理了
promise.setSuccess();
}
}
这里要注意的是,我们不仅要释放资源,同时也要通知ChannelPromise,否则可能会出现ChannelFutureListener收不到某个消息已经被处理的通知。
ChannelPipeline接口
每个新建的Channel都会被分配一个新的ChannelPipeline,Channel既不能附加另一个ChannelPipeline,也不能分离当前的。
根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理,随后通过调用ChannelHandlerContext的实现,它将被转发给同一超类型的下一个ChannelHandler。
ChannelHandlerContext的作用就是使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互。
Netty总是将ChannelPipeline的入站口作为头部,而将出站口作为尾部,在ChannelPipeline传播事件时,它会测试ChannelPipeline中下一个ChannelHandler的类型是否和事件的运动方向相匹配,如果不匹配就跳过,直到找到和该事件所期望的方向相匹配的为止。
修改ChannelPipeline
通过调用ChannelPipeline上的相关方法,ChannelHandler可以添加、删除或者替换其他的ChannelHandler,从而实时地修改ChannelPipeline的布局。
public static void modifyPipeline() {
ChannelPipeline pipeline = CHANNEL_PIPELINE_FROM_SOMEWHERE; // get reference to pipeline;
//创建一个 FirstHandler 的实例
FirstHandler firstHandler = new FirstHandler();
//将该实例作为"handler1"添加到ChannelPipeline 中
pipeline.addLast("handler1", firstHandler);
//将一个 SecondHandler的实例作为"handler2"添加到 ChannelPipeline的第一个槽中。这意味着它将被放置在已有的"handler1"之前
pipeline.addFirst("handler2", new SecondHandler());
//将一个 ThirdHandler 的实例作为"handler3"添加到 ChannelPipeline 的最后一个槽中
pipeline.addLast("handler3", new ThirdHandler());
//...
//通过名称移除"handler3"
pipeline.remove("handler3");
//通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
pipeline.remove(firstHandler);
//将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
pipeline.replace("handler2", "handler4", new FourthHandler());
}
通常ChannelPipeline中的每个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。
触发事件
- ChannelPipeline保存了与Channel相关联的ChannelHandler;
- ChannelPipeline可以根据需要、通过添加或者删除ChannelHandler来动态修改;
- ChannelPipeline有着丰富的API用以被调用、以响应入站和出站事件。
ChannelHandlerContext接口
前面也提到了,ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,ChannelHandlerContext的主要功能就是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。
ChannelHandlerContext本身很多方法在Channel和ChannelPipeline中也存在,但是如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播,而调用位于ChannelHandlerContext上的这些方法,则将从当前所关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler,因此它会产生更短的事件流,性能会得到提高。
另外,ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的。
使用ChannelHandlerContext
Channel、ChannelPipeline、ChannelHandler以及ChannelHandlerContext之间的关系如下图:
为什么想要从channelPipeline中的某个特定点开始传播事件呢?
为了减少将事件经传对它不感兴趣的ChannelHandler所带来的开销;
为了避免将事件传经那些可能会对它感兴趣的ChannelHandler。
如果想调用从某个特定的ChannelHandler开始的处理过程,必须获取到在(ChannelPipeline)该ChannelHandler之前的ChannelHandler所关联的ChannelHandlerContext,这个ChannelHandlerContext将调用和它所关联的ChannelHandler之后的ChannelHandler。
通过ChannelHandlerContext触发的操作的事件流如下图所示:
ChannelHandler和ChannelHandlerContext的高级用法
通过调用ChannelHandlerContext上的pipeline()方法来获得被封闭的ChannelPipeline的引用,这使得运行时得以操作ChannelPipeline的ChannelHandler。如可以通过将ChannelHandler添加到ChannelPipeline中来实现动态的协议切换;还可以缓存ChannelHandlerContext的引用以供后面使用。
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
//存储到 ChannelHandlerContext的引用以供稍后使用
this.ctx = ctx;
}
public void send(String msg) {
//使用之前存储的到 ChannelHandlerContext的引用来发送消息
ctx.writeAndFlush(msg);
}
}
因为一个ChannelHandler可以从属于多个ChannelPipeline,所以也可以绑定多个ChannelHandlerContext实例,但是ChannelHandler必须使用@Sharable注解,并且ChannelHandler必须是线程安全的。
看个错误示例:
@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//将 count 字段的值加 1
count++;
//记录方法调用,并转发给下一个ChannelHandler
System.out.println("inboundBufferUpdated(...) called the "
+ count + " time");
ctx.fireChannelRead(msg);
}
}
为什么要共享一个ChannelHandler?
主要是用于收集跨越多个Channel的统计信息。
异常处理
处理入站异常
要想处理入站异常,需要在自己的ChannelInboundHandler实现中重写exceptionCaught()方法。
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
- ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler;
- 如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理;
- 要想自定义处理逻辑,需要重写exceptionCaught()方法,然后决定是否需要将异常传播出去。
处理出站异常
用于处理出站操作中的正常完成以及异常完成的选项,都基于以下的通知机制:
- 每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是失败了;
- 几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例,作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器。
添加ChannelFutureListener到ChannelFuture的代码如下:
public static void addingChannelFutureListener(){
Channel channel = CHANNEL_FROM_SOMEWHERE; // get reference to pipeline;
ByteBuf someMessage = SOME_MSG_FROM_SOMEWHERE; // get reference to pipeline;
//...
io.netty.channel.ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(io.netty.channel.ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!