本文主要内容:
- UDP概述
- 一个广播应用程序示例
UDP的基础知识
面向连接的传输(如TCP)管理了两个网络端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输,以及最后连接的有序终止。
类似于UDP这样的无连接协议中,并没有持久化连接这样的概念,并且每个消息(一个UDP数据报)都是一个单独的传输单元。
UDP广播
UDP提供了向多个接收者发送消息的额外传输模式:
多播:传播到一个预定义的主机组
广播:传输到网络上的所有主机
UDP示例应用程序
发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。
所有的在该UDP端口上监听的事件监视器都将会接收到广播消息。
消息POJO:LogEvent
在消息处理应用程序中,数据通常由POJO表示,除了实际上的消息内容,其还可以包含配置或处理信息,在这个应用程序中,我们将会把消息作为事件处理,并且由于该数据来自于日志文件,所以我们称它为LogEvent。
public class LogEvent {
public static final byte SEPARATOR = (byte) ':';
private final InetSocketAddress source;
private final String logfile;
private final String msg;
private final long received;
public LogEvent(String logfile, String msg) { // 用于传出消息的构造函数
this(null, -1, logfile, msg);
}
public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { // 用于 传入消息的构造函数
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource() { // 返回发送LogEvent 的源的InetSocketAddress
return source;
}
public String getLogfile() { // 返回所发送的LogEvent 的日志文件的名称
return logfile;
}
public String getMsg() { // 返回消息内容
return msg;
}
public long getReceivedTimestamp() { // 返回接收LogEvent的时间
return received;
}
}
定义好了消息组件,接下来便可以实现该应用程序的广播逻辑了。
编写广播者
Netty提供了大量的类来支持UDP应用程序的编写
Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。
下图展示了正在广播的3个日志条目,每个都将通过一个专门的DatagramPacket进行广播:
下图呈现了该LogEventBroadcaster的ChannelPipeline的一个高级别视图,展示了LogEvent消息是如何流经它的。
所有的将要被传输的数据都被封装在了LogEvent消息中,LogEventBroadcaster将把这些写入到Channel中,并通过ChannelPipeline发送它们,在那里他们将会被转换(编码)为DatagramPacket消息,最后,它们都将通过UDP被广播,并由远程节点(监视器)所捕获。
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent>{
private final InetSocketAddress remoteAddress;
//LogEventEncoder创建了即将被发送到指定的InetSocketAddress的DatagramPacket消息
public LogEventEncoder(InetSocketAddress remoteAddress){
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1);
//将文件名写入到ByteBuf中
buf.writeBytes(file);
//添加一个SEPARATOR
buf.writeByte(LogEvent.SEPARATOR);
//将日志消息写入ByteBuf中
buf.writeBytes(msg);
//将一个拥有数据和目的地地址的新DatagramPacket添加到出站的消息列表中
out.add(new io.netty.channel.socket.DatagramPacket(buf,remoteAddress));
}
}
在LogEventEncoder被实现之后,我们已经准备好了引导该服务器,其包括设置各种各样的ChannelOption,以及在ChannelPipeline中安装所需要的ChannelHandler。这将通过主类LogEventBroadcaster完成。如下代码所示。
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
//引导该NioDatagramChannel(无连接)
bootstrap.group(group).channel(NioDatagramChannel.class)
//设置SO_BROADCAST套接字选项
.option(ChannelOption.SO_BROADCAST,true)
.handler(new LogEventEncoder(address));
this.file = file;
}
public void run() throws Exception{
//绑定Channel
Channel ch = bootstrap.bind(0).sync().channel();
long pointer = 0;
//启动主处理循环
for (;;){
long len = file.length();
if (len < pointer){
//file was reset
//如果有必要,将文件指针设置到该文件的最后一个字符
pointer = len;
} else if (len > pointer){
//Content was added
RandomAccessFile raf = new RandomAccessFile(file,"r");
//设置当前的文件指针,以确保没有任何的旧日志被发送
raf.seek(pointer);
String line;
while((line = raf.readLine()) != null){
//对于每条日志条目。,写入一个LogEvent到Channel中
ch.writeAndFlush(new LogEvent(null,-1,file.getAbsolutePath(),line));
}
//存储其在文件中的当前位置
pointer = raf.getFilePointer();
raf.close();
}
try {
//休眠1秒,如果被中断,则退出循环,否则重新处理它
Thread.sleep(1000);
}catch (InterruptedException e){
Thread.interrupted();
break;
}
}
}
public void stop(){
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception{
if (args.length != 2){
throw new IllegalArgumentException();
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(
new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0])),new File(args[1]));
try {
broadcaster.run();
}finally {
broadcaster.stop();
}
}
}
6、编写监视器
目标是将netcat替换为一个更加完整的事件消费者,我们称之为LogEventMonitor。这个程序将:
(1)接收有LogEventBroadcaster广播的UDP DatagramPacket
(2)将它们解码为LogEvent消息
(3)将LogEvent消息写到System.out
和之前一样,该逻辑由一组自定义的ChannelHandler实现——对于我们的解码器来说,我们将扩展MessageToMessageDecoder。下图描绘LogEventMonitor的ChannelPipeline,并且展示了LogEvnet是如何流经它的。
ChannelPipeline中的第一个解码器LogEventDecoder负责传入的DatagramPacket解码为LogEvent消息(一个用于转换入站数据的任何Netty应用程序的典型设置)
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket>{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
DatagramPacket datagramPacket, List<Object> out) throws Exception {
//获取对DatagramPacket中的数据的引用
ByteBuf data = datagramPacket.content();
//获取该SEPARATOR的索引
int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
//提取文件名
String fileName = data.slice(0,idx).toString(CharsetUtil.UTF_8);
//提取日志消息
String logMsg = data.slice(idx + 1,data.readableBytes()).toString(CharsetUtil.UTF_8);
//构建一个新的LogEvent对象,并且将它添加到列表中
LogEvent event = new LogEvent(datagramPacket.sender(),System.currentTimeMillis(),fileName,logMsg);
out.add(event);
}
}
第二个ChannelHandler的工作是对第一个ChannelHandler所创建的LogEvent消息执行一些处理。在这个场景下,它只是简单地将它们写到System.out。在真实世界的应用程序中,你可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent>{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//当异常发生时,打印栈跟踪信息,并关闭对应的Channel
cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext,
LogEvent event) throws Exception {
//创建StringBuilder,并且构建输出的字符串
StringBuilder builder = new StringBuilder();
builder.append(event.getReceived());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg());
//打印LogEvent的数据
System.out.println(builder.toString());
}
}
LogEventHandler将以一种简单易读的格式打印LogEvent消息,现在我们需要将我们的LogEventDecoder和LogEventHandler安装到ChannelPipeline中。
public class LogEventMonitor {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
public LogEventMonitor(InetSocketAddress address){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST,true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LogEventDecoder());
pipeline.addLast(new LogEventHandler());
}
}).localAddress(address);
}
public Channel bind(){
return bootstrap.bind().syncUninterruptibly().channel();
}
public void stop(){
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception{
if (args.length != 1){
throw new IllegalArgumentException("Usage:LoEventMonitor <port>");
}
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().sync();
}finally {
monitor.stop();
}
}
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!