本文主要内容:

  • OIO:阻塞传输
  • NIO:异步传输
  • Local:JVM内部的异步通信
  • Embedded:测试你的ChannelHandler

案例研究:传输迁移

不通过Netty使用OIO和NIO

未使用Netty的阻塞网络编程代码如下:

public class PlainOioServer {
    public void serve(int port) throws IOException {
        //将服务器绑定到指定端口
        ServerSocket socket = new ServerSocket(port);
        try {
            for(;;){
                //接收连接
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from "+ clientSocket);
                //创建一个新的线程来处理该连接
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(CharsetUtil.UTF_8));
                            //关闭连接
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        finally {
                            try {
                                clientSocket.close();
                            } catch (IOException e) {
                                
                            }
                        }
                    }
                }).start();//启动线程
            }
        }
        catch (IOException e){
            e.printStackTrace();
        }
    }
}

未使用Netty的异步网络编程代码如下:

public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        //将服务器绑定到选定的端口
        serverSocket.bind(address);
        //打开Selector来处理Channel
        Selector selector = Selector.open();
        //将ServerSocket注册到Selector来接收连接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        //等待需要处理的新事件,阻塞将一直持续到下一个传入事件
        for (;;){
            try {
                selector.select();
            }catch (IOException e){
                e.printStackTrace();
                break;
            }
            //获取所有接收事件的SelectorKey实例
            Set<SelectionKey> readKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readKeys.iterator();
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    //检测事件是否是一个新的并且已经就绪可以被接收的连接
                    if(key.isAcceptable()){
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        //接收客户端,并将它注册到选择器
                        client.register(selector,SelectionKey.OP_WRITE | 
                                SelectionKey.OP_READ,msg.duplicate());
                        System.out.println("Accepted connection from " + client);
                    }
                    //检查套接字是否已经准备好写数据
                    if(key.isWritable()){
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while(buffer.hasRemaining()){
                            //将数据写到已连接的客户端
                            if(client.write(buffer) == 0){
                                break;
                            }
                        }
                        //关闭连接
                        client.close();
                    }
                }catch (IOException e){
                    key.cancel();
                    try {
                        key.channel().close();
                    }catch (IOException ex){
                        
                    }
                }
            }
        }
    }
}

通过Netty使用OIO和NIO

使用Netty的阻塞网络处理代码如下:

public class NettyOioServer {
    public void server(int port) throws InterruptedException {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
        OioEventLoopGroup group = new OioEventLoopGroup();
        try {
            //创建ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    //使用OioEventLoopGroup以允许阻塞模式
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    //指定ChannelInitializer,对于每个已接收的连接都调用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //添加一个ChannelInboundHandlerAdapter以拦截和处理事件
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelActive(ChannelHandlerContext ctx){
                                    //将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully().sync();
        }
        
    }
}

非阻塞的Netty版本

public class NettyNioServer {
    public void server(int port) throws InterruptedException {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
        NioEventLoopGroup group = new NioEventLoopGroup();//
        try {
            //创建ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    //使用OioEventLoopGroup以允许阻塞模式
                    .channel(NioServerSocketChannel.class)//
                    .localAddress(new InetSocketAddress(port))
                    //指定ChannelInitializer,对于每个已接收的连接都调用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //添加一个ChannelInboundHandlerAdapter以拦截和处理事件
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelActive(ChannelHandlerContext ctx){
                                    //将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully().sync();
        }
    }
}

我们发现使用Netty框架,从阻塞传输切换到非阻塞传输代码要做的更改非常小。

传输API

传输API 的核心就是Channel接口,它被用于所有的I/O操作,其层次结构如下图所示:

在这里插入图片描述

如图所示,每个Channel都将会被分配一个ChannelPipeline和ChannelConfig,ChannelConfig包含了该Channel的所有配置设置,并且支持热更新

ChannelPipeline持有所有将应用于入站和出站数据以及事件的ChannelHandler实例,这些ChannelHandler实现了应用程序用于处理状态变化以及数据处理的逻辑

ChannelHandler的典型用途包括:

  • 将数据从一种格式转换为另一种格式:
  • 提供异常的通知;
  • 提供Channel变为活动的或者非活动的通知;
  • 提供当Channel注册到EventLoop或者从EventLoop注销时的通知;
  • 提供有关用户自定义事件的通知。

channel的方法:

方法名 描述
eventLoop 返回分配给Channel的EventLoop
pipeline 返回分配给Channel的ChannelPipeline
isActive 如果Channel是活动的,则返回true
localAddress 返回本地的SocketAddress
remoteAddress 返回远程的SocketAddress
write 将数据写到缓冲区,这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷
flush 将缓冲区数据冲刷到底层传输,如一个Socket
writeAndFlush 等同于调用write()并接着调用flush()方法

前面我们也提到过,Netty的Channel实现是线程安全的,因此我们可以存储一个到Channel的引用,并且每当需要向远程节点写数据时,都可以使用它。

代码如下:

public class ChannelOperationExamples {
    private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();  
	public static void writingToChannelFromManyThreads() {
        // 存储一个Channel的引用
        final Channel channel = CHANNEL_FROM_SOMEWHERE; 
        //创建持有要写数据的ByteBuf
        final ByteBuf buf = Unpooled.copiedBuffer("your data",
                CharsetUtil.UTF_8);
        //创建将数据写到Channel 的 Runnable
        Runnable writer = new Runnable() {
            @Override
            public void run() {
                channel.write(buf.duplicate());
            }
        };
        //获取到线程池Executor 的引用
        Executor executor = Executors.newCachedThreadPool();

        //递交写任务给线程池以便在某个线程中执行
        executor.execute(writer);

        //递交另一个写任务以便在另一个线程中执行
        executor.execute(writer);
        //...
    }
}

内置的传输

名称 描述
NIO 基于选择器的方式
Epoll 由JNI驱动的epoll()和非阻塞IO,支持只有在Linux上可用的多种特性,如SO_REUSEPORT,比NIO传输更快,完全非阻塞
OIO 阻塞IO
Local 可以在VM内部通过管道进行通信的本地传输
Embedded 允许使用ChannelHandler而又不需要一个真正的基于网络的传输,主要用于测试

NIO

NIO提供了一个所有I/O操作的全异步的实现,利用选择器来获取Channel状态改变时的通知,可能的状态变化有:

  • 新的Channel已被接收并且就绪;
  • Channel连接已经完成;
  • Channel有已经就绪的可供读取的数据;
  • Channel可用于写数据。

选择器运行在一个检查状态变化并对其做出响应的线程上,在应用程序对状态的改变作出响应之后,选择器将会被重置,并将重复这个过程。

选择操作的位模式:

名称 描述
OP_ACCEPT 请求在接收新连接并创建Channel时获得通知
OP_CONNECT 请求在建立一个连接时获得通知
OP_READ 请求当数据已经就绪,可以从Channel中读取时获得通知
OP_WRITE 请求当可以向Channel中写入更多的数据时获得通知

处理流程如下:

在这里插入图片描述

知识点扩展:

零拷贝:零拷贝时一种目前只有在使用NIO和Epoll传输时才可使用的特性,可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,因此CPU不需要为数据在内存之间的拷贝消耗资源。但是它对于实现了数据加密或者压缩的文件系统是不可用的,只能传输文件的原始内容。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

5.Netty--ByteBuf Previous
3.Netty--Netty的组件和设计 Next