Netty服务端与客户端(源码一)

摘要:
  Netty时间服务器服务端TimeServer:1packagenetty;23importio.netty.bootstrap.ServerBootstrap;4importio.netty.channel.ChannelFuture;5importio.netty.channel.ChannelInitializer;6importio.netty.channel.ChannelOption;7importio.netty.channel.EventLoopGroup;8importio.netty.channel.nio.NioEventLoopGroup;9importio.netty.channel.socket.SocketChannel;10importio.netty.channel.socket.nio.NioServerSocketChannel;111213publicclassTimeServer{1415publicvoidbindthrowsException{16//配置服务端的NIO线程组一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写17EventLoopGroupbossGroup=newNioEventLoopGroup();18EventLoopGroupworkerGroup=newNioEventLoopGroup();19try{20ServerBootstrapb=newServerBootstrap();21b.group22.channel23.option24.childHandler;25//绑定端口,同步等待成功26ChannelFuturef=b.bind.sync();27//等待服务器监听端口关闭28f.channel().closeFuture().sync();29}finally{30//优雅退出,释放线程池资源31bossGroup.shutdownGracefully();32workerGroup.shutdownGracefully();33}34}3536privateclassChildChannelHandlerextendsChannelInitializer{37protectedvoidinitChannelthrowsException{38arg0.pipeline().addLast;39}40}41}ServerBootstrap是Netty用于启动NIO服务端的辅助类,目的是降低服务端的开发难度。使用bind绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成,完成后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调。

首先,整理NIO进行服务端开发的步骤:

(1)创建ServerSocketChannel,配置它为非阻塞模式

(2)绑定监听配置TCP参数backlog的大小

(3)创建一个独立的I/O线程,用于轮询多路复用器Selector

(4)创建Selector,将之前创建的ServerSocketChannel注册到Selector上监听SelectionKeyACCEPT

(5)启动I/O线程,在循环体中执行Selector.select()方法,轮训就绪的Channel。

(6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端。

(7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数。

(8)将SocketChannel注册到Selector,监听OP_READ操作位

(9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包。

(10)如果轮询的Channel为OP_WRITE,则说明还有数据没有发送完成,需要继续发送。  

Netty时间服务器服务端 TimeServer:

1 packagenetty;
2 
3 importio.netty.bootstrap.ServerBootstrap;
4 importio.netty.channel.ChannelFuture;
5 importio.netty.channel.ChannelInitializer;
6 importio.netty.channel.ChannelOption;
7 importio.netty.channel.EventLoopGroup;
8 importio.netty.channel.nio.NioEventLoopGroup;
9 importio.netty.channel.socket.SocketChannel;
10 importio.netty.channel.socket.nio.NioServerSocketChannel;
11 
12 
13 public classTimeServer {
14     
15     public void bind(int port) throwsException{
16         //配置服务端的NIO线程组 一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写
17         EventLoopGroup bossGroup = newNioEventLoopGroup();
18         EventLoopGroup workerGroup = newNioEventLoopGroup();
19         try{
20             ServerBootstrap b = newServerBootstrap();
21 b.group(bossGroup,workerGroup)
22             .channel(NioServerSocketChannel.class)
23             .option(ChannelOption.SO_BACKLOG, 1024)
24             .childHandler(newChildChannelHandler());
25             //绑定端口,同步等待成功
26             ChannelFuture f =b.bind(port).sync();
27             //等待服务器监听端口关闭
28 f.channel().closeFuture().sync();
29         }finally{
30             //优雅退出,释放线程池资源
31 bossGroup.shutdownGracefully();
32 workerGroup.shutdownGracefully();
33 }
34 }
35     
36     private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
37         protected void initChannel(SocketChannel arg0) throwsException{
38             arg0.pipeline().addLast(newTimeServerHandler());
39 }
40 }
41 }

ServerBootstrap是Netty用于启动NIO服务端的辅助类,目的是降低服务端的开发难度。

绑定childChannelHandler,其作用类似于Reactor模式中的handler类,主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。

使用bind绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成,完成后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调

Netty时间服务器服务端 TimeServerHandler:

1 packagenetty;
2 
3 importjava.io.IOException;
4 importio.netty.buffer.ByteBuf;
5 importio.netty.buffer.Unpooled;
6 importio.netty.channel.ChannelHandlerAdapter;
7 importio.netty.channel.ChannelHandlerContext;
8 
9 public class TimeServerHandler extendsChannelHandlerAdapter{
10     
11     public void channelRead(ChannelHandlerContext ctx,Object msg) throwsIOException{
12         //将msg转换成Netty的ByteBuf对象
13         ByteBuf buf =(ByteBuf)msg;
14         //将缓冲区中的字节数组复制到新建的byte数组中,
15         byte[] req = new byte[buf.readableBytes()];
16 buf.readBytes(req);
17         //获取请求消息
18         String body = new String(req,"UTF-8");
19         System.out.println("The time server receive order:" +body);
20         //如果是"QUERY TIME ORDER"则创建应答消息
21         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? newjava.util.Date(
22                 System.currentTimeMillis()).toString() : "BAD ORDER";
23                 ByteBuf resp =Unpooled.copiedBuffer(currentTime.getBytes());
24         //异步发送应答消息给客户端
25 ctx.write(resp);
26 }
27     
28     public void channelReadComplete(ChannelHandlerContext ctx) throwsException{
29 ctx.flush();
30 }
31     
32     public voidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){
33 ctx.close();
34 }
35 }

相比昨天原生的NIO服务端,代码量大大减少。

Netty时间服务器客户端 TimeClient:

1 packagenetty;
2 
3 importio.netty.bootstrap.Bootstrap;
4 importio.netty.channel.ChannelFuture;
5 importio.netty.channel.ChannelInitializer;
6 importio.netty.channel.ChannelOption;
7 importio.netty.channel.EventLoopGroup;
8 importio.netty.channel.nio.NioEventLoopGroup;
9 importio.netty.channel.socket.SocketChannel;
10 importio.netty.channel.socket.nio.NioSocketChannel;
11 
12 public classTimeClient {
13     
14     public void connect(int port,String host) throwsException{
15         //创建客户端处理I/O读写的NioEventLoopGroup Group线程组
16         EventLoopGroup group = newNioEventLoopGroup();
17         try{
18             //创建客户端辅助启动类Bootstrap
19             Bootstrap b = newBootstrap();
20             b.group(group).channel(NioSocketChannel.class)
21             .option(ChannelOption.TCP_NODELAY, true)
22             .handler(new ChannelInitializer<SocketChannel>(){
23                 //将ChannelHandler设置到ChannelPipleline中,用于处理网络I/O事件
24 @Override
25                 protected void initChannel(SocketChannel ch) throwsException {
26                     ch.pipeline().addLast(newTimeClientHandler());
27 }
28 });
29             //发起异步连接操作,然后调用同步方法等待连接成功。
30             ChannelFuture f =b.connect(host,port).sync();
31             
32             //等待客户端链路关闭
33 f.channel().closeFuture().sync();
34         }finally{
35             //优雅退出,释放NIO线程组
36 group.shutdownGracefully();
37 }
38 }
39     
40     public static void main(String[] args) throwsException{
41         int port = 8080;
42         if(args != null && args.length > 0){
43             try{
44                 port = Integer.valueOf(args[0]);
45             }catch(NumberFormatException e){
46                 //采用默认值
47 }
48 }
49         new TimeClient().connect(port, "127.0.0.1");
50 }
51     
52 }

Netty时间服务器客户端 TimeClientHandler:

1 packagenetty;
2 
3 importio.netty.buffer.ByteBuf;
4 importio.netty.buffer.Unpooled;
5 importio.netty.channel.ChannelHandlerAdapter;
6 importio.netty.channel.ChannelHandlerContext;
7 
8 importjava.util.logging.Logger;
9 
10 public class TimeClientHandler extendsChannelHandlerAdapter{
11     
12     private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
13     
14     private finalByteBuf firstMessage;
15     
16     publicTimeClientHandler(){
17         byte[] req = "QUERY TIME ORDER".getBytes();
18         firstMessage =Unpooled.buffer(req.length);
19 firstMessage.writeBytes(req);
20 }
21     
22     //当客户端与服务端TCP链路简历成功后,Netty的NIO线程会调用该方法,发送查询时间的指令给服务器
23     public voidchannelActive(ChannelHandlerContext ctx){
24         //将请求消息发送给服务端
25 ctx.writeAndFlush(firstMessage);
26 }
27     
28     //当服务器返回应答消息时,该方法被调用
29     public void channelRead(ChannelHandlerContext ctx,Object msg) throwsException{
30         ByteBuf buf =(ByteBuf) msg;
31         byte[] req = new byte[buf.readableBytes()];
32 buf.readBytes(req);
33         String body = new String(req,"UTF-8");
34         System.out.println("Now is :" +body);
35 }
36     
37     public voidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){
38         
39         //释放资源
40         logger.warning("Unexpected exception from downstream :" +cause.getMessage());
41 ctx.close();
42 }
43 }

运行结果:

Server:

Netty服务端与客户端(源码一)第1张

Client:

Netty服务端与客户端(源码一)第2张

免责声明:文章转载自《Netty服务端与客户端(源码一)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇MyBatis 映射文件详解(六)#Weex与Android交互(一)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

分享几种Linux软件的安装方法

Linux软件安装由于不同的Linux分支,安装方法也互不相同,介绍几种常见的安装方法。 http://wenku.baidu.com/link?url=hrOBvu_P-joieXLZfbUjkyRXMHC_CgeAZWjTTtiKKZZhcmNBTILoH2he0TJ9GuhCr75ud4IDuZohhHjzK3B_YPhCkWJ30umXLzdJZG...

opendpi 源码分析(二)

先介绍opendpi如何设置各个协议的处理,然后介绍如何处理每一个包,最后是软件的流程图。 第一:首先看下要用到的重要的宏 View Code #define IPOQUE_SAVE_AS_BITMASK(bitmask,value) (bitmask)=(((IPOQUE_PROTOCOL_BITMASK)1)<<(value)) #de...

天龙源码分析 客户端登录流程

1 登录状态定义 //登录状态 enum PLAYER_LOGIN_STATUS { LOGIN_DEBUG_SETTING, //!< — FOR DEBUG 用户参数 LOGIN_SELECT_SERVER, // 选择服务器界面. LOGIN_DISCONNECT, /...

原!linux机器 配置自动scp脚本

 方式一: 1.安装相关依赖包 yum install -y tcl tclx tcl-develyum -y install expect 2.脚本 scp.sh #!/usr/bin/expect #获取输入参数set f1 [lindex $argv 0]set f2 [lindex $argv 1]set dir [lindex $argv 2]s...

开源项目推荐:Qt有关的GitHub/Gitee开源项目

尊重作者,支持原创,如需转载,请附上原地址:https://libaineu2004.blog.csdn.net/article/details/77369837 Q:想请教下Qt5 之后推出的qml与之前qt4的ui 开发方式,有冲突吗?我公司开发桌面程序,是两种方式兼用?还是选择其中一种?A:桌面推荐使用QWidget,触摸式的嵌入式设备推荐使用QML...

ByteBuf Netty的数据容器

两个组件 ByteBuf ByteBufHolder  使用模式 1.堆缓冲区 backing array模式 直接缓冲区 直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外。 复合缓冲区 CompositeByteBuf 为了举例说明,让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议传输的消息。这两部分由应用程序的不同模...