netty实现长连接心跳检

摘要:
主要逻辑:Netty用于实现长连接,服务器和客户端之间的连接主要通过心跳来维护。实现的逻辑主要如下:在服务器端:1.在服务器空闲一段时间后,服务器的故障心跳计数器增加1.2.如果接收到客户端的ping心跳包,则重置故障心跳计数器。如果连续n次未收到客户端的ping心跳数据包,请关闭链接,释放资源,等待客户端重新连接。在客户端:1.当客户端网络空闲并且在一定时间内没有写操作时,发送ping心跳包。2,如果

主要逻辑

使用netty实现长连接,主要靠心跳来维持服务器端及客户端连接。

实现的逻辑主要是:

服务器端方面

1, 服务器在网络空闲操作一定时间后,服务端失败心跳计数器加1。

2, 如果收到客户端的ping心跳包,则清零失败心跳计数器,如果连续n次未收到客户端的ping心跳包,则关闭链路,释放资源,等待客户端重连。


客户端方面

1, 客户端网络空闲在一定时间内没有进行写操作时,则发送一个ping心跳包。

2, 如果服务器端未在发送下一个心跳包之前回复pong心跳应答包,则失败心跳计数器加1。

3, 如果客户端连续发送n(此处根据具体业务进行定义)次ping心跳包,服务器端均未回复pong心跳应答包,则客户端断开连接,间隔一定时间进行重连操作,直至连接服务器成功。

环境:netty5,tomcat7,jdk7,myeclipse

服务器端心跳处理类:

  1. public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {   
  2.     private  final Logger log=Logger.getLogger(HeartBeatRespHandler.class);  
  3.        //线程安全心跳失败计数器  
  4.        private AtomicInteger unRecPingTimes = new AtomicInteger(1);  
  5.        @Override  
  6.        public void channelRead(ChannelHandlerContext ctx, Object msg)    
  7.                 throws Exception {    
  8.            NettyMessageProto message = (NettyMessageProto)msg;  
  9.            unRecPingTimes = new AtomicInteger(1);  
  10.            //接收客户端心跳信息  
  11.            if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_REQUEST){  
  12.                 //清零心跳失败计数器  
  13.                 log.info("server receive client"+ctx.channel().attr(SysConst.SERIALNO_KEY)+" ping msg :---->"+message);  
  14.                 //接收客户端心跳后,进行心跳响应  
  15.                 NettyMessageProto replyMsg = buildHeartBeat();  
  16.                 ctx.writeAndFlush(replyMsg);  
  17.             }else{  
  18.                 ctx.fireChannelRead(msg);  
  19.             }  
  20.         }  
  21.          
  22.          
  23.         /** 
  24.          * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
  25.          */  
  26.         @Override  
  27.         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
  28.             if (evt instanceof IdleStateEvent) {    
  29.                 IdleStateEvent event = (IdleStateEvent) evt;    
  30.                 if (event.state() == IdleState.READER_IDLE) {    
  31.                     /*读超时*/    
  32.                     log.info("===服务器端===(READER_IDLE 读超时)");  
  33.                     unRecPingTimes.getAndIncrement();   
  34.                   //客户端未进行ping心跳发送的次数等于3,断开此连接  
  35.                     if(unRecPingTimes.intValue() == 3){    
  36.                           
  37.                           ctx.disconnect();  
  38.                           System.out.println("此客户端连接超时,服务器主动关闭此连接....");  
  39.                           log.info("此客户端连接超时,服务器主动关闭此连接....");  
  40.                     }   
  41.                 } else if (event.state() == IdleState.WRITER_IDLE) {    
  42.                     /*服务端写超时*/       
  43.                     log.info("===服务器端===(WRITER_IDLE 写超时)");  
  44.                       
  45.                 } else if (event.state() == IdleState.ALL_IDLE) {    
  46.                     /*总超时*/    
  47.                     log.info("===服务器端===(ALL_IDLE 总超时)");    
  48.                 }    
  49.             }    
  50.         }  
  51.           
  52.          
  53.        /** 
  54.         * 创建心跳响应消息 
  55.         * @return 
  56.         */  
  57.        private NettyMessageProto buildHeartBeat(){  
  58.            HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_RESPONSE).build();  
  59.            NettyMessageProto message =NettyMessageProto.newBuilder().setHeader(header).build();  
  60.            return message;  
  61.        }  

