本文主要内容:

  • 解码器、编码器以及编解码器的概述
  • Netty的编解码器类

什么是编解码器

  • 编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);

  • 解码器则是将网络字节流换回应用程序的消息格式;

  • 编码器操作出站数据,解码器处理入站数据;

解码器

  • 将字节码解码为消息———ByteToMessageDecoder和ReplayingDecoder;

  • 将一种消息类型解码为另一种———MessageToMessagedecoder

因为解码器是负责将入站数据从一种格式转换到另一种格式,所以Netty的解码器实现了ChannelInboundHandler。

每当需要为ChannelPipeline中的下一个ChannelInboundHanler转换入站数据时会用到解码器,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑。

抽象类ByteToMessageDecoder

由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。

ByteToMessageDecoder方法:

方法 描述
decode(ChannelHandlerContext ctx,ByteBuf in,List out) decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List,对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf中没有更多可读取的字节时为止。然后,如果该List不为空,那么它的内容将会被传递给ChannelPipeline中的下一个ChannelInboundHandler。
decodeLast(ChannelHandlerContext ctx,ByteBuf in,Listsout) 默认实现只是简单地调用了decode()方法,当Channel的状态变为非活动时,这个方法将会被调用一次,可以重写该方法来提供特殊的处理,比如用来产生一个LastHttpContent消息

示例:假设你接收了一个包含简单int的字节流,每个int都需要被单独处理,在这种情况下,你需要从入站ByteBuf中读取每个int,并将它传递给ChannelPipeline中的下一个ChannelInboundHandler。为了解码这个字节流,你要扩展ByteToMessageDecoder类。

在这里插入图片描述

public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in,
        List<Object> out) throws Exception {
        //检查是否至少有 4 字节可读(一个 int 的字节长度)
        if (in.readableBytes() >= 4) {
            //从入站 ByteBuf 中读取一个 int,并将其添加到解码消息的 List 中
            out.add(in.readInt());
        }
    }
}

抽象类ReplayingDecoder

ReplayingDecoder扩展了ByteToMessageDecoder类,不必调用readableBytes()方法。它通过使用一个自定义的ByteBuf也就是ReplayingDecoderByteBuf来实现,包装传入的ByteBuf实现了这一点,readableBytes()方法将在内部执行。

public class ToIntegerDecoder2 extends ReplayingDecoder<Void> { // 扩展ReplayingDecoder<Void>以将字节解码为消息
    @Override // 传入的ByteBuf 是ReplayingDecoderByteBuf
    public void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception { 
    	//从入站ByteBuf中读取一个int,并将其添加
        out.add(in.readInt());
    }
}

如果没有足够的字节可用,这个readInt()方法的实现将会抛出一个Error(实际上是Signal),其将在基类中被捕获并处理,当有更多的数据可供读取时,该decode()方法将会被再次调用。
这里有一个简单的准则:如果使用ByteToMessageDecoder 不会引入太多的复杂性,那么请使用它;否则,请使用ReplayingDecoder。

抽象类MessageToMessageDecoder

这里我们将编写一个IntegerToStringDecoder解码器来扩展MessageToMessageDecoder,尖括号里面的参数代表了输入参数的类型。
在这里插入图片描述
代码如下:

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

TooLongFrameException类

Netty提供了TooLongFrameException类,其将由解码器在帧超出指定的大小限制时抛出。

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
    private static final int MAX_FRAME_SIZE = 1024;
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in,
        List<Object> out) throws Exception {
        int readable = in.readableBytes();
        if (readable > MAX_FRAME_SIZE) { // 检查缓冲区中是否有超过MAX_FRAME_SIZE个字节
            in.skipBytes(readable); // 跳过所有的可读字节,抛出TooLongFrameException 并通知ChannelHandler
            throw new TooLongFrameException("Frame too big!");
        }
        // do something
        ...
    }
}

编码器

编码器实现了ChannelOutBoundHandler,并将出站数据从一种格式转换为另一种格式。Netty提供了一组类,用于帮助你编写具有以下功能的编码器:
将消息编码为字节;
将消息编码为另一种格式的消息;

抽象类MessageToByteEncoder

