本文主要内容:

  • 通过SSL/TLS保护Netty应用程序;
  • 构建基于Netty的HTTP/HTTPS应用程序
  • 处理空闲的连接和超时
  • 解码基于分隔符的协议和基于长度的协议
  • 写大型数据
  • Netty为许多通用协议提供了编解码器和处理器,几乎可以开箱即用。

通过SSL/TLS保护Netty应用程序

为了支持SSL/TLS,Java提供了javax.net.ssl包,它的SSLContext和SSLEngine类使得实现解密和加密相当简单直接。Netty通过一个名为SslHandler的ChannelHandler实现利用了这个API,其中SslHandler在内部使用了SSLEngine来完成实际的工作。
下图是通过SslHandler进行解密和加密的数据流。
在这里插入图片描述
Netty还提供了使用OpenSSL工具包的SSLEngine实现,该类提供了比JDK提供的SSLEngine具有更好的性能。
下面的代码展示了如何使用ChannelInitializer来将SslHandler添加到ChannelPipeline中。

public class SslChannelInitializer extends ChannelInitializer<Channel>{
    private final SslContext context; // 传入要使用的SslContext
    private final boolean startTls; // 如果设置为true,第一个写入的消息将不会被加密(客户端应该设置为true)
    public SslChannelInitializer(SslContext context,boolean startTls) {
        this.context = context;
        this.startTls = startTls;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        // 对于每个SslHandler实例,都使用Channel的ByteBufAllocator从SslContext获取一个新的SSLEngine
        SSLEngine engine = context.newEngine(ch.alloc());
        // 将SslHandler 作为第一个ChannelHandler 添加到ChannelPipeline 中
        ch.pipeline().addFirst("ssl",new SslHandler(engine, startTls));
    }
}

在大多数情况下,SslHandler将是ChannelPipeline中的第一个ChannelHandler,这确保了只有在所有其他的ChannelHandler将他们的逻辑应用到数据之后,才会进行加密。
SslHandler提供的一些方法:
在这里插入图片描述

构建基于Netty的HTTP/HTTPS应用程序

HTTP解码器、编码器和编解码器

HTTP是基于请求/响应模式的:客户端向服务器发送一个HTTP请求,然后服务器将会返回一个HTTP响应。
下图分别展示了生产和消费HTTP请求和HTTP响应的方法:
HTTP 请求的组成部分:
HTTP 请求的组成部分
HTTP 响应的组成部分:
在这里插入图片描述
下面的代码展示了将HTTP支持添加到你的应用程序,几乎只需要将正确的ChannelHandler添加到ChannelPipeline中。

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
    private final boolean client;
    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) { 
        	// 如果是客户端,则添加HttpResponseDecoder来处理来自服务器的响应
            pipeline.addLast("decoder", new HttpResponseDecoder());
            //如果是客户端,则添加HttpRequestEncoder来向服务器发送请求
            pipeline.addLast("encoder", new HttpRequestEncoder());
        } else {
        	//如果是服务器,则添加HttpRequestDecoder来接收来自客户端的请求
            pipeline.addLast("decoder", new HttpRequestDecoder());
            //如果是服务器,则添加HttpResponseEncoder以向客户端发送响应
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }
    }
}

聚合HTTP消息

由于HTTP 的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器,它可以将多个消息部分合并为FullHttpRequest 或者FullHttpResponse 消息。
引入这种自动聚合机制只不过是向ChannelPipeline中添加另外一个ChannelHandler罢了。
代码如下:

/**
* 自动聚合HTTP 的消息片段
*/
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;
    public HttpAggregatorInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
        //将最大的消息大小为512KB的HttpObjectAggregator添加到ChannelPipeline
        pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024));
    }
}

HTTP压缩

Netty 为压缩和解压缩提供了ChannelHandler 实现,它们同时支持gzip 和deflate 编码。
客户端可以通过提供以下头部信息来指示服务器它所支持的压缩格式:

GET /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding: gzip, deflate

注意:服务器不需要压缩它所发送的数据
下面代码展示了自动压缩HTTP消息

/**
* 自动压缩HTTP 消息
*/
public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;
    public HttpCompressionInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
            // 如果是客户端,则添加HttpContentDecompressor 以处理来自服务器的压缩内容.
            pipeline.addLast("decompressor",new HttpContentDecompressor());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
            // 如果是服务器,则添加HttpContentCompressor来压缩数据(如果客户端支持它)
            pipeline.addLast("compressor",new HttpContentCompressor());
        }
    }
}

使用HTTPS

启用HTTPS只需要将SslHandler添加到ChannelPipeline的ChannelHandler组合中。
代码如下:

public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean isClient;
    public HttpsCodecInitializer(SslContext context, boolean isClient) {
        this.context = context;
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        SSLEngine engine = context.newEngine(ch.alloc());
        //将SslHandler添加到ChannelPipeline中以使用HTTPS
        pipeline.addFirst("ssl", new SslHandler(engine));
        //如果是客户端,则添加HttpClientCodec
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        //如果是服务器,则添加HttpServerCodec
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
    }
}

WebSocket

WebSocket为网页和远程服务器之间的双向通信提供了一种替代HTTP轮询的方案。
如果想要在应用程序中添加对于WebSocket的支持,只需要将适当的客户端或者服务器WebSocketChannelHandler添加到ChannelPipeline中,这个类将处理由WebSocket定义的称为帧的特殊消息类型。
WebSocket协议如下:
在这里插入图片描述
WebSocketFrame可以被归类于数据帧或者控制帧,主要类型如下:
在这里插入图片描述
在服务器端支持WebSocket的代码如下:

public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
    @Override
        ch.pipeline().addLast(
        new HttpServerCodec(),
        new HttpObjectAggregator(65536), // 为握手提供聚合的HttpRequest
        new WebSocketServerProtocolHandler("/websocket"), // 如果被请求的端点是"/websocket",则处理该升级握手
        new TextFrameHandler(), // TextFrameHandler 处理TextWebSocketFrame
        new BinaryFrameHandler(),
        new ContinuationFrameHandler());
    }
    public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
            // Handle text frame
        }
    }
    public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,BinaryWebSocketFrame msg) throws Exception {
            // Handle binary frame
        }
    }
    public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,ContinuationWebSocketFrame msg) throws Exception {
            // Handle continuation frame
        }
    }
}

想要为WebSocket添加安全性,只需要将SslHandler作为第一个ChannelHandler添加到ChannelPipeline中。

空闲的连接和超时

检测空闲连接以及超时连接对于及时释放资源来说是至关重要的,Netty特地为它提供了几个ChannelHandler实现。
在这里插入图片描述
下列代码展示了当我们通常的发送心跳消息到远程节点的方法时,如果在60s内没有接收或者发送任何的数据,我们将如何得到通知;如果没有响应,则连接会被关闭。

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // IdleStateHandler 将在被触发时发送一个IdleStateEvent 事件
        pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        //将一个HeartbeatHandler添加到ChannelPipeline中
        pipeline.addLast(new HeartbeatHandler());
    }
    //实现userEventTriggered()方法以发送心跳消息
    public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    	//发送到远程节点的心跳消息
        private static final ByteBuf HEARTBEAT_SEQUENCE =Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) { 
            	// 发送心跳消息,并在发送失败时关闭该连接
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else { 
            	// 不是IdleStateEvent事件,所以将它传递给下一个ChannelInboundHandler
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

上面这个示例演示了如何使用IdleStateHandler来测试远程节点是否仍然还活着,并且在它失活时通过关闭连接来释放资源。
如果连接超过60s没有接收或者发送任何的数据,那么IdleStateHandler将会使用一个IdleStateEvent事件来调用fireUserEventTriggered()方法。HeartbeatHandler实现了userEventTriggered()方法,如果这个方法检测到IdleSstateEvent事件,它将会发送心跳消息,并且添加一个将在发送操作失败时关闭该连接的ChannelFutureListener。

解码基于分隔符的协议和基于长度的协议

基于分隔符的协议

基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(通常被称为帧)的开头或者结尾。由RFC文档正式定义的许多协议(如SMTP、POP3、IMAP以及Telnet)都是这样的。
在这里插入图片描述
下图展示了当帧由行尾序列\r\n分割时是如何被处理的:
在这里插入图片描述
代码如下:

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //该LineBasedFrameDecoder将提取的帧转发给下一个ChannelInboundHandler
        pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
        //添加FrameHandler来接收帧
        pipeline.addLast(new FrameHandler());
    }
    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        //传入了单个帧的内容
        public void channelRead0(ChannelHandlerContext ctx,ByteBuf msg) throws Exception {
            // Do something with the data extracted from the frame
        }
    }
}

作为示例,我们将使用下面的协议规范:
1.传入数据流是一系列的帧,每个帧都由换行符(\n)分隔;
2.每个帧都由一系列的元素组成,每个元素都由单个空格字符分隔;
3.一个帧的内容代表一个命令,定义为一个命令名称后跟着数目可变的参数。
我们用于这个协议的自定义解码器将定义以下类:
1.Cmd—将帧(命令)的内容存储在ByteBuf 中,一个ByteBuf 用于名称,另一个用于参数;
2.CmdDecoder—从被重写了的decode()方法中获取一行字符串,并从它的内容构建一个Cmd 的实例;
3.CmdHandler —从CmdDecoder 获取解码的Cmd 对象,并对它进行一些处理;
4.CmdHandlerInitializer —为了简便起见,我们将会把前面的这些类定义为专门的ChannelInitializer 的嵌套类,其将会把这些ChannelInboundHandler 安装到ChannelPipeline 中。

