本文主要内容:

  • ChannelHandler和ChannelPipeline
  • 检测资源泄漏
  • 异常处理

ChannelHandler家族

Channel的生命周期

状态 描述
ChannelUnregistered Channel已经被创建,但还未注册到EventLoop
ChannelRegistered Channel已经被注册到了EventLoop
ChannelActive Channel处于活动状态(已连接到远程节点),可以接收和发送数据了
ChannelInactive Channel没有连接到远程节点

Channel的正常生命周期如下图所示,当这些状态发生改变时,将会生成对应的事件,这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。

在这里插入图片描述

ChannelHandler的生命周期

ChannelHandler的生命周期方法主要有下面几种,这些方法都有一个ChannelHandlerContext参数,在ChannelHandler被添加到ChannelPipeline中或者从ChannelPipeline中移除时都会调用这些操作。

类型 描述
handlerAdded 当把ChannelHandler添加到ChannelPipeline中时被调用
handlerRemoved 当从ChannelPineline中移除ChannelHandler时被调用
exceptionCaught 当处理过程中在ChannelPipeline中有错误产生时被调用

Netty定义了两个重要的ChannelHandler子接口:

  • ChannelInboundHandler:处理入站数据以及各种状态变化;
  • ChannelOutboundHandler:处理出站数据并且允许拦截所有的操作;

ChannelInboundHandler接口

下面这些是ChannelInboundHandler的生命周期方法,将会在数据被接收时或者与其对应的Channel状态发生改变时被调用。

类型 描述
ChannelUnregistered 当Channel从它的EventLoop注销并且无法处理任何I/O时被调用
ChannelRegistered 当Channel已经注册到它的EventLoop,并且能够处理I/O时被调用
ChannelActive 当Channel处于活动状态(已连接到远程节点)时被调用
ChannelInactive 当Channel离开活动状态并且不再连接它的远程节点时被调用
ChannelReadComplete 当Channel上的一个读操作完成时被调用
ChannelRead 当从Channel读取数据时被调用
ChannelWritabilityChanged 当Channel的可写状态发生改变时被调用
userEventTriggered 当调用ChannelInboundHandler.fireUserEventTriggered()方法时被调用

当某个ChannelInboundHandler的实现重写了channelRead()方法时,它将负责显式地释放与池化和ByteBuf实例相关的内存。

public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //丢弃已接收的消息
        ReferenceCountUtil.release(msg);
    }
}

不过我们一般使用SimpleChannelInboundHandler来自动释放资源。

public class SimpleDiscardHandler
    extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        Object msg) {
        //不需要任何显式的资源释放
        // No need to do anything special
    }
}

由于使用SimpleChannelInboundHandler会自动释放资源,所以不能存储指向任何消息的引用供将来使用。

ChannelOutboundHandler接口

用来处理出站操作,它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。

ChannelOutboundHandler的一个强大的功能就是可以按需推迟操作或者事件,比如到远程节点的写入被暂停了,你可以推迟冲刷操作并在稍后继续。

public interface ChannelOutboundHandler extends ChannelHandler {
	//当请求将Channel绑定到本地地址时被调用
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
	//当请求将Channel连接到远程节点时被调用
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
	//当请求将Channel从远程节点断开时被调用
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
	//当请求关闭Channel时被调用
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
	//当请求将Channel从它的EventLoop注销时被调用
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
	//当请求从Channel读取更多的数据时被调用
    void read(ChannelHandlerContext ctx) throws Exception;
	//当请求通过Channel将数据写到远程节点时被调用
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
	//当请求通过Channel将入队数据冲刷到远程节点时被调用
    void flush(ChannelHandlerContext ctx) throws Exception;
}

ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变。

ChannelHandler适配器

我们可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现,通过扩展抽象类ChannelHandlerAdapter,他们获得了ChannelHandler的方法。生成的类的层次结构如下图

在这里插入图片描述

在这里插入图片描述

在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

资源管理

每当通过调用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法来处理数据时,你都需要确保没有任何的资源泄露。

Netty提供了4种泄漏检测级别,分别如下:

  • DISABLED——禁用泄露检测

  • SIMPLE——使用1%的默认采样率检测并报告任何发现的泄露

  • ADVANCED——使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置

  • PARANOID——类似于ADVANCED,但是其将会对每次访问都进行采样,这对性能将会有很大的影响,应该只在调试阶段使用

泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:

java -Dio.netty.leakDetectionLevel = ADVANCED

消费入站可以通过SimpleChannelInboundHandler类来实现,在消息被channelRead0()方法消费之后自动释放消息。

如果是出站消息,丢弃并释放资源的代码参考如下:

public class DiscardOutboundHandler
    extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx,
        Object msg, ChannelPromise promise) {
        //通过使用 ReferenceCountUtil.realse(...)方法释放资源
        ReferenceCountUtil.release(msg);
        //通知 ChannelPromise数据已经被处理了
        promise.setSuccess();
    }
}

这里要注意的是,我们不仅要释放资源,同时也要通知ChannelPromise,否则可能会出现ChannelFutureListener收不到某个消息已经被处理的通知。

ChannelPipeline接口

每个新建的Channel都会被分配一个新的ChannelPipeline,Channel既不能附加另一个ChannelPipeline,也不能分离当前的。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理,随后通过调用ChannelHandlerContext的实现,它将被转发给同一超类型的下一个ChannelHandler。

ChannelHandlerContext的作用就是使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互。

