本文主要内容:
- OIO:阻塞传输
- NIO:异步传输
- Local:JVM内部的异步通信
- Embedded:测试你的ChannelHandler
案例研究:传输迁移
不通过Netty使用OIO和NIO
未使用Netty的阻塞网络编程代码如下:
public class PlainOioServer {
public void serve(int port) throws IOException {
//将服务器绑定到指定端口
ServerSocket socket = new ServerSocket(port);
try {
for(;;){
//接收连接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from "+ clientSocket);
//创建一个新的线程来处理该连接
new Thread(new Runnable() {
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!\r\n".getBytes(CharsetUtil.UTF_8));
//关闭连接
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
finally {
try {
clientSocket.close();
} catch (IOException e) {
}
}
}
}).start();//启动线程
}
}
catch (IOException e){
e.printStackTrace();
}
}
}
未使用Netty的异步网络编程代码如下:
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket serverSocket = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
//将服务器绑定到选定的端口
serverSocket.bind(address);
//打开Selector来处理Channel
Selector selector = Selector.open();
//将ServerSocket注册到Selector来接收连接
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
//等待需要处理的新事件,阻塞将一直持续到下一个传入事件
for (;;){
try {
selector.select();
}catch (IOException e){
e.printStackTrace();
break;
}
//获取所有接收事件的SelectorKey实例
Set<SelectionKey> readKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
try {
//检测事件是否是一个新的并且已经就绪可以被接收的连接
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
//接收客户端,并将它注册到选择器
client.register(selector,SelectionKey.OP_WRITE |
SelectionKey.OP_READ,msg.duplicate());
System.out.println("Accepted connection from " + client);
}
//检查套接字是否已经准备好写数据
if(key.isWritable()){
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
while(buffer.hasRemaining()){
//将数据写到已连接的客户端
if(client.write(buffer) == 0){
break;
}
}
//关闭连接
client.close();
}
}catch (IOException e){
key.cancel();
try {
key.channel().close();
}catch (IOException ex){
}
}
}
}
}
}
通过Netty使用OIO和NIO
使用Netty的阻塞网络处理代码如下:
public class NettyOioServer {
public void server(int port) throws InterruptedException {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
OioEventLoopGroup group = new OioEventLoopGroup();
try {
//创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
//使用OioEventLoopGroup以允许阻塞模式
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
//指定ChannelInitializer,对于每个已接收的连接都调用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加一个ChannelInboundHandlerAdapter以拦截和处理事件
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx){
//将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully().sync();
}
}
}
非阻塞的Netty版本
public class NettyNioServer {
public void server(int port) throws InterruptedException {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
NioEventLoopGroup group = new NioEventLoopGroup();//
try {
//创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
//使用OioEventLoopGroup以允许阻塞模式
.channel(NioServerSocketChannel.class)//
.localAddress(new InetSocketAddress(port))
//指定ChannelInitializer,对于每个已接收的连接都调用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加一个ChannelInboundHandlerAdapter以拦截和处理事件
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx){
//将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully().sync();
}
}
}
我们发现使用Netty框架,从阻塞传输切换到非阻塞传输代码要做的更改非常小。
传输API
传输API 的核心就是Channel接口,它被用于所有的I/O操作,其层次结构如下图所示:
如图所示,每个Channel都将会被分配一个ChannelPipeline和ChannelConfig,ChannelConfig包含了该Channel的所有配置设置,并且支持热更新。
ChannelPipeline持有所有将应用于入站和出站数据以及事件的ChannelHandler实例,这些ChannelHandler实现了应用程序用于处理状态变化以及数据处理的逻辑。
ChannelHandler的典型用途包括:
- 将数据从一种格式转换为另一种格式:
- 提供异常的通知;
- 提供Channel变为活动的或者非活动的通知;
- 提供当Channel注册到EventLoop或者从EventLoop注销时的通知;
- 提供有关用户自定义事件的通知。
channel的方法:
方法名 | 描述 |
---|---|
eventLoop | 返回分配给Channel的EventLoop |
pipeline | 返回分配给Channel的ChannelPipeline |
isActive | 如果Channel是活动的,则返回true |
localAddress | 返回本地的SocketAddress |
remoteAddress | 返回远程的SocketAddress |
write | 将数据写到缓冲区,这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷 |
flush | 将缓冲区数据冲刷到底层传输,如一个Socket |
writeAndFlush | 等同于调用write()并接着调用flush()方法 |
前面我们也提到过,Netty的Channel实现是线程安全的,因此我们可以存储一个到Channel的引用,并且每当需要向远程节点写数据时,都可以使用它。
代码如下:
public class ChannelOperationExamples {
private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();
public static void writingToChannelFromManyThreads() {
// 存储一个Channel的引用
final Channel channel = CHANNEL_FROM_SOMEWHERE;
//创建持有要写数据的ByteBuf
final ByteBuf buf = Unpooled.copiedBuffer("your data",
CharsetUtil.UTF_8);
//创建将数据写到Channel 的 Runnable
Runnable writer = new Runnable() {
@Override
public void run() {
channel.write(buf.duplicate());
}
};
//获取到线程池Executor 的引用
Executor executor = Executors.newCachedThreadPool();
//递交写任务给线程池以便在某个线程中执行
executor.execute(writer);
//递交另一个写任务以便在另一个线程中执行
executor.execute(writer);
//...
}
}
内置的传输
名称 | 描述 |
---|---|
NIO | 基于选择器的方式 |
Epoll | 由JNI驱动的epoll()和非阻塞IO,支持只有在Linux上可用的多种特性,如SO_REUSEPORT,比NIO传输更快,完全非阻塞 |
OIO | 阻塞IO |
Local | 可以在VM内部通过管道进行通信的本地传输 |
Embedded | 允许使用ChannelHandler而又不需要一个真正的基于网络的传输,主要用于测试 |
NIO
NIO提供了一个所有I/O操作的全异步的实现,利用选择器来获取Channel状态改变时的通知,可能的状态变化有:
- 新的Channel已被接收并且就绪;
- Channel连接已经完成;
- Channel有已经就绪的可供读取的数据;
- Channel可用于写数据。
选择器运行在一个检查状态变化并对其做出响应的线程上,在应用程序对状态的改变作出响应之后,选择器将会被重置,并将重复这个过程。
选择操作的位模式:
名称 | 描述 |
---|---|
OP_ACCEPT | 请求在接收新连接并创建Channel时获得通知 |
OP_CONNECT | 请求在建立一个连接时获得通知 |
OP_READ | 请求当数据已经就绪,可以从Channel中读取时获得通知 |
OP_WRITE | 请求当可以向Channel中写入更多的数据时获得通知 |
处理流程如下:
知识点扩展:
零拷贝:零拷贝时一种目前只有在使用NIO和Epoll传输时才可使用的特性,可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,因此CPU不需要为数据在内存之间的拷贝消耗资源。但是它对于实现了数据加密或者压缩的文件系统是不可用的,只能传输文件的原始内容。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!