本文主要内容:

  • 单元测试
  • EmbeddedChannel概述
  • 使用EmbeddedChannel测试ChannelHandler

单元测试的标准:不仅能够证明你的实现是正确的,而且还要能够很容易地隔离那些因修改代码而突然出现的问题。

因为正在被测试的代码模块或者单元将在它正常的运行环境之外被执行,所以还需要一个框架或者脚手架以便在其中运行它(这里我们选择的是JUnit4)。

EmbeddedChannel概述

我们的业务逻辑是通过将ChannelPipeline中的ChannelHandler实现链接在一起来完成的,每个ChannelHandler都将处理一个明确定义的任务或者是步骤。

Netty提供了EmbeddedChannel,用于测试ChannelHandler,其原理就是将入站数据或者出站数据写入EmbeddedChannel中,然后检查是否有任何东西到达了ChannelPipeline的尾端,以这种方式便可以确定消息是否已经被编译或者被解码过了,以及是否触发了任何的ChannelHandler动作。

EmbeddedChannel方法:

名称 描述
writeInbound(Object…msgs) 将入站消息写入到EmbeddedChannel中,如果可以通过readInbound()方法从EmbeddedChannel中读取数据,则返回true
readInbound() 从EmbeddedChannel中读取一个入站消息,任何返回的消息都经过了整个ChannelPipeline,如果没有数据则返回null
writeOutbound(…) 将出站消息写入到EmbeddedChannel中,如果可以通过readOutbound()方法从EmbeddedChannel中读取数据,则返回true
readOutbound(…) 从EmbeddedChannel中读取一个出站消息,任何返回的消息都经过了整个ChannelPipeline,如果没有数据则返回null
finish() 结束EmbeddedChannel,如果里面有任何类型的可读数据都会返回true,它也会调用Channel的close方法

EmbeededChannel的处理过程如图所示:

img

使用EmbeededChannel测试ChannelHandler

JUnit断言

org.junit.Assert类提供了很多用于测试的静态方法,失败的断言将导致一个异常被抛出,并将终止当前正在执行的测试。

测试入站消息

接下来实现一个简单的ByteToMessageDecoder,给定足够的数据,然后这个解码器将产生固定大小的帧,如果没有足够的数据可供的读取,它将等待下一个数据块的到来,并将再次检查是否能够产生一个新的帧。

我们代码是这个解码器产生固定为3个字节大小的帧,它可能会需要多个事件来提供足够的字节数来产生一个帧,最终每个帧都会被传递给ChannelPipeline中的下一个ChannelHandler。

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;

    //指定要生成的帧的长度
    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException(
                "frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
        List<Object> out) throws Exception {
        //检查是否有足够的字节可以被读取,以生成下一个帧
        while (in.readableBytes() >= frameLength) {
            //从 ByteBuf 中读取一个新帧
            ByteBuf buf = in.readBytes(frameLength);
            //将该帧添加到已被解码的消息列表中
            out.add(buf);
        }
    }
}

现在创建一个单元测试:

public class FixedLengthFrameDecoderTest {
    @Test                       
    public void testFramesDecoded(){
        ByteBuf buffer = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buffer.writeByte(i);
        }
        ByteBuf duplicate = buffer.duplicate();
        //创建一个EmbeddedChannel,并添加一个FixedLengthFramesDecoded,将其以3字节的帧长度被测试
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
        //将数据写入EmbeddedChannel
        Assert.assertTrue(embeddedChannel.writeInbound(duplicate.retain()));
        //标记Channel为已完成的状态
        Assert.assertTrue(embeddedChannel.finish());
        
        //读取所生成的消息,并且验证是否有3帧,其中每帧都为3字节
        ByteBuf read = (ByteBuf) embeddedChannel.readInbound();
        Assert.assertEquals(buffer.readSlice(3),read);
        for (int i = 0; i < read.capacity(); i++) {
            System.out.println(read.getByte(i));
        }
        read.release();

        read = (ByteBuf)embeddedChannel.readInbound();
        Assert.assertEquals(buffer.readSlice(3),read);
        read.release();

        read = embeddedChannel.readInbound();
        Assert.assertEquals(buffer.readSlice(3),read);
        read.release();