Netty总是将ChannelPipeline的入站口作为头部,而将出站口作为尾部,在ChannelPipeline传播事件时,它会测试ChannelPipeline中下一个ChannelHandler的类型是否和事件的运动方向相匹配,如果不匹配就跳过,直到找到和该事件所期望的方向相匹配的为止。

修改ChannelPipeline

通过调用ChannelPipeline上的相关方法,ChannelHandler可以添加、删除或者替换其他的ChannelHandler,从而实时地修改ChannelPipeline的布局。

public static void modifyPipeline() {
        ChannelPipeline pipeline = CHANNEL_PIPELINE_FROM_SOMEWHERE; // get reference to pipeline;
        //创建一个 FirstHandler 的实例
        FirstHandler firstHandler = new FirstHandler();
        //将该实例作为"handler1"添加到ChannelPipeline 中
        pipeline.addLast("handler1", firstHandler);
        //将一个 SecondHandler的实例作为"handler2"添加到 ChannelPipeline的第一个槽中。这意味着它将被放置在已有的"handler1"之前
        pipeline.addFirst("handler2", new SecondHandler());
        //将一个 ThirdHandler 的实例作为"handler3"添加到 ChannelPipeline 的最后一个槽中
        pipeline.addLast("handler3", new ThirdHandler());
        //...
        //通过名称移除"handler3"
        pipeline.remove("handler3");
        //通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
        pipeline.remove(firstHandler);
        //将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
        pipeline.replace("handler2", "handler4", new FourthHandler());
    }

通常ChannelPipeline中的每个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。

触发事件

  • ChannelPipeline保存了与Channel相关联的ChannelHandler;
  • ChannelPipeline可以根据需要、通过添加或者删除ChannelHandler来动态修改;
  • ChannelPipeline有着丰富的API用以被调用、以响应入站和出站事件。

ChannelHandlerContext接口

前面也提到了,ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,ChannelHandlerContext的主要功能就是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。

ChannelHandlerContext本身很多方法在Channel和ChannelPipeline中也存在,但是如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播,而调用位于ChannelHandlerContext上的这些方法,则将从当前所关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler,因此它会产生更短的事件流,性能会得到提高。

另外,ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的。

使用ChannelHandlerContext

Channel、ChannelPipeline、ChannelHandler以及ChannelHandlerContext之间的关系如下图:

img

为什么想要从channelPipeline中的某个特定点开始传播事件呢?

  • 为了减少将事件经传对它不感兴趣的ChannelHandler所带来的开销;

  • 为了避免将事件传经那些可能会对它感兴趣的ChannelHandler。

如果想调用从某个特定的ChannelHandler开始的处理过程,必须获取到在(ChannelPipeline)该ChannelHandler之前的ChannelHandler所关联的ChannelHandlerContext,这个ChannelHandlerContext将调用和它所关联的ChannelHandler之后的ChannelHandler。

通过ChannelHandlerContext触发的操作的事件流如下图所示:

img

ChannelHandler和ChannelHandlerContext的高级用法

通过调用ChannelHandlerContext上的pipeline()方法来获得被封闭的ChannelPipeline的引用,这使得运行时得以操作ChannelPipeline的ChannelHandler。如可以通过将ChannelHandler添加到ChannelPipeline中来实现动态的协议切换;还可以缓存ChannelHandlerContext的引用以供后面使用。

public class WriteHandler extends ChannelHandlerAdapter {
    private ChannelHandlerContext ctx;
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        //存储到 ChannelHandlerContext的引用以供稍后使用
        this.ctx = ctx;
    }
    public void send(String msg) {
        //使用之前存储的到 ChannelHandlerContext的引用来发送消息
        ctx.writeAndFlush(msg);
    }
}

因为一个ChannelHandler可以从属于多个ChannelPipeline,所以也可以绑定多个ChannelHandlerContext实例,但是ChannelHandler必须使用@Sharable注解,并且ChannelHandler必须是线程安全的。

看个错误示例:

@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
    private int count;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //将 count 字段的值加 1
        count++;
        //记录方法调用,并转发给下一个ChannelHandler
        System.out.println("inboundBufferUpdated(...) called the "
                + count + " time");
        ctx.fireChannelRead(msg);
    }
}

为什么要共享一个ChannelHandler?

主要是用于收集跨越多个Channel的统计信息。

异常处理

处理入站异常

要想处理入站异常,需要在自己的ChannelInboundHandler实现中重写exceptionCaught()方法。

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  • ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler;
  • 如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理;
  • 要想自定义处理逻辑,需要重写exceptionCaught()方法,然后决定是否需要将异常传播出去。

处理出站异常

用于处理出站操作中的正常完成以及异常完成的选项,都基于以下的通知机制:

  • 每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是失败了;
  • 几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例,作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器。

添加ChannelFutureListener到ChannelFuture的代码如下:

public static void addingChannelFutureListener(){
    Channel channel = CHANNEL_FROM_SOMEWHERE; // get reference to pipeline;
    ByteBuf someMessage = SOME_MSG_FROM_SOMEWHERE; // get reference to pipeline;
    //...
    io.netty.channel.ChannelFuture future = channel.write(someMessage);
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(io.netty.channel.ChannelFuture f) {
            if (!f.isSuccess()) {
                f.cause().printStackTrace();
                f.channel().close();
            }
        }
    });
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

7.Netty--EventLoop和线程模型 Previous
5.Netty--ByteBuf Next