public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
    final byte SPACE = (byte)' ';
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new CmdDecoder(64 * 1024)); // 添加CmdDecoder 以提取Cmd 对象,并将它转发给下一个ChannelInboundHandler
        pipeline.addLast(new CmdHandler()); // 添加CmdHandler 以接收和处理Cmd 对象
    }
    public static final class Cmd {
        private final ByteBuf name;
        private final ByteBuf args;
        public Cmd(ByteBuf name, ByteBuf args) {
            this.name = name;
            this.args = args;
        }
        public ByteBuf name() {
            return name;
        }
        public ByteBuf args() {
            return args;
        }
    }
    public static final class CmdDecoder extends LineBasedFrameDecoder {
        public CmdDecoder(int maxLength) {
            super(maxLength);
        }
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        	//从ByteBuf中提取由行尾符序列分割的帧
            ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
            if (frame == null) {
                return null;
            }
            // 查找第一个空格字符的索引。前面是命令名称,接着是参数
            int index = frame.indexOf(frame.readerIndex(),frame.writerIndex(), SPACE);
            // 使用包含有命令名称和参数的切片创建新的Cmd 对象
            return new Cmd(frame.slice(frame.readerIndex(), index),frame.slice(index + 1, frame.writerIndex()));
        }
    }
    public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
            // Do something with the command(获取Cmd对象进一步操作)
        }
    }
}

基于长度的协议

在这里插入图片描述
在这里插入图片描述
如果遇到被编码到消息头部的帧大小不是固定值的协议,为了处理这种变长帧,可以使用LengthFieldBasedFrameDecoder,它将从头部字段确定帧长,然后从数据流中提取指定的字节数。
在这里插入图片描述
代码如下:

public class LengthBasedInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
        pipeline.addLast(new FrameHandler());
    }
    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,ByteBuf msg) throws Exception {
            // Do something with the frame
        }
    }
}

写大型数据

由于写操作是非阻塞的,所在存在内存耗尽的风险,因此在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。
下面代码展示了如何通过从FileInputStream创建一个DefaultRegion,并将其写入Channel,从而利用零拷贝特性来传输一个文件的内容。

FileInputStream in = new FileInputStream(file);
// 以该文件的完整长度创建一个新的DefaultFileRegion
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
// 发送该DefaultFileRegion,并注册一个ChannelFutureListener
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            Throwable cause = future.cause(); // 处理失败
            // Do something
        }
    }
});

这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗。
在这里插入图片描述
下面代码展示了ChunkedStream的用法:

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;
    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //将SslHandler添加到ChannelPipeline中
        pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());
        //添加ChunkedWritHandler以处理作为ChunkedInput传入的数据
        pipeline.addLast(new ChunkedWriteHandler());
        // 一旦连接建立,WriteStreamHandler就开始写文件数据
        pipeline.addLast(new WriteStreamHandler()); 
    }
    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
    	//当连接建立时,channelActive()方法将使用ChunkedInput写文件数据
        @Override
        public void channelActive(ChannelHandlerContext ctx)throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}

序列化数据

JDK序列化

在这里插入图片描述

JBoss Marshalling序列化

在这里插入图片描述
下面代码展示了如何使用MarshallingDecoder和MarshallingEncoder:

public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;
    public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider,MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //添加MarshallingDecoder以将ByteBuf转换为POJO
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        //添加MarshallingEncoder以将POJO转换为ByteBuf
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        // 添加ObjectHandler,以处理普通的实现了Serializable 接口的POJO
        pipeline.addLast(new ObjectHandler()); 
    }
    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext,Serializable serializable) throws Exception {
            // Do something
        }
    }
}

Protocol Buffers序列化

Protocol Buffers 以一种紧凑而高效的方式对结构化的数据进行编码以及解码。它具有许多的编程语言绑定,使得它很适合跨语言的项目。(由Google公司开发的、现在已经开源的数据交换格式。)
在这里插入图片描述
代码:

public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;
    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufEncoder()); ①
        pipeline.addLast(new ProtobufDecoder(lite));
        pipeline.addLast(new ObjectHandler());
    }
    public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Do something with the object
        }
    }
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

12.Netty--WebSocket Previous
10.Netty--编解码器框架 Next