Netty通过心跳保持长链接

摘要:
Netty有自己的心跳检测功能IdleStateHandler。客户端在写入空闲时主动发起心跳请求,服务器在收到心跳请求后发出心跳响应。如果客户端在某个时间范围内没有响应,则链接将断开。Java代码公共类NettyClient{public void connect(String remoteServer,int port)throws异常

 Netty自带心跳检测功能,IdleStateHandler,客户端在写空闲时主动发起心跳请求,服务器接受到心跳请求后给出一个心跳响应。当客户端在一定时间范围内不能够给出响应则断开链接。

Java代码  收藏代码
  1. public class NettyClient {  
  2.     public void connect(String remoteServer, int port) throws Exception {  
  3.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  4.         try {  
  5.             Bootstrap b = new Bootstrap();  
  6.             b.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(remoteServer, port)  
  7.                     .handler(new ChildChannelHandler());  
  8.   
  9.             ChannelFuture f = b.connect();  
  10.             System.out.println("Netty time Client connected at port " + port);  
  11.   
  12.             f.channel().closeFuture().sync();  
  13.         } finally {  
  14.             try {  
  15.                 TimeUnit.SECONDS.sleep(5);  
  16.                 try {  
  17.                     System.out.println("重新链接。。。");  
  18.                     connect(remoteServer, port);  
  19.                 } catch (Exception e) {  
  20.                     e.printStackTrace();  
  21.                 }  
  22.             } catch (Exception e) {  
  23.                 e.printStackTrace();  
  24.             }  
  25.         }  
  26.     }  
  27.   
  28.     public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
  29.   
  30.         @Override  
  31.         protected void initChannel(final SocketChannel ch) throws Exception {  
  32.             // -8表示lengthAdjustment,让解码器从0开始截取字节,并且包含消息头  
  33.             ch.pipeline().addLast(new RpcEncoder(NettyMessage.class)).addLast(new RpcDecoder(NettyMessage.class))  
  34.                     .addLast(new IdleStateHandler(120, 10, 0, TimeUnit.SECONDS)).addLast(new HeartBeatReqHandler());  
  35.         }  
  36.   
  37.     }  
  38.   
  39.     public static void main(String[] args) {  
  40.         try {  
  41.             new NettyClient().connect("127.0.0.1", 12000);  
  42.         } catch (Exception e) {  
  43.             e.printStackTrace();  
  44.         }  
  45.     }  
  46. }  
Java代码  收藏代码
  1. public class SerializationUtil {  
  2.   
  3.     private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();  
  4.   
  5.     private static Objenesis                objenesis    = new ObjenesisStd(true);  
  6.   
  7.     private static <T> Schema<T> getSchema(Class<T> clazz) {  
  8.         @SuppressWarnings("unchecked")  
  9.         Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);  
  10.         if (schema == null) {  
  11.             schema = RuntimeSchema.getSchema(clazz);  
  12.             if (schema != null) {  
  13.                 cachedSchema.put(clazz, schema);  
  14.             }  
  15.         }  
  16.         return schema;  
  17.     }  
  18.   
  19.     /** 
  20.      * 序列化 
  21.      * 
  22.      * @param obj 
  23.      * @return 
  24.      */  
  25.     public static <T> byte[] serializer(T obj) {  
  26.         @SuppressWarnings("unchecked")  
  27.         Class<T> clazz = (Class<T>) obj.getClass();  
  28.         LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);  
  29.         try {  
  30.             Schema<T> schema = getSchema(clazz);  
  31.             byte result[] = ProtostuffIOUtil.toByteArray(obj, schema, buffer);  
  32.             return result;  
  33.         } catch (Exception e) {  
  34.             throw new IllegalStateException(e.getMessage(), e);  
  35.         } finally {  
  36.             buffer.clear();  
  37.         }  
  38.     }  
  39.   
  40.     /** 
  41.      * 反序列化 
  42.      * 
  43.      * @param data 
  44.      * @param clazz 
  45.      * @return 
  46.      */  
  47.     public static <T> T deserializer(byte[] data, Class<T> clazz) {  
  48.         try {  
  49.             T obj = objenesis.newInstance(clazz);  
  50.             Schema<T> schema = getSchema(clazz);  
  51.             ProtostuffIOUtil.mergeFrom(data, obj, schema);  
  52.             return obj;  
  53.         } catch (Exception e) {  
  54.             throw new IllegalStateException(e.getMessage(), e);  
  55.         }  
  56.     }  
  57. }  
