本文主要内容:
- 实时Web的概念
- WebSocket协议
- 使用Netty构建一个基于WebSocket的聊天室服务器
WebSocket简介
WebSocket协议是完全重新设计的协议,旨在为Web上的双向数据传输问题提供一个切实可行的解决方案,使得客户端与服务器之间可以在任意时刻传输消息。
我们的WebSocket示例应用程序
添加WebSocket支持
处理HTTP请求
首先我们需要实现处理HTTP请求的组件,这个组件将提供用于访问聊天室并显示由连接的客户端发送的消息的网页。channelRead0()方法的实现是如何转发任何目标URI为/ws的请求的。
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class
.getProtectionDomain()
.getCodeSource().getLocation();
try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate index.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx,FullHttpRequest request) throws Exception {
// 如果请求了WebSocket协议升级,则增加引用计数(调用retain()方法),并将它传递给下一个 ChannelInboundHandler
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain());
} else {
// 处理100 Continue请求以符合HTTP1.1 规范
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx);
}
// 读取index.html
RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(
request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE,"text/plain; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) {
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set( HttpHeaders.Names.CONNECTION,HttpHeaders.Values.KEEP_ALIVE);
}
// 将HttpResponse写到客户端
ctx.write(response);
if (ctx.pipeline().get(SslHandler.class) == null) {
// 将index.html写到客户端
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
// 写LastHttpContent并冲刷至客户端
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
// 如果没有请求keep-alive,则在写操作完成后关闭Channel
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
如果该HTTP请求指向了地址为/ws的URI,那么HttpRequestHandler将调用FullHttpRequest对象上的retain()方法,并通过调用fireChannelRead(msg)方法将它转发给下一个ChannelInboundHandler。之所以需要调用retain()方法,是因为调用channelRead()方法完成之后,它将调用FullHttpRequest对象上的release()方法来释放它的资源。
处理WebSocket帧
TextWebSocketFrame是我们唯一真正需要处理的帧类型。Netty提供了WebSocketServerProtocolHandler来处理其他类型的帧。
下面代码展示了我们用于处理TextWebSocketFrame的ChannelInboundHandler,其还将在它的ChannelGroup中跟踪所有活动的WebSocket连接。
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}
@Override // 重写userEventTriggered()方法以处理自定义事件
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 如果该事件表示握手成功,则从该Channelipeline中移除HttpRequestHandler,因为将不会接收到任何HTTP 消息了
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(HttpRequestHandler.class);
// 通知所有已经连接的WebSocket 客户端新的客户端已经连接上了
group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
// 将新的WebSocket Channel添加到ChannelGroup 中,以便它可以接收到所有的消息
group.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
// 增加消息的引用计数,并将它写到ChannelGroup 中所有已经连接的客户端
group.writeAndFlush(msg.retain());
}
}
和之前一样,对于retain()方法的调用是必需的,因为当channelRead0()方法返回时,TextWebSocketFrame 的引用计数将会被减少。由于所有的操作都是异步的,因此,writeAndFlush()方法可能会在channelRead0()方法返回之后完成,而且它绝对不能访问一个已经失效的引用。
初始化ChannelPipeline
为了将ChannelHandler安装到ChannelPipeline中,需要扩展ChannelInitializer,并实现initChannel()方法。
public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;
public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}
@Override
protected void initChannel(Channel ch) throws Exception {
//将所有有需要的ChannelHandler添加到ChannelPipeline中
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler(group));
}
}
下面是基于WebSocket聊天服务器的ChannelHandler及各自的职责:
Netty的WebSocketServerProtocolHandler处理了所有委托管理的WebSocket帧类型以及升级握手本身,如果握手成功,那么所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不再需要的ChannelHandler则将会被移除。
WebSocket 协议升级之前的ChannelPipeline:
WebSocket 协议升级完成之后的ChannelPipeline:
引导
由ChatServer类来引导服务器,并安装ChatServerInitializer的代码。
public class ChatServer {
//创建DefaultChannelGroup,其将保存所有已经连接的WebSocketChannel
private final ChannelGroup channelGroup =
new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
//引导服务器
public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer(channelGroup));
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
//创建ChatServerInitializer
protected ChannelInitializer<Channel> createInitializer(
ChannelGroup group) {
return new ChatServerInitializer(group);
}
//处理服务器关闭,并释放所有的资源
public void destroy() {
if (channel != null) {
channel.close();
}
channelGroup.close();
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
final ChatServer endpoint = new ChatServer();
ChannelFuture future = endpoint.start(
new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
测试该应用程序
略
如何进行加密
为ChannelPipeline加密:
//扩展ChatServerInitializer来加密
public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
super(group);
this.context = context;
}
@Override
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);
//调用父类的initChannel()方法
SSLEng.ine engine = context.newEngine(ch.alloc());
engine.setUseClientMode(false);
//将SslHandler添加到ChannelPipeline中
ch.pipeline().addFirst(new SslHandler(engine));
}
}
最后一步是调整ChatServer以使用SecureChatServerInitializer,以便在ChannelPipeline中安装SslHandler。
public class SecureChatServer extends ChatServer {
private final SslContext context;
public SecureChatServer(SslContext context) {
this.context = context;
}
@Override
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new SecureChatServerInitializer(group, context); // 返回之前创建的SecureChatServerInitializer 以启用加密
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
final SecureChatServer endpoint = new SecureChatServer(context);
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!