客户端心跳处理类:
  1. public class HeartBeatReqHandler extends ChannelHandlerAdapter {  
  2.     private  final Logger log=Logger.getLogger(HeartBeatReqHandler.class);  
  3.       
  4.     //线程安全心跳失败计数器  
  5.     private AtomicInteger unRecPongTimes = new AtomicInteger(1);  
  6.       
  7.     public void channelRead(ChannelHandlerContext ctx, Object msg)    
  8.             throws Exception {    
  9.         NettyMessageProto message = (NettyMessageProto)msg;    
  10.         //服务器端心跳回复  
  11.         if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_RESPONSE){  
  12.             //如果服务器进行pong心跳回复,则清零失败心跳计数器  
  13.             unRecPongTimes = new AtomicInteger(1);  
  14.             log.debug("client receive server pong msg :---->"+message);  
  15.         }else{  
  16.             ctx.fireChannelRead(msg);  
  17.         }  
  18.     }    
  19.       
  20.     /** 
  21.      * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
  22.      */  
  23.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
  24.         if (evt instanceof IdleStateEvent) {    
  25.             IdleStateEvent event = (IdleStateEvent) evt;    
  26.             if (event.state() == IdleState.READER_IDLE) {    
  27.                 /*读超时*/    
  28.                 log.info("===客户端===(READER_IDLE 读超时)");  
  29.             } else if (event.state() == IdleState.WRITER_IDLE) {    
  30.                 /*客户端写超时*/       
  31.                 log.info("===客户端===(WRITER_IDLE 写超时)");  
  32.                 unRecPongTimes.getAndIncrement();    
  33.                 //服务端未进行pong心跳响应的次数小于3,则进行发送心跳,否则则断开连接。  
  34.                 if(unRecPongTimes.intValue() < 3){    
  35.                     //发送心跳,维持连接  
  36.                     ctx.channel().writeAndFlush(buildHeartBeat()) ;   
  37.                     log.info("客户端:发送心跳");  
  38.                 }else{    
  39.                     ctx.channel().close();    
  40.                 }    
  41.             } else if (event.state() == IdleState.ALL_IDLE) {    
  42.                 /*总超时*/    
  43.                 log.info("===客户端===(ALL_IDLE 总超时)");    
  44.             }    
  45.         }    
  46.     }  
  47.           
  48.     private NettyMessageProto buildHeartBeat(){  
  49.         HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_REQUEST).build();  
  50.         NettyMessageProto  message = NettyMessageProto.newBuilder().setHeader(header).build();  
  51.         return message;  
  52.     }  
  53.       
  54.     /** 
  55.      * 异常处理 
  56.      */  
  57.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception{  
  58.         ctx.fireExceptionCaught(cause);  
  59.     }  
  60.   
  61. }  


  1. <pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366" name="code" class="java"><pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366"></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre></pre>  
  2. <pre></pre>  
  3. <pre></pre>  
  4. <pre></pre>  
  5. <pre></pre>  
  6. <link rel="stylesheet" href="http://static.blog.csdn.net/public/res-min/markdown_views.css?v=1.0">  
  7.                         
 
 

版权声明:本文为博主原创文章,未经博主允许不得转载。

免责声明:文章转载自《netty实现长连接心跳检》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇C#模拟鼠标键盘控制其他窗口(一)数据可视化之powerBI技巧(二十一)简单三个步骤,轻松管理你的Power BI度量值下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

netty源码分析(一)

一、先看服务端的入门示例代码 public class MyServer { public static void main(String[] args) throws InterruptedException { //第一步 创建bossGroup 接受数据然后转发给workerGroup,是一个死循环 Even...

keepalived配置日志

1.编辑配置文件/etc/sysconfig/keepalived,将第14行的KEEPALIVED_OPTIONS="-D"修改为KEEPALIVED_OPTIONS="-D -d -S 0" sudo sed -i '14 s#KEEPALIVED_OPTIONS="-D"#KEEPALIVED_OPTIONS="-D -d -S 0"#g' /etc...

laravel的monolog使用

Laravel 集成了Monolog日志函数库,Monolog 支持和提供多种强大的日志处理功能。 1、设置,日志模式 (1)Laravel 提供可立即使用的single、daily、syslog和errorlog日志模式。 例如,如果你想要每天保存一个日志文件,而不是单个文件,则可以在config/app.php配置文件内设置log变量:'log' =&...

五、安装Kibana

kibana简介 kibana将收取到存放在es数据库中的日志通过web页面展示出来,更加方便我们直观的去查看日志内容跟告警。 kibana安装 注意安装版本要与es数据库版本一致,我这的es版本为7.9.3,kibana版本也必须为7.9.3 1、下载官网下载 我这选择的是kibana7.9.3的RPM包 2、安装rpm包 rpm -ivh kibana...

java网络编程,简单的客户端和服务器端

1.服务器端 import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socke...

linux安装mongodb,设为全局和后台启动

curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-3.6.5.tgz # 下载 tar -zxvf mongodb-linux-x86_64-3.6.5.tgz # 解压 mv mongodb-linux-x86_6...