本文主要内容:

  • UDP概述
  • 一个广播应用程序示例

UDP的基础知识

面向连接的传输(如TCP)管理了两个网络端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输,以及最后连接的有序终止。
类似于UDP这样的无连接协议中,并没有持久化连接这样的概念,并且每个消息(一个UDP数据报)都是一个单独的传输单元。

UDP广播

UDP提供了向多个接收者发送消息的额外传输模式:
多播:传播到一个预定义的主机组
广播:传输到网络上的所有主机

UDP示例应用程序

发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。
在这里插入图片描述
所有的在该UDP端口上监听的事件监视器都将会接收到广播消息。

消息POJO:LogEvent

在消息处理应用程序中,数据通常由POJO表示,除了实际上的消息内容,其还可以包含配置或处理信息,在这个应用程序中,我们将会把消息作为事件处理,并且由于该数据来自于日志文件,所以我们称它为LogEvent。

public class LogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;
    public LogEvent(String logfile, String msg) { // 用于传出消息的构造函数
        this(null, -1, logfile, msg);
    }
    public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { // 用于 传入消息的构造函数
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }
    public InetSocketAddress getSource() { // 返回发送LogEvent 的源的InetSocketAddress
        return source;
    }
    public String getLogfile() { // 返回所发送的LogEvent 的日志文件的名称
        return logfile;
    }
    public String getMsg() { // 返回消息内容
        return msg;
    }
    public long getReceivedTimestamp() { // 返回接收LogEvent的时间
        return received;
    }
}

定义好了消息组件,接下来便可以实现该应用程序的广播逻辑了。

编写广播者

Netty提供了大量的类来支持UDP应用程序的编写
在这里插入图片描述
Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。
下图展示了正在广播的3个日志条目,每个都将通过一个专门的DatagramPacket进行广播:
在这里插入图片描述
下图呈现了该LogEventBroadcaster的ChannelPipeline的一个高级别视图,展示了LogEvent消息是如何流经它的。
在这里插入图片描述
所有的将要被传输的数据都被封装在了LogEvent消息中,LogEventBroadcaster将把这些写入到Channel中,并通过ChannelPipeline发送它们,在那里他们将会被转换(编码)为DatagramPacket消息,最后,它们都将通过UDP被广播,并由远程节点(监视器)所捕获。

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent>{
    private final InetSocketAddress remoteAddress;
 
    //LogEventEncoder创建了即将被发送到指定的InetSocketAddress的DatagramPacket消息
    public LogEventEncoder(InetSocketAddress remoteAddress){
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
                          LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1);
        //将文件名写入到ByteBuf中
        buf.writeBytes(file);
        //添加一个SEPARATOR
        buf.writeByte(LogEvent.SEPARATOR);
        //将日志消息写入ByteBuf中
        buf.writeBytes(msg);
        //将一个拥有数据和目的地地址的新DatagramPacket添加到出站的消息列表中
        out.add(new io.netty.channel.socket.DatagramPacket(buf,remoteAddress));
    }
}

在LogEventEncoder被实现之后,我们已经准备好了引导该服务器,其包括设置各种各样的ChannelOption,以及在ChannelPipeline中安装所需要的ChannelHandler。这将通过主类LogEventBroadcaster完成。如下代码所示。

public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;
 
    public LogEventBroadcaster(InetSocketAddress address, File file){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        //引导该NioDatagramChannel(无连接)
        bootstrap.group(group).channel(NioDatagramChannel.class)
                //设置SO_BROADCAST套接字选项
                .option(ChannelOption.SO_BROADCAST,true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }
    public void run() throws Exception{
        //绑定Channel
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //启动主处理循环
        for (;;){
            long len = file.length();
            if (len < pointer){
                //file was reset
                //如果有必要,将文件指针设置到该文件的最后一个字符
                pointer = len;
            } else if (len > pointer){
                //Content was added
                RandomAccessFile raf = new RandomAccessFile(file,"r");
                //设置当前的文件指针,以确保没有任何的旧日志被发送
                raf.seek(pointer);
                String line;
                while((line = raf.readLine()) != null){
                    //对于每条日志条目。,写入一个LogEvent到Channel中
                    ch.writeAndFlush(new LogEvent(null,-1,file.getAbsolutePath(),line));
                }
                //存储其在文件中的当前位置
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                //休眠1秒,如果被中断,则退出循环,否则重新处理它
                Thread.sleep(1000);
            }catch (InterruptedException e){
                Thread.interrupted();
                break;
            }
        }
    }
    public void stop(){
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception{
        if (args.length != 2){
            throw new IllegalArgumentException();
        }
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0])),new File(args[1]));
        try {
            broadcaster.run();
        }finally {
            broadcaster.stop();
        }
    }
}

6、编写监视器

目标是将netcat替换为一个更加完整的事件消费者,我们称之为LogEventMonitor。这个程序将:

(1)接收有LogEventBroadcaster广播的UDP DatagramPacket

(2)将它们解码为LogEvent消息

(3)将LogEvent消息写到System.out

和之前一样,该逻辑由一组自定义的ChannelHandler实现——对于我们的解码器来说,我们将扩展MessageToMessageDecoder。下图描绘LogEventMonitor的ChannelPipeline,并且展示了LogEvnet是如何流经它的。
在这里插入图片描述
ChannelPipeline中的第一个解码器LogEventDecoder负责传入的DatagramPacket解码为LogEvent消息(一个用于转换入站数据的任何Netty应用程序的典型设置)

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket>{
 
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext,
                          DatagramPacket datagramPacket, List<Object> out) throws Exception {
        //获取对DatagramPacket中的数据的引用
        ByteBuf data = datagramPacket.content();
        //获取该SEPARATOR的索引
        int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
        //提取文件名
        String fileName = data.slice(0,idx).toString(CharsetUtil.UTF_8);
        //提取日志消息
        String logMsg = data.slice(idx + 1,data.readableBytes()).toString(CharsetUtil.UTF_8);
        //构建一个新的LogEvent对象,并且将它添加到列表中
        LogEvent event = new LogEvent(datagramPacket.sender(),System.currentTimeMillis(),fileName,logMsg);
        out.add(event);
    }
}

第二个ChannelHandler的工作是对第一个ChannelHandler所创建的LogEvent消息执行一些处理。在这个场景下,它只是简单地将它们写到System.out。在真实世界的应用程序中,你可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。

public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent>{
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //当异常发生时,打印栈跟踪信息,并关闭对应的Channel
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                LogEvent event) throws Exception {
        //创建StringBuilder,并且构建输出的字符串
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceived());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        //打印LogEvent的数据
        System.out.println(builder.toString());
    }
}

LogEventHandler将以一种简单易读的格式打印LogEvent消息,现在我们需要将我们的LogEventDecoder和LogEventHandler安装到ChannelPipeline中。

public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
 
    public LogEventMonitor(InetSocketAddress address){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST,true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                }).localAddress(address);
    }
    public Channel bind(){
        return bootstrap.bind().syncUninterruptibly().channel();
    }
    public void stop(){
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception{
        if (args.length != 1){
            throw new IllegalArgumentException("Usage:LoEventMonitor <port>");
        }
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        }finally {
            monitor.stop();
        }
    }
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

1. Redis--安装Redis Previous
12.Netty--WebSocket Next