在这里插入图片描述
这个类只有一个方法,而解码器有两个,这是因为解码器通常需要在Channel关闭之后产生最后一个消息,因此也就有了decodeLast()方法,但是编码器就没必要在连接被关闭后仍产生消息了。
示例:接收一个Short类型的实例作为消息,将它编码为Short的原子类型,并将它写入ByteBuf中,其将随后被转发给ChannelPipeline中的下一个ChannelOutboundHandler,每个传出的Short值都将会被占用ByteBuf中的2字节。
在这里插入图片描述
代码如下:

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out)throws Exception {
        out.writeShort(msg);
    }
}

抽象类MessageToMessageEncoder

示例:使用IntegerToStringEncoder扩展了MessageToMessageEncoder,设计如图所示:
在这里插入图片描述
代码如下:

public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
    @Override
    public void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

抽象的编解码器类

抽象类ByteToMessageCodec

应用场景:我们需要将字节解码为某种形式的消息,随后再次对它进行编码。
在这里插入图片描述

抽象类MessageToMessageCodec

在这里插入图片描述
下面代码展示了Web浏览器与服务器之间的双向通信可能的实现方式:我们的WebSocketConvertHandler在参数化MessageToMessageCodec时将使用INBOUN_IN类型的WebSocketFrame,以及OUTBOUND_IN类型的MyWebSocketFrame,后者是WebSocketConvertHandler本身的一个静态嵌套类。

public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
    @Override // 将MyWebSocketFrame 编码为指定的WebSocketFrame 子类型
    protected void encode(ChannelHandlerContext ctx, WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf payload = msg.getData().duplicate().retain();
        switch (msg.getType()) { // 实例化一个指定子类型的WebSocketFrame
            case BINARY:
                out.add(new BinaryWebSocketFrame(payload));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(payload));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, payload));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(payload));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(payload));
                break;
            case PING:
                out.add(new PingWebSocketFrame(payload));
                break;
            default:
                throw new IllegalStateException(
                        "Unsupported websocket msg " + msg);
        }
    }
    @Override // 将WebSocketFrame 解码为MyWebSocketFrame,并设置FrameType
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf payload = msg.content().duplicate().retain();
        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.BINARY, payload));
        } else
        if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame (
                    MyWebSocketFrame.FrameType.CLOSE, payload));
        } else
        if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame (
                    MyWebSocketFrame.FrameType.PING, payload));
        } else
        if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame (
                    MyWebSocketFrame.FrameType.PONG, payload));
        } else
        if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame (
                    MyWebSocketFrame.FrameType.TEXT, payload));
        } else
        if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame (
                    MyWebSocketFrame.FrameType.CONTINUATION, payload));
        } else
        {
            throw new IllegalStateException(
                    "Unsupported websocket msg " + msg);
        }
    }
    //声明WebSocketConvertHandler所使用的OUTBOUND_IN类型
    public static final class MyWebSocketFrame {
    	//定义拥有被包装的有效负载的WebSocketFrame的类型
        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }
        private final FrameType type;
        private final ByteBuf data;
        public MyWebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }
        public FrameType getType() {
            return type;
        }
        public ByteBuf getData() {
            return data;
        }
    }
}

CombinedChannelDuplexHandler类

结合一个解码器和编码器可能会对可重用性造成影响。但是,有一种方法既能够避免这种惩罚,又不会牺牲将一个解码器和一个编码器作为一个单独的单元部署所带来的便利性。CombinedChannelDuplexHandler 提供了这个解决方案,其声明为:

public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler,O extends ChannelOutboundHandler>

首先看看ByteToCharDecoder类:

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in,
        List<Object> out) throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

这里的decode()方法一次将从ByteBuf中提取2字节,并将它们作为char写入到List中,其将会被自动装箱为Character对象。
再来看看CharToByteEncoder类:

public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    @Override
    public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
        out.writeChar(msg);
    }
}

既然我们有了编码器和解码器,我们可以将它们结合起来构建一个编解码器:

//通过该编码器和解码器实现参数化CombinnedByteCharCodec
public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedByteCharCodec() {
    	//将委托实例传递给父类
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}