Java代码  收藏代码
  1. @SuppressWarnings("rawtypes")  
  2. public class RpcEncoder extends MessageToByteEncoder {  
  3.   
  4.     private Class<?> genericClass;  
  5.   
  6.     public RpcEncoder(Class<?> genericClass) {  
  7.         this.genericClass = genericClass;  
  8.     }  
  9.   
  10.     @Override  
  11.     public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {  
  12.         if (genericClass.isInstance(in)) {  
  13.             System.out.println("发送的请求是:"+in);  
  14.             byte[] data = SerializationUtil.serializer(in);  
  15.             out.writeInt(data.length);  
  16.             out.writeBytes(data);  
  17.         }  
  18.     }  
  19. }  
Java代码  收藏代码
  1. public class RpcDecoder extends ByteToMessageDecoder {  
  2.   
  3.     private Class<?> genericClass;  
  4.   
  5.     public RpcDecoder(Class<?> genericClass) {  
  6.         this.genericClass = genericClass;  
  7.     }  
  8.   
  9.     @Override  
  10.     public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  
  11.             throws Exception {  
  12.         if (in.readableBytes() < 4) {  
  13.             return;  
  14.         }  
  15.         in.markReaderIndex();  
  16.         int dataLength = in.readInt();  
  17.         if (dataLength < 0) {  
  18.             ctx.close();  
  19.         }  
  20.         if (in.readableBytes() < dataLength) {  
  21.             in.resetReaderIndex();  
  22.         }  
  23.         byte[] data = new byte[dataLength];  
  24.         in.readBytes(data);  
  25.   
  26.         Object obj = SerializationUtil.deserializer(data, genericClass);  
  27.         System.out.println("接收到的消息是:"+obj);  
  28.         out.add(obj);  
  29.     }  
  30. }  
Java代码  收藏代码
  1. public class HeartBeatReqHandler extends ChannelDuplexHandler {  
  2.   
  3.     /** 
  4.      * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, 
  5.      *      java.lang.Object) 
  6.      */  
  7.     @Override  
  8.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  9.         if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {  
  10.             IdleStateEvent event = (IdleStateEvent) evt;  
  11.             if (event.state() == IdleState.READER_IDLE) {  
  12.                 System.out.println("read 空闲");  
  13.                 ctx.disconnect();  
  14.             } else if (event.state() == IdleState.WRITER_IDLE) {  
  15.                 System.out.println("write 空闲");  
  16.                 ctx.writeAndFlush(buildHeartBeat(MessageType.HEARTBEAT_REQ.getType()));  
  17.             }  
  18.         }  
  19.     }  
  20.   
  21.     /** 
  22.      *  
  23.      * @return 
  24.      * @author zhangwei<wei.zw@corp.netease.com> 
  25.      */  
  26.     private NettyMessage buildHeartBeat(byte type) {  
  27.         NettyMessage msg = new NettyMessage();  
  28.         Header header = new Header();  
  29.         header.setType(type);  
  30.         msg.setHeader(header);  
  31.         return msg;  
  32.     }  
  33.   
  34. }  
Java代码  收藏代码
  1. public class NettyServer {  
  2.     public void bind(int port) throws Exception {  
  3.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
  4.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  5.         try {  
  6.             ServerBootstrap b = new ServerBootstrap();  
  7.             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)  
  8.                     .childHandler(new ChildChannelHandler());  
  9.   
  10.             ChannelFuture f = b.bind(port).sync();  
  11.             System.out.println("Netty time Server started at port " + port);  
  12.             f.channel().closeFuture().sync();  
  13.         } finally {  
  14.             bossGroup.shutdownGracefully();  
  15.             workerGroup.shutdownGracefully();  
  16.         }  
  17.     }  
  18.   
  19.     public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
  20.   
  21.         @Override  
  22.         protected void initChannel(final SocketChannel ch) throws Exception {  
  23.             ch.pipeline().addLast(new RpcDecoder(NettyMessage.class)).addLast(new RpcEncoder(NettyMessage.class))  
  24.                     .addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)).addLast(new HeartBeatRespHandler());  
  25.         }  
  26.   
  27.     }  
  28.   
  29.     public static void main(String[] args) {  
  30.         try {  
  31.             new NettyServer().bind(12000);  
  32.         } catch (Exception e) {  
  33.             e.printStackTrace();  
  34.         }  
  35.     }  
  36. }  
Java代码  收藏代码
  1. public enum MessageType {  
  2.   
  3.     LOGIN_REQ((byte) 1), LOGIN_RESP((byte) 2), HEARTBEAT_REQ((byte) 3), HEARTBEAT_RESP((byte) 4);  
  4.     private byte type;  
  5.   
  6.     /** 
  7.      * @param type 
  8.      */  
  9.     private MessageType(byte type) {  
  10.         this.type = type;  
  11.     }  
  12.   
  13.     public byte getType() {  
  14.         return type;  
  15.     }  
  16.   
  17.     public void setType(byte type) {  
  18.         this.type = type;  
  19.     }  
  20.   
  21.     public static MessageType getMessageType(byte type) {  
  22.         for (MessageType b : MessageType.values()) {  
  23.             if (b.getType() == type) {  
  24.                 return b;  
  25.             }  
  26.         }  
  27.         return null;  
  28.     }  
  29.   
  30. }  
Java代码  收藏代码
  1. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<NettyMessage> {  
  2.   
  3.     /** 
  4.      * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, 
  5.      *      java.lang.Object) 
  6.      */  
  7.     @Override  
  8.     protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {  
  9.         if (msg.getHeader() != null && msg.getHeader().getType() == MessageType.HEARTBEAT_REQ.getType()) {  
  10.             NettyMessage heartBeat = buildHeartBeat(MessageType.HEARTBEAT_RESP.getType());  
  11.             ctx.writeAndFlush(heartBeat);  
  12.         } else {  
  13.             ctx.fireChannelRead(msg);  
  14.         }  
  15.     }  
  16.       
  17.   
  18.     /** 
  19.      * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, 
  20.      *      java.lang.Object) 
  21.      */  
  22.     @Override  
  23.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  24.         if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {  
  25.             IdleStateEvent event = (IdleStateEvent) evt;  
  26.             if (event.state() == IdleState.READER_IDLE) {  
  27.                 System.out.println("read 空闲 关闭链接");  
  28.                 ctx.disconnect();     
  29.             }   
  30.         }  
  31.     }  
  32.       
  33.   
  34.     /** 
  35.      *  
  36.      * @return 
  37.      * @author zhangwei<wei.zw@corp.netease.com> 
  38.      */  
  39.     private NettyMessage buildHeartBeat(byte type) {  
  40.         NettyMessage msg = new NettyMessage();  
  41.         Header header = new Header();  
  42.         header.setType(type);  
  43.         msg.setHeader(header);  
  44.         return msg;  
  45.     }  
  46.   
  47. }  
Java代码  收藏代码
  1. public class NettyMessage implements Serializable{  
  2.       
  3.     /**  */  
  4.     private static final long serialVersionUID = 1L;  
  5.   
  6.     private Header header;  
  7.       
  8.     private Object body;  
  9.   
  10.     public Header getHeader() {  
  11.         return header;  
  12.     }  
  13.   
  14.     public void setHeader(Header header) {  
  15.         this.header = header;  
  16.     }  
  17.   
  18.     public Object getBody() {  
  19.         return body;  
  20.     }  
  21.   
  22.     public void setBody(Object body) {  
  23.         this.body = body;  
  24.     }  
  25.   
  26.     /**  
  27.      * @see java.lang.Object#toString() 
  28.      */  
  29.     @Override  
  30.     public String toString() {  
  31.         return "NettyMessage [header=" + header + ", body=" + body + "]";  
  32.     }  
  33.       
  34.       
  35. }  
Java代码  收藏代码
  1. public class Header implements Serializable{  
  2.     /**  */  
  3.     private static final long serialVersionUID = 1L;  
  4.     private int crcCode=0xabef0101;  
  5.     private int length;  
  6.     private long sessionId;  
  7.     private byte type;  
  8.     private byte priority;  
  9.     private Map<String,Object> attachment=new HashMap<>();  
  10.     public int getCrcCode() {  
  11.         return crcCode;  
  12.     }  
  13.     public void setCrcCode(int crcCode) {  
  14.         this.crcCode = crcCode;  
  15.     }  
  16.     public int getLength() {  
  17.         return length;  
  18.     }  
  19.     public void setLength(int length) {  
  20.         this.length = length;  
  21.     }  
  22.     public long getSessionId() {  
  23.         return sessionId;  
  24.     }  
  25.     public void setSessionId(long sessionId) {  
  26.         this.sessionId = sessionId;  
  27.     }  
  28.     public byte getType() {  
  29.         return type;  
  30.     }  
  31.     public void setType(byte type) {  
  32.         this.type = type;  
  33.     }  
  34.     public byte getPriority() {  
  35.         return priority;  
  36.     }  
  37.     public void setPriority(byte priority) {  
  38.         this.priority = priority;  
  39.     }  
  40.     public Map<String, Object> getAttachment() {  
  41.         return attachment;  
  42.     }  
  43.     public void setAttachment(Map<String, Object> attachment) {  
  44.         this.attachment = attachment;  
  45.     }  
  46.     /**  
  47.      * @see java.lang.Object#toString() 
  48.      */  
  49.     @Override  
  50.     public String toString() {  
  51.         return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionId=" + sessionId + ", type=" + type  
  52.                 + ", priority=" + priority + ", attachment=" + attachment + "]";  
  53.     }  
  54.       
  55.       
  56. }  

客户端的结果是:

Java代码  收藏代码
  1. etty time Client connected at port 12000  
  2. write 空闲  
  3. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  4. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  5. write 空闲  
  6. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  7. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  8. write 空闲  
  9. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  10. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  11. write 空闲  
  12. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  13. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  14. write 空闲  
  15. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
  16. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  

免责声明:文章转载自《Netty通过心跳保持长链接》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇mysql字符串 转 int-double CAST与CONVERT 函数的用法asp.net2.0导出pdf文件完美解决方案(转载)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

关于Tcpdump抓包总结

一、简介 tcpdump是一个用于截取网络分组,并输出分组内容的工具。凭借强大的功能和灵活的截取策略,使其成为类UNIX系统下用于网络分析和问题排查的首选工具 tcpdump提供了源代码,公开了接口,因此具备很强的可扩展性,对于网络维护和入侵者都是非常有用的工具 tcpdump 支持针对网络层、协议、主机、网络或端口的过滤,并提供and、or、not等逻辑...

Json Schema简介

1. 引言 什么是Json Schema? 以一个例子来说明 假设有一个web api,接受一个json请求,返回某个用户在某个城市关系最近的若干个好友。一个请求的例子如下: { "city" : "chicago", "number": 20, "user" : { "name":"Alex",...

OM模块功能&amp;amp;API详解

(一)销售订单概述 1.1   与车间模块关系 当使用ATO类型订单时,订单管理模块会直接在车间模块中产生任务 1.2   与库存模块关系 在销售订单中使用的物料,单位等信息均来自库存模块,在订单执行过程中,按订单保留及销售发运等功能也会对库存模块起作用 1.3   与应收模块关系 销售完成后,订单管理模块会在应收接口中产生INVOICE信息,影响应收...

为什么一个Http Header中的空格会被骇客利用

导读:本文通过一个Netty的一个issue来学习什么是"http request smuggling"、它产生的原因与解决方法,从而对http协议有进一步了解。 前言 前阵子在Netty的issue里有人提了一个问题 http request smuggling, cause by obfuscating TE header,描述了一个Netty的...

nginx+tomcat负载均衡搭建

一、      单独部署tomcat和nginx Nginx版本:nginx-1.13.5 Tomcat版本:apache-tomcat-8.5.8 操作系统:win10 必须先部署一个tomcat服务器。Tomcat服务器部署好后,登录tomcat服务器:localhost:8080 显示如下界面,说明部署成功。关于tomcat的部署就不详细介绍了。  ...

Netty入门

一、是什么 Netty是一个高性能、异步事件驱动、基于Java NIO的异步的可扩展的客户端/服务器网络编程框架。 Netty提供了对 TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。 Net...