UDP 提供了向多个接收者发送消息的额外传输模式:
- 多播——传输到一个预定义的主机组;
- 广播——传输到网络(或者子网)上的所有主机。
本示例应用程序将通过发送能够被同一个网络中的所有主机所接收的消息来演示 UDP 广播的使用。为此,我们将使用特殊的受限广播地址或者零网络地址 255.255.255.255。 发送到这个地址的消息都将会被定向给本地网络(0.0.0.0)上的所有主机,而不会被路由器 转发给其他的网络。所有的在该 UDP 端口上监听的事件监视器都将会接收到广播消息。
代码:
1、消息 POJO: LogEvent
packagecom.dxz.nettydemo.chapter13; importjava.net.InetSocketAddress; public final classLogEvent { public static final byte SEPARATOR = (byte) ':'; private finalInetSocketAddress source; private finalString logfile; private finalString msg; private final longreceived; publicLogEvent(String logfile, String msg) { this(null, -1, logfile, msg); } public LogEvent(InetSocketAddress source, longreceived, String logfile, String msg) { this.source =source; this.logfile =logfile; this.msg =msg; this.received =received; } public static bytegetSeparator() { returnSEPARATOR; } publicInetSocketAddress getSource() { returnsource; } publicString getLogfile() { returnlogfile; } publicString getMsg() { returnmsg; } public longgetReceived() { returnreceived; } publicObject getReceivedTimestamp() { returnSystem.currentTimeMillis(); } }
2、编写广播者
Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址 以及消息的有效负载本身。 要将 LogEvent 消息转换为 DatagramPacket,我们将需要一个编码器。但是没有必要从 头开始编写我们自己的。我们将扩展 Netty 的 MessageToMessageEncoder:
编码:
packagecom.dxz.nettydemo.chapter13; importjava.net.InetSocketAddress; importjava.util.List; importio.netty.buffer.ByteBuf; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.socket.DatagramPacket; importio.netty.handler.codec.MessageToMessageEncoder; importio.netty.util.CharsetUtil; public class LogEventEncoder extends MessageToMessageEncoder<LogEvent>{ private finalInetSocketAddress remoteAddress; publicLogEventEncoder(InetSocketAddress remoteAddress) { this.remoteAddress =remoteAddress; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object>out) throwsException { byte[] file =logEvent.getLogfile().getBytes(CharsetUtil.UTF_8); byte[] msg =logEvent.getMsg().getBytes(CharsetUtil.UTF_8); ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1); buf.writeBytes(file); buf.writeByte(LogEvent.SEPARATOR); buf.writeBytes(msg); out.add(newDatagramPacket(buf, remoteAddress)); } }
在 LogEventEncoder 被实现之后,我们已经准备好了引导该服务器,其包括设置各种各 样的 ChannelOption,以及在 ChannelPipeline 中安装所需要的 ChannelHandler。这将通过主类 LogEventBroadcaster 完成,如下代码清单:
packagecom.dxz.nettydemo.chapter13; importjava.io.File; importjava.io.RandomAccessFile; importjava.net.InetSocketAddress; importio.netty.bootstrap.Bootstrap; importio.netty.channel.Channel; importio.netty.channel.ChannelOption; importio.netty.channel.EventLoopGroup; importio.netty.channel.nio.NioEventLoopGroup; importio.netty.channel.socket.nio.NioDatagramChannel; public classLogEventBroadcaster { private finalEventLoopGroup group; private finalBootstrap bootstrap; private finalFile file; publicLogEventBroadcaster(InetSocketAddress address, File file) { group = newNioEventLoopGroup(); bootstrap = newBootstrap(); bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true) .handler(newLogEventEncoder(address)); this.file =file; } public void run() throwsException { Channel ch = bootstrap.bind(0).sync().channel(); long pointer = 0; System.out.println("run() ch:" +ch); for(;;) { long len =file.length(); if (len <pointer) { //file was reset pointer =len; } else if (len >pointer) { System.out.println("read begin " +pointer); //Content was added RandomAccessFile raf = new RandomAccessFile(file, "r"); raf.seek(pointer); String line; while ((line = raf.readLine()) != null) { ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line)); } pointer =raf.getFilePointer(); raf.close(); } try{ Thread.sleep(1000); } catch(InterruptedException e) { Thread.interrupted(); break; } } } public voidstop() { group.shutdownGracefully(); } public static void main(String[] args) throwsException { /*if (args.length != 2) { throw new IllegalArgumentException(); }*/ String port = "8888"; String path = "E:\myspace\netty-demo\src\main\java\com\dxz\nettydemo\chapter13\log.txt"; LogEventBroadcaster broadcaster = newLogEventBroadcaster( //new InetSocketAddress("192.168.5.12", //new InetSocketAddress("localhost", new InetSocketAddress("255.255.255.255", Integer.parseInt(port)), newFile(path)); try{ broadcaster.run(); } finally{ broadcaster.stop(); } } }
图 13-3 呈现了该 LogEventBroadcaster 的 ChannelPipeline 的一个高级别视图,展 示了 LogEvent 消息是如何流经它的。
正如你所看到的,所有的将要被传输的数据都被封装在了 LogEvent 消息中。LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它 们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远 程节点(监视器)所捕获。
通过nc监控指定的8888端口如下:
然后运行LogEventBroadcaster类,再观察nc的日志如下:
编写监视器
我们的目标是将 netcat 替换为一个更加完整的事件消费者,我们称之为 LogEventMonitor。 这个程序将:
(1)接收由 LogEventBroadcaster 广播的 UDP DatagramPacket;
(2)将它们解码为 LogEvent 消息;
(3)将 LogEvent 消息写出到 System.out。
解码
ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket 解码为 LogEvent 消息ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket 解码为 LogEvent 消息:
packagecom.dxz.nettydemo.chapter13; importjava.util.List; importio.netty.buffer.ByteBuf; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.socket.DatagramPacket; importio.netty.handler.codec.MessageToMessageDecoder; importio.netty.util.CharsetUtil; public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket>{ @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throwsException { ByteBuf data =datagramPacket.content(); int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR); String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8); String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8); LogEvent event = newLogEvent(datagramPacket.sender(), System.currentTimeMillis(), filename, logMsg); out.add(event); } }
第二个 ChannelHandler 的工作是对第一个 ChannelHandler 所创建的 LogEvent 消息执行一些处理:
LogEventHandler 将以一种简单易读的格式打印 LogEvent 消息,包括以下的各项:
以毫秒为单位的被接收的时间戳;
发送方的 InetSocketAddress,其由 IP 地址和端口组成;
生成 LogEvent 消息的日志文件的绝对路径名;
实际上的日志消息,其代表日志文件中的一行。
packagecom.dxz.nettydemo.chapter13; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.SimpleChannelInboundHandler; public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent>{ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException { cause.printStackTrace(); ctx.close(); } @Override public void messageReceived(ChannelHandlerContext ctx, LogEvent event) throwsException { StringBuilder builder = newStringBuilder(); builder.append(event.getReceivedTimestamp()); builder.append(" ["); builder.append(event.getSource().toString()); builder.append("] ["); builder.append(event.getLogfile()); builder.append("] : "); builder.append(event.getMsg()); System.out.println(builder.toString()); } }
现在我们需要将我们的LogEventDecoder 和LogEventHandler 安装到ChannelPipeline 中,通过 LogEventMonitor 主类来做到这一点:
packagecom.dxz.nettydemo.chapter13; importjava.net.InetSocketAddress; importio.netty.bootstrap.Bootstrap; importio.netty.channel.Channel; importio.netty.channel.ChannelInitializer; importio.netty.channel.ChannelOption; importio.netty.channel.ChannelPipeline; importio.netty.channel.EventLoopGroup; importio.netty.channel.nio.NioEventLoopGroup; importio.netty.channel.socket.nio.NioDatagramChannel; public classLogEventMonitor { private finalEventLoopGroup group; private finalBootstrap bootstrap; publicLogEventMonitor(InetSocketAddress address) { group = newNioEventLoopGroup(); bootstrap = newBootstrap(); bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throwsException { ChannelPipeline pipeline =channel.pipeline(); pipeline.addLast(newLogEventDecoder()); pipeline.addLast(newLogEventHandler()); } }).localAddress(address); } publicChannel bind() { returnbootstrap.bind().syncUninterruptibly().channel(); } public voidstop() { group.shutdownGracefully(); } public static void main(String[] args) throwsException { /*if (args.length != 1) { throw new IllegalArgumentException("Usage: LogEventMonitor <port>"); }*/ String port = "8888"; LogEventMonitor monitor = new LogEventMonitor(newInetSocketAddress(Integer.parseInt(port))); try{ Channel channel =monitor.bind(); System.out.println("LogEventMonitor running"); channel.closeFuture().sync(); } finally{ monitor.stop(); } } }
测试:
新修改下广播的地址为localhost,启动发送端和监听端程序,往log.txt文件中增加“fffffffffffff”保存后,如下图: