Netty实现服务端客户端长连接通讯及心跳检测

摘要:
摘要:服务器和客户端之间的长连接通信由netty实现,心跳检测由netty完成。基本思想:netty服务器通过映射保存所有连接的客户端SocketChannels,客户端ID用作映射的密钥。每当服务器想要向客户端发送消息时,它只需根据ClientId取出相应的SocketChannel并向其写入消息。
       摘要: 通过netty实现服务端与客户端的长连接通讯,及心跳检测

      通过netty实现服务端与客户端的长连接通讯,及心跳检测。

       基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

        环境JDK1.8 和netty5

        以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

     设计消息类型:

public enum MsgType {
  PING,ASK,REPLY,LOGIN
}

 Message基类:

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg implements Serializable {
  private static final long serialVersionUID = 1L;
  rivate MsgType type;
  // 必须唯一,否者会出现channel调用混乱
  private String clientId;

  // 初始化客户端id
  public BaseMsg() {
    this.clientId = Constants.getClientId();
  }

  public String getClientId() {
    return clientId;
  }

  public void setClientId(String clientId) {
    this.clientId = clientId;
  }

  public MsgType getType() {
    return type;
  }

  public void setType(MsgType type) {
    this.type = type;
  }
}

常量设置:

public class Constants {
  private static String clientId;

  public static String getClientId() {
    return clientId;
  }

  public static void setClientId(String clientId) {
    Constants.clientId = clientId;
  }
}

登录类型消息:

public class LoginMsg extends BaseMsg {
  private String userName;
  private String password;
  public LoginMsg() {
    super();
    setType(MsgType.LOGIN);
  }

  public String getUserName() {
    return userName;
  }

  public void setUserName(String userName) {
    this.userName = userName;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }
}

心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {
  public PingMsg() {
    super();
    setType(MsgType.PING);
  }
}

请求类型消息:

public class AskMsg extends BaseMsg {
  public AskMsg() {
    super();
    setType(MsgType.ASK);
}

private AskParams params;

public AskParams getParams() {
  return params;
}

public void setParams(AskParams params) {
  this.params = params;
 }
}

// 请求类型参数
// 必须实现序列化接口
public class AskParams implements Serializable {
  private static final long serialVersionUID = 1L;
  private String auth;

  public String getAuth() {
    return auth;
  }

  public void setAuth(String auth) {
    this.auth = auth;
  }
}

响应类型消息:

public class ReplyMsg extends BaseMsg {
  public ReplyMsg() {
    super();
    setType(MsgType.REPLY);
  }
  private ReplyBody body;

  public ReplyBody getBody() {
    return body;
  }

  public void setBody(ReplyBody body) {
    this.body = body;
  }
}
//相应类型body对像
public class ReplyBody implements Serializable {
  private static final long serialVersionUID = 1L;
}
public class ReplyClientBody extends ReplyBody {
  private String clientInfo;

  public ReplyClientBody(String clientInfo) {
    this.clientInfo = clientInfo;
  }

  public String getClientInfo() {
    return clientInfo;
  }

  public void setClientInfo(String clientInfo) {
    this.clientInfo = clientInfo;
  }
}
public class ReplyServerBody extends ReplyBody {
  private String serverInfo;
  public ReplyServerBody(String serverInfo) {
    this.serverInfo = serverInfo;
  }
  public String getServerInfo() {
    return serverInfo;
  }
  public void setServerInfo(String serverInfo) {
    this.serverInfo = serverInfo;
  }
}

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {
  private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();
  public static void add(String clientId,SocketChannel socketChannel){
    map.put(clientId,socketChannel);
  }
  public static Channel get(String clientId){
    return map.get(clientId);
  }
  public static void remove(SocketChannel socketChannel){
    for (Map.Entry entry:map.entrySet()){
      if (entry.getValue()==socketChannel){
        map.remove(entry.getKey());
       }
    }
  }

}

Handler:

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    //channel失效,从Map中移除
    NettyChannelMap.remove((SocketChannel)ctx.channel());
  }
  @Override
  protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

    if(MsgType.LOGIN.equals(baseMsg.getType())){
      LoginMsg loginMsg=(LoginMsg)baseMsg;
      if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
          //登录成功,把channel存到服务端的map中
          NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
          System.out.println("client"+loginMsg.getClientId()+" 登录成功");
        }
      }else{
          if(NettyChannelMap.get(baseMsg.getClientId())==null){
          //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
          LoginMsg loginMsg=new LoginMsg();
          channelHandlerContext.channel().writeAndFlush(loginMsg);
        }
      }
    switch (baseMsg.getType()){
      case PING:{
          PingMsg pingMsg=(PingMsg)baseMsg;
          PingMsg replyPing=new PingMsg();
          NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
      }break;
      case ASK:{
          //收到客户端的请求
          AskMsg askMsg=(AskMsg)baseMsg;
          if("authToken".equals(askMsg.getParams().getAuth())){
            ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
            ReplyMsg replyMsg=new ReplyMsg();
            replyMsg.setBody(replyBody);
            NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
           }
      }break;
      case REPLY:{
          //收到客户端回复
          ReplyMsg replyMsg=(ReplyMsg)baseMsg;
          ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
          System.out.println("receive client msg: "+clientBody.getClientInfo());
      }break;
        default:break;
      }
      ReferenceCountUtil.release(baseMsg);
    }
}

ServerBootstrap:

public class NettyServerBootstrap {
  private int port;
  private SocketChannel socketChannel;
  public NettyServerBootstrap(int port) throws InterruptedException {
    this.port = port;
    bind();
  }

  private void bind() throws InterruptedException {
    EventLoopGroup boss=new NioEventLoopGroup();
    EventLoopGroup worker=new NioEventLoopGroup();
    ServerBootstrap bootstrap=new ServerBootstrap();
    bootstrap.group(boss,worker);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.option(ChannelOption.SO_BACKLOG, 128);
    //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    //保持长连接状态
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new ObjectEncoder());
        p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        p.addLast(new NettyServerHandler());
      }
     });
    ChannelFuture f= bootstrap.bind(port).sync();
    if(f.isSuccess()){
      System.out.println("server start---------------");
      }
    }
  public static void main(String []args) throws InterruptedException {
    NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);
    while (true){
      SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
      if(channel!=null){
        AskMsg askMsg=new AskMsg();
        channel.writeAndFlush(askMsg);
    }
    TimeUnit.SECONDS.sleep(5);
    }
  }
}

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler:

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {
  //利用写空闲发送心跳检测消息
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent e = (IdleStateEvent) evt;
      switch (e.state()) {
      case WRITER_IDLE:
        PingMsg pingMsg=new PingMsg();
        ctx.writeAndFlush(pingMsg);
        System.out.println("send ping to server----------");
        break;
        default:
        break;
      }
   }
}
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
   MsgType msgType=baseMsg.getType();
   switch (msgType){
     case LOGIN:{
       //向服务器发起登录
       LoginMsg loginMsg=new LoginMsg();
       loginMsg.setPassword("yao");
       loginMsg.setUserName("robin");
       channelHandlerContext.writeAndFlush(loginMsg);
     }break;
     case PING:{
        System.out.println("receive ping from server----------");
     }break;
     case ASK:{
        ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
        ReplyMsg replyMsg=new ReplyMsg();
        replyMsg.setBody(replyClientBody);
        channelHandlerContext.writeAndFlush(replyMsg);
      }break;
     case REPLY:{
        ReplyMsg replyMsg=(ReplyMsg)baseMsg;
        ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
        System.out.println("receive client msg: "+replyServerBody.getServerInfo());
      }
      default:break;
    }
    ReferenceCountUtil.release(msgType);
  }
}

bootstrap

public class NettyClientBootstrap {
  private int port;
  private String host;
  private SocketChannel socketChannel;
  private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
  public NettyClientBootstrap(int port, String host) throws InterruptedException {
    this.port = port;
    this.host = host;
    start();
  }
  private void start() throws InterruptedException {
    EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
    Bootstrap bootstrap=new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
    bootstrap.group(eventLoopGroup);
    bootstrap.remoteAddress(host,port);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
    socketChannel.pipeline().addLast(new ObjectEncoder());
    socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
    socketChannel.pipeline().addLast(new NettyClientHandler());
  }
});
  ChannelFuture future =bootstrap.connect(host,port).sync();
    if (future.isSuccess()) {
    socketChannel = (SocketChannel)future.channel();
    System.out.println("connect server 成功---------");
   }
  }
  public static void main(String[]args) throws InterruptedException {
    Constants.setClientId("001");
    NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");

    LoginMsg loginMsg=new LoginMsg();
    loginMsg.setPassword("yao");
    loginMsg.setUserName("robin");
    bootstrap.socketChannel.writeAndFlush(loginMsg);
    while (true){
      TimeUnit.SECONDS.sleep(3);
      AskMsg askMsg=new AskMsg();
      AskParams askParams=new AskParams();
      askParams.setAuth("authToken");
      askMsg.setParams(askParams);
      bootstrap.socketChannel.writeAndFlush(askMsg);
    }
  }
}

免责声明:文章转载自《Netty实现服务端客户端长连接通讯及心跳检测》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇JQuery 之 获取 radio选中值,select选中值2、webpack基础配置下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Qt调试信息重定向输出(qInstallMessageHandler)

由于工具需要,做了一小段Qt5测试代码,参考了网友的案例测试了以下功能 1 qDebug()重定向输出QT窗口 2 qDebug()信息保存到本地文件 QtMessageHandler qInstallMessageHandler(QtMessageHandler handler)   此函数在使用Qt消息处理程序之前已定义。返回一个指向前一个消息处理程序...

使用ionic3快速开发webapp(二)

本文整理了使用ionic3开发时会用到的一些最基本组件及用法 1、ion-tabs 最常见的通过标签切换页面: tabs.html 1 <ion-tabs> 2 <ion-tab [root]="tab1Root" tabTitle="首页" tabIcon="home"></ion-tab> 3 <ion...

C#程序员开发WinForm必须知道的 Window 消息大全(转)

消息,就是指Windows发出的一个通知,告诉应用程序某个事情发生了。例如,单击鼠标、改变窗口尺寸、按下键盘上的一个键都会使Windows发送一个消息给应用程序。 消息本身是作为一个记录传递给应用程序的,这个记录中包含了消息的类型以及其他信息。例如,对于单击鼠标所产生的消息来说,这个记录中包含了单击鼠标时的坐标。这个记录类型叫做TMsg,它在Windows...

RabbitMQ、Kafka、RocketMQ的优劣势

今天我们一起来探讨:  全量的消息队列究竟有哪些?  Kafka、RocketMQ、RabbitMQ的优劣势比较  以及消息队列的选型 最全MQ消息队列有哪些 那么目前在业界有哪些比较知名的消息引擎呢?如下图所示 这里面几乎完全列举了当下比较知名的消息引擎,包括:  ZeroMQ  推特的Distributedlog  ActiveMQ:Apach...

在网上看到这篇文章还不错,OnDrawItem与DrawItem讨论

我在学习中经常遇到要重写DrawItem()的情况,但又有一个WM_DRAWITEM消息,它们是什么样的关系呢。如果我们要重写一个CButton取名为CMyButton,我们可以重写CMyButton的DrawItem()函数来实现我们的 需求,但CMyButton::DrawItem()是在什么时候调用呢?它是在它的宿主类的OnDrawItem()中被调...

[转]C#中用NamedPipe进程间通信

转自:http://blog.csdn.net/jinjazz/archive/2009/02/03/3861143.aspx 本文只是一个测试例子,核心代码是kernel32.dll中的一组windows api函数,这里不深入研究,代码都在codeproject上。 http://www.codeproject.com/KB/threads/dotne...