netty中的UDP

摘要:
UDP提供了向多个接收者发送消息的额外传输模式:多播——传输到一个预定义的主机组;广播——传输到网络上的所有主机。本示例应用程序将通过发送能够被同一个网络中的所有主机所接收的消息来演示UDP广播的使用。所有的在该UDP端口上监听的事件监视器都将会接收到广播消息。代码:1、消息POJO:LogEventpackagecom.dxz.nettydemo.chapter13;importjava.net.InetSocketAddress;publicfinalclassLogEvent{publicstaticfinalbyteSEPARATOR=':';privatefinalInetSocketAddresssource;privatefinalStringlogfile;privatefinalStringmsg;privatefinallongreceived;publicLogEvent{this;}publicLogEvent{this.source=source;this.logfile=logfile;this.msg=msg;this.received=received;}publicstaticbytegetSeparator(){returnSEPARATOR;}publicInetSocketAddressgetSource(){returnsource;}publicStringgetLogfile(){returnlogfile;}publicStringgetMsg(){returnmsg;}publiclonggetReceived(){returnreceived;}publicObjectgetReceivedTimestamp(){returnSystem.currentTimeMillis();}}2、编写广播者Netty的DatagramPacket是一个简单的消息容器,DatagramChannel实现用它来和远程节点通信。要将LogEvent消息转换为DatagramPacket,我们将需要一个编码器。

UDP 提供了向多个接收者发送消息的额外传输模式:

  • 多播——传输到一个预定义的主机组;
  • 广播——传输到网络(或者子网)上的所有主机。

本示例应用程序将通过发送能够被同一个网络中的所有主机所接收的消息来演示 UDP 广播的使用。为此,我们将使用特殊的受限广播地址或者零网络地址 255.255.255.255。 发送到这个地址的消息都将会被定向给本地网络(0.0.0.0)上的所有主机,而不会被路由器 转发给其他的网络。所有的在该 UDP 端口上监听的事件监视器都将会接收到广播消息。

netty中的UDP第1张

代码:

1、消息 POJO: LogEvent

packagecom.dxz.nettydemo.chapter13;
importjava.net.InetSocketAddress;
public final classLogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private finalInetSocketAddress source;
    private finalString logfile;
    private finalString msg;
    private final longreceived;
    publicLogEvent(String logfile, String msg) {
        this(null, -1, logfile, msg);
    }
    public LogEvent(InetSocketAddress source, longreceived, String logfile, String msg) {
        this.source =source;
        this.logfile =logfile;
        this.msg =msg;
        this.received =received;
    }
    public static bytegetSeparator() {
        returnSEPARATOR;
    }
    publicInetSocketAddress getSource() {
        returnsource;
    }
    publicString getLogfile() {
        returnlogfile;
    }
    publicString getMsg() {
        returnmsg;
    }
    public longgetReceived() {
        returnreceived;
    }
    publicObject getReceivedTimestamp() {
        returnSystem.currentTimeMillis();
    }
}

2、编写广播者

Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址 以及消息的有效负载本身。 要将 LogEvent 消息转换为 DatagramPacket,我们将需要一个编码器。但是没有必要从 头开始编写我们自己的。我们将扩展 Netty 的 MessageToMessageEncoder:

netty中的UDP第2张

编码:

packagecom.dxz.nettydemo.chapter13;
importjava.net.InetSocketAddress;
importjava.util.List;
importio.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.socket.DatagramPacket;
importio.netty.handler.codec.MessageToMessageEncoder;
importio.netty.util.CharsetUtil;
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent>{
    private finalInetSocketAddress remoteAddress;
    publicLogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress =remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object>out)
            throwsException {
        byte[] file =logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg =logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1);
        buf.writeBytes(file);
        buf.writeByte(LogEvent.SEPARATOR);
        buf.writeBytes(msg);
        out.add(newDatagramPacket(buf, remoteAddress));
    }
}

在 LogEventEncoder 被实现之后,我们已经准备好了引导该服务器,其包括设置各种各 样的 ChannelOption,以及在 ChannelPipeline 中安装所需要的 ChannelHandler。这将通过主类 LogEventBroadcaster 完成,如下代码清单:

packagecom.dxz.nettydemo.chapter13;
importjava.io.File;
importjava.io.RandomAccessFile;
importjava.net.InetSocketAddress;
importio.netty.bootstrap.Bootstrap;
importio.netty.channel.Channel;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioDatagramChannel;
public classLogEventBroadcaster {
    private finalEventLoopGroup group;
    private finalBootstrap bootstrap;
    private finalFile file;
    publicLogEventBroadcaster(InetSocketAddress address, File file) {
        group = newNioEventLoopGroup();
        bootstrap = newBootstrap();
        bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                .handler(newLogEventEncoder(address));
        this.file =file;
    }
    public void run() throwsException {
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        System.out.println("run() ch:" +ch);
        for(;;) {
            long len =file.length();
            if (len <pointer) {
                //file was reset
                pointer =len;
            } else if (len >pointer) {
                System.out.println("read begin " +pointer);
                //Content was added
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line));
                }
                pointer =raf.getFilePointer();
                raf.close();
            }
            try{
                Thread.sleep(1000);
            } catch(InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }
    public voidstop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throwsException {
        /*if (args.length != 2) {
            throw new IllegalArgumentException();
        }*/
        String port = "8888";
        String path = "E:\myspace\netty-demo\src\main\java\com\dxz\nettydemo\chapter13\log.txt";
        LogEventBroadcaster broadcaster = newLogEventBroadcaster(
                        //new InetSocketAddress("192.168.5.12",
                        //new InetSocketAddress("localhost",
                        new InetSocketAddress("255.255.255.255",    
                Integer.parseInt(port)), newFile(path));
        try{
            broadcaster.run();
        } finally{
            broadcaster.stop();
        }
    }
}

图 13-3 呈现了该 LogEventBroadcaster 的 ChannelPipeline 的一个高级别视图,展 示了 LogEvent 消息是如何流经它的。

netty中的UDP第3张

正如你所看到的,所有的将要被传输的数据都被封装在了 LogEvent 消息中。LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它 们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远 程节点(监视器)所捕获。

通过nc监控指定的8888端口如下:

netty中的UDP第4张

然后运行LogEventBroadcaster类,再观察nc的日志如下:

netty中的UDP第5张

编写监视器

我们的目标是将 netcat 替换为一个更加完整的事件消费者,我们称之为 LogEventMonitor。 这个程序将:

(1)接收由 LogEventBroadcaster 广播的 UDP DatagramPacket;

(2)将它们解码为 LogEvent 消息;

(3)将 LogEvent 消息写出到 System.out。

netty中的UDP第6张

解码

ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket 解码为 LogEvent 消息ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket 解码为 LogEvent 消息:

packagecom.dxz.nettydemo.chapter13;
importjava.util.List;
importio.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.socket.DatagramPacket;
importio.netty.handler.codec.MessageToMessageDecoder;
importio.netty.util.CharsetUtil;
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket>{
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throwsException {
        ByteBuf data =datagramPacket.content();
        int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
        String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
        String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
        LogEvent event = newLogEvent(datagramPacket.sender(), System.currentTimeMillis(), filename, logMsg);
        out.add(event);
    }
}

第二个 ChannelHandler 的工作是对第一个 ChannelHandler 所创建的 LogEvent 消息执行一些处理:

LogEventHandler 将以一种简单易读的格式打印 LogEvent 消息,包括以下的各项:

 以毫秒为单位的被接收的时间戳;

 发送方的 InetSocketAddress,其由 IP 地址和端口组成;

 生成 LogEvent 消息的日志文件的绝对路径名;

 实际上的日志消息,其代表日志文件中的一行。

packagecom.dxz.nettydemo.chapter13;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.SimpleChannelInboundHandler;
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent>{
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void messageReceived(ChannelHandlerContext ctx, LogEvent event) throwsException {
        StringBuilder builder = newStringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        System.out.println(builder.toString());
    }
}

现在我们需要将我们的LogEventDecoder 和LogEventHandler 安装到ChannelPipeline 中,通过 LogEventMonitor 主类来做到这一点:

packagecom.dxz.nettydemo.chapter13;
importjava.net.InetSocketAddress;
importio.netty.bootstrap.Bootstrap;
importio.netty.channel.Channel;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioDatagramChannel;
public classLogEventMonitor {
    private finalEventLoopGroup group;
    private finalBootstrap bootstrap;
    publicLogEventMonitor(InetSocketAddress address) {
        group = newNioEventLoopGroup();
        bootstrap = newBootstrap();
        bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throwsException {
                        ChannelPipeline pipeline =channel.pipeline();
                        pipeline.addLast(newLogEventDecoder());
                        pipeline.addLast(newLogEventHandler());
                    }
                }).localAddress(address);
    }
    publicChannel bind() {
        returnbootstrap.bind().syncUninterruptibly().channel();
    }
    public voidstop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throwsException {
        /*if (args.length != 1) {
            throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }*/
        String port = "8888";
        LogEventMonitor monitor = new LogEventMonitor(newInetSocketAddress(Integer.parseInt(port)));
        try{
            Channel channel =monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        } finally{
            monitor.stop();
        }
    }
}

测试:

新修改下广播的地址为localhost,启动发送端和监听端程序,往log.txt文件中增加“fffffffffffff”保存后,如下图:

netty中的UDP第7张

免责声明:文章转载自《netty中的UDP》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇SQL GUID和自增列做主键的优缺点Kubernetes网络的iptables模式和ipvs模式支持ping分析下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

ansible 调试 debug 一介凡人

一、debug模块 1、debug模块是Ansible Playbook中最常用的调试模块,可以在Playbook执行过程打印调试信息,特别是跟when条件语句一起使用时,可以调试特定条件下的执行过程。 比如:当变量 a 定义时,将 a 的值打印出来,当任务成功后,打印执行结果等。 msg:调试输出的消息 var:将某个任务执行的输出作为变量传递给d...

消息中间件(1)

问题的起源 ​ 在深入了解消息中间件之前,我想先搞清楚为什么会出现消息中间件这么一款产品,换句话说我们需要弄清楚消息中间件到底解决了一个什么问题。 ​ 在互联网的初级阶段,那个时候一方面没有想现在如此多的用户,另一方面也没有太复杂的业务场景,在那个阶段,应用的架构往往是垂直式的,通俗的讲就是在一个工程中解决所有的问题。那么从进程角度来看,在这一阶段中所有的...

SpringBoot集成redisson分布式锁

原文链接:https://blog.csdn.net/sinat_25295611/article/details/80420086 https://www.cnblogs.com/yangzhilong/p/7605807.html 业务场景:在电商项目中,往往会有这样的一个功能设计,当用户下单后一段时间没有付款,系统就会在超时后关闭该订单。 通常我们会...

mybatis传入多个参数

需要查阅本文的基本都是需要传入多个参数的,这里记住一句话:无论你传的参数是什么样的,最后mybtis都会将你传入的转换为map的,那么既然这样,当我们要传入多个参数时,何不直接给与map类型即可,然后mapper.xml通过#{map.key}来获取值即可,这个特别适合动态搜索,或者多个参数的查询,并且可以在mapper的xml语句中通过if判断来实现若为...

SpringBoot 获取前端页面参数的集中方式总结

SpringBoot的一个好处就是通过注解可以轻松获取前端页面的参数,之后尅将参数经过一系列处理传送到后台数据库,前端时间正好用到。大致分为一下几种: 1.指定前端URL请求参数名称与方法名称一致,这种方式简单来说就是URL请求格式中参数需要与方法的参数名称对应上,举个例子,这么一个URL请求:http://localhost:8080/0919/tes...

Java大文件分片上传/多线程上传

这里只写后端的代码,基本的思想就是,前端将文件分片,然后每次访问上传接口的时候,向后端传入参数:当前为第几块文件,和分片总数 下面直接贴代码吧,一些难懂的我大部分都加上注释了: 上传文件实体类: 看得出来,实体类中已经有很多我们需要的功能了,还有实用的属性。如MD5秒传的信息。 public class FileInf {      public File...