        Assert.assertNull(embeddedChannel.readInbound());
        buffer.release();
    }
    @Test
    public void testFramesDecoded2(){
        ByteBuf buffer = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buffer.writeByte(i);
        }
        ByteBuf duplicate = buffer.duplicate();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
        //返回false,因为没有一个完整的可供读取的帧
        Assert.assertFalse(embeddedChannel.writeInbound(buffer.readBytes(2)));
        Assert.assertTrue(embeddedChannel.writeInbound(buffer.readBytes(7)));
        Assert.assertTrue(embeddedChannel.finish());
        //读取所生成的消息,并且验证是否有3帧,其中每帧都为3字节
        ByteBuf read = (ByteBuf) embeddedChannel.readInbound();
        Assert.assertEquals(buffer.readSlice(3),read);
        for (int i = 0; i < read.capacity(); i++) {
            System.out.println(read.getByte(i));
        }
        read.release();

        read = (ByteBuf)embeddedChannel.readInbound();
        Assert.assertEquals(buffer.readSlice(3),read);
        read.release();

        read = embeddedChannel.readInbound();
        Assert.assertEquals(buffer.readSlice(3),read);
        read.release();

        Assert.assertNull(embeddedChannel.readInbound());
        buffer.release();
    }
}

测试出站消息

利用EmbeddedChannel来测试一个编码器形式的ChannelOutboundHandler,这里我们只是简单的将负数转换为绝对值。

步骤:

  • 持有AbsIntegerEncoder的EmbeddedChannel将会以4字节的负整数的形式写出站数据;
  • 编码器将从传入的ByteBuf中读取每个负整数,并将会调用Math.abs()方法来获取其绝对值;
  • 编码器将会把每个负整数的绝对值写到ChannelPipeline中

代码如下:

public class AbsIntegerEncoder extends
    MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
        ByteBuf in, List<Object> out) throws Exception {
        //检查是否有足够的字节用来编码
        while (in.readableBytes() >= 4) {
            //从输入的 ByteBuf中读取下一个整数,并且计算其绝对值
            int value = Math.abs(in.readInt());
            //将该整数写入到编码消息的 List 中
            out.add(value);
        }
    }
}

使用了EmbeddedChannel来测试代码:

public class AbsIntegerEncoderTest {
    @Test
    public void testEncoded(){
        //创建一个ByteBuf,并且写入9个负整数
        ByteBuf buf = Unpooled.buffer();
        for (int i = 1; i < 10; i++) {
            buf.writeInt(i * -1);
        }
        //创建一个EmbeddedChannel,并安装一个要测试的AbsIntegerEncoder
        EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
        //写入ByteBuf,并断言调用readOutbound()方法将会产生数据
        assertTrue(channel.writeOutbound(buf));
        //将该Channel标记为已完成状态
        assertTrue(channel.finish());
        for (int i = 1; i < 10; i++) {
            assertEquals(i,channel.readOutbound());
        }
        assertNull(channel.readOutbound());
    }
}

测试异常处理

示例:如果所读取的字节数超出了某个特定的限制,我们将会抛出一个TooLongFrameException,这是一种经常用来防范资源被耗尽的方法。

public class FrameChunkDecoder extends ByteToMessageDecoder {
    private final int maxFrameSize;

    //指定将要产生的帧的最大允许大小
    public FrameChunkDecoder(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
        List<Object> out)
        throws Exception {
        int readableBytes = in.readableBytes();
        if (readableBytes > maxFrameSize) {
            // discard the bytes
            //如果该帧太大,则丢弃它并抛出一个 TooLongFrameException……
            in.clear();
            throw new TooLongFrameException();
        }
        //……否则,从 ByteBuf 中读取一个新的帧
        ByteBuf buf = in.readBytes(readableBytes);
        //将该帧添加到解码 读取一个新的帧消息的 List 中
        out.add(buf);
    }
}

然后使用EmbeddedChannel来测试之前的代码是否正确:

public class FrameChunkDecoderTest {
    @Test
    public void testFramesDecoded(){
        //创建一个ByteBuf,并向它写入9字节
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        //创建一个EmbeddedChannel,并向其安装一个帧大小为3字节的FrameChunkDecoder
        EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
        //向它写入2字节,并断言它们将会产生一个新帧
        assertTrue(channel.writeInbound(2));
        try {
            //写入4字节大小的帧,并捕获预期的TooLongFrameException
            channel.writeInbound(input.readBytes(4));
            //如果上面没有捕获,那么就会到达这个断言,并且测试失败
            fail();
        }catch (TooLongFrameException e){
            //
        }
        //因为之前的异常拦截里面没有做任何处理,所以程序可以继续执行,写入剩余的2字节,并断言将会产生一个有效帧
        assertTrue(channel.writeInbound(input.readBytes(3)));
        //将该Channel标记为已完成状态
        assertTrue(channel.finish());
        //读取产生的消息,并且验证值
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(2),read);
        read.release();

        read = (ByteBuf)channel.readInbound();
        assertEquals(buf.skipBytes(4).readSlice(3),read);
        read.release();
        buf.release();
    }
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

10.Netty--编解码器框架 Previous
8.Netty--引导 Next