mina学习

摘要:
Mina断开连接重新连接的定义:这里讨论的Mina断断连接重新连接是指使用Mina作为客户端软件连接到其他提供套接字通信服务的服务器。

长连接表示一旦建立了链接,就可以长时间的保持双方的通讯,例如:
socket链接,推送平台.
短链接表示建立链接,完成数据的交换之后,就断开链接,例如: http链接.

mina 框架是对socket链接的一次封装框架,可以更好的管理链接的任务.
在很多的开源项目中使用,例如:Androidpn推送框架.
可以通过简单的几行代码建立通讯链接.

客户端:

        NioSocketConnector connector=new NioSocketConnector();
        //设置处理器
        connector.setHandler(new MyHandler());
        //设置拦截器
        connector.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory()));
        //建立连接
        ConnectFuture future=connector.connect(new InetSocketAddress("127.1.1.0", 8888));
        future.awaitUninterruptibly();
        //获取连接的会话
        IoSession session=future.getSession();

        BufferedReader inputReader = new BufferedReader(
                 new InputStreamReader(System.in));
        String inputContent;
        while ((inputContent = inputReader.readLine())!=null) {
            session.write(inputContent);
        }

服务端:

            NioSocketAcceptor  acceptor=new NioSocketAcceptor();
            //设置处理器
            acceptor.setHandler(new MyHandler());
            //设置空闲的时间
            acceptor.getSessionConfig().setIdleTime(
                    IdleStatus.BOTH_IDLE, 3);
            //设置拦截器
            acceptor.getFilterChain().addLast("codec", 
                    new ProtocolCodecFilter(new MyTextLineProtocolFactory()));
            ///端口的绑定
            acceptor.bind(new InetSocketAddress(8888));

至于处理器IohandlerAdapter,与拦截器的编写,略.

---------------------------------------------------

自己以解决:session.getService().getManagedSessions();可以当前连接的所用session;
session.setAttribute(session.getId(),"解析出来的ID")可以让一个客户上次id对应一个session;

mina解决断线重连:

http://www.iteye.com/topic/1125178

http://chwshuang.iteye.com/blog/2028951

http://blog.csdn.net/yoara/article/details/37597141

-------------------------

Mina使用IoHandler实现业务处理

IoHandler是Mina实现其业务逻辑的顶级接口;在IoHandler中定义了7个方法,根据I/O事件来触发对应的方法:

import java.io.IOException;

public interface IoHandler {

void sessionCreated(IoSession session) throws Exception;

void sessionOpened(IoSession session) throws Exception;

void sessionClosed(IoSession session) throws Exception;

void sessionIdle(IoSession session, IdleStatus status) throws Exception;

void exceptionCaught(IoSession session, Throwable cause) throws Exception;

void messageReceived(IoSession session, Object message) throws Exception;

void messageSent(IoSession session, Object message) throws Exception;
}
sessionCreated:当一个新的连接建立时,由I/O processor thread调用;

sessionOpened:当连接打开是调用;

messageReceived:当接收了一个消息时调用;

messageSent:当一个消息被(IoSession#write)发送出去后调用;

sessionIdle:当连接进入空闲状态时调用;

sessionClosed:当连接关闭时调用;

exceptionCaught:当实现IoHandler的类抛出异常时调用;

一般情况下,我们最关心的只有messageReceived方法,接收消息并处理,然后调用IoSession的write方法发送出消息!(注意:这里接收到的消息都是Java对象,在IoFilter中所有二进制数据都被解码啦!)

一般情况下很少有人实现IoHandler接口,而是继承它的一个实现类IoHandlerAdapter,这样不用覆盖它的7个方法,只需要根据具体需求覆盖其中的几个方法就可以!

Mina 断线重连

定义:这里讨论的Mina 断线重连是指使用mina作为客户端软件,连接其他提供Socket通讯服务的服务器端。Socket服务器可以是Mina提供的服务器,也可以是C++提供的服务器。

mina学习第1张

一、断线重连的方式;

1. 在创建Mina客户端时增加一个监听器,或者增加一个拦截器,当检测到Session关闭时,自动进行重连。

mina学习第2张

2. 在第1种方式的基础上,增加客户端的读写通道空闲检查,当发生Session关闭或者读写空闲时,进行重连。

mina学习第3张

第一种方式比较传统,优点是简单方便,适合网络稳定、数据量不大(1M带宽以下)的环境;不过缺点是不能对系统级的连接断开阻塞进行捕获。

第二种方式更加精细,基本上能捕获到应用、网络、系统级的断连。

二、重连目的:

在使用Mina做为客户端时,往往因为网络、服务器、应用程序出现问题而导致连接断开,而自动重连,就是解决连接断开的唯一方式。如果网线断开、服务器宕机、应用程序挂了,都是断线的原因,这个时候,通过增加一个监听器或者拦截器,就能实现重连。但是生产环境中,断线的原因可能更复杂:网络不稳定、延时、服务器负载高、服务器或者应用程序的发送或者接收缓冲区满等等问题都可能导致数据传输过程出现类似于断线的情况,这个时候,光检测Session关闭是远远不够的,这个时候就需要一种重连机制,比如读写空闲超过30秒,就进行重连。对于数据不间断、实时性高、数据量大的应用场景,更是实用。

三、实例:

第一种:监听器方式

创建一个监听器实现mina的IoServiceListener接口,里面的方法可以不用写实现

  1. <spanstyle="font-family:'MicrosoftYaHei',微软雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">importorg.apache.mina.core.service.IoService;
  2. importorg.apache.mina.core.service.IoServiceListener;
  3. importorg.apache.mina.core.session.IdleStatus;
  4. importorg.apache.mina.core.session.IoSession;
  5. publicclassIoListenerimplementsIoServiceListener{
  6. @Override
  7. publicvoidserviceActivated(IoServicearg0)throwsException{
  8. //TODOAuto-generatedmethodstub
  9. }
  10. @Override
  11. publicvoidserviceDeactivated(IoServicearg0)throwsException{
  12. //TODOAuto-generatedmethodstub
  13. }
  14. @Override
  15. publicvoidserviceIdle(IoServicearg0,IdleStatusarg1)throwsException{
  16. //TODOAuto-generatedmethodstub
  17. }
  18. @Override
  19. publicvoidsessionCreated(IoSessionarg0)throwsException{
  20. //TODOAuto-generatedmethodstub
  21. }
  22. @Override
  23. publicvoidsessionDestroyed(IoSessionarg0)throwsException{
  24. //TODOAuto-generatedmethodstub
  25. }
  26. }</span>

再创建客户端时加入监听

  1. <spanstyle="font-family:'MicrosoftYaHei',微软雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">NioSocketConnectorconnector=newNioSocketConnector();//创建连接客户端
  2. connector.setConnectTimeoutMillis(30000);//设置连接超时
  3. connector.getSessionConfig().setReceiveBufferSize(10240);//设置接收缓冲区的大小
  4. connector.getSessionConfig().setSendBufferSize(10240);//设置输出缓冲区的大小
  5. //加入解码器
  6. TextLineCodecFactoryfactory=newTextLineCodecFactory(Charset.forName("GBK"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue());
  7. factory.setDecoderMaxLineLength(10240);
  8. factory.setEncoderMaxLineLength(10240);
  9. connector.getFilterChain().addLast("codec",newProtocolCodecFilter(factory));
  10. connector.setDefaultRemoteAddress(newInetSocketAddress(host,port));//设置默认访问地址
  11. //添加处理器
  12. connector.setHandler(newIoHandler());
  13. //添加重连监听
  14. connector.addListener(newIoListener(){
  15. @Override
  16. publicvoidsessionDestroyed(IoSessionarg0)throwsException{
  17. for(;;){
  18. try{
  19. Thread.sleep(3000);
  20. ConnectFuturefuture=connector.connect();
  21. future.awaitUninterruptibly();//等待连接创建成功
  22. session=future.getSession();//获取会话
  23. if(session.isConnected()){
  24. logger.info("断线重连["+connector.getDefaultRemoteAddress().getHostName()+":"+connector.getDefaultRemoteAddress().getPort()+"]成功");
  25. break;
  26. }
  27. }catch(Exceptionex){
  28. logger.info("重连服务器登录失败,3秒再连接一次:"+ex.getMessage());
  29. }
  30. }
  31. }
  32. });
  33. for(;;){
  34. try{
  35. ConnectFuturefuture=connector.connect();
  36. future.awaitUninterruptibly();//等待连接创建成功
  37. session=future.getSession();//获取会话
  38. logger.info("连接服务端"+host+":"+port+"[成功]"+",,时间:"+newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate()));
  39. break;
  40. }catch(RuntimeIoExceptione){
  41. logger.error("连接服务端"+host+":"+port+"失败"+",,时间:"+newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate())+",连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:"+e.getMessage(),e);
  42. Thread.sleep(5000);//连接失败后,重连间隔5s
  43. }
  44. }
  45. </span>

第一种:拦截器方式

  1. <spanstyle="font-family:'MicrosoftYaHei',微软雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">connector=newNioSocketConnector();//创建连接客户端
  2. connector.setConnectTimeoutMillis(30000);//设置连接超时
  3. //断线重连回调拦截器
  4. connector.getFilterChain().addFirst("reconnection",newIoFilterAdapter(){
  5. @Override
  6. publicvoidsessionClosed(NextFilternextFilter,IoSessionioSession)throwsException{
  7. for(;;){
  8. try{
  9. Thread.sleep(3000);
  10. ConnectFuturefuture=connector.connect();
  11. future.awaitUninterruptibly();//等待连接创建成功
  12. session=future.getSession();//获取会话
  13. if(session.isConnected()){
  14. logger.info("断线重连["+connector.getDefaultRemoteAddress().getHostName()+":"+connector.getDefaultRemoteAddress().getPort()+"]成功");
  15. break;
  16. }
  17. }catch(Exceptionex){
  18. logger.info("重连服务器登录失败,3秒再连接一次:"+ex.getMessage());
  19. }
  20. }
  21. }
  22. });
  23. TextLineCodecFactoryfactory=newTextLineCodecFactory(Charset.forName(encoding),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue());
  24. factory.setDecoderMaxLineLength(10240);
  25. factory.setEncoderMaxLineLength(10240);
  26. //加入解码器
  27. connector.getFilterChain().addLast("codec",newProtocolCodecFilter(factory));
  28. //添加处理器
  29. connector.setHandler(newIoHandler());
  30. connector.getSessionConfig().setReceiveBufferSize(10240);//设置接收缓冲区的大小
  31. connector.getSessionConfig().setSendBufferSize(10240);//设置输出缓冲区的大小
  32. connector.setDefaultRemoteAddress(newInetSocketAddress(host,port));//设置默认访问地址
  33. for(;;){
  34. try{
  35. ConnectFuturefuture=connector.connect();
  36. //等待连接创建成功
  37. future.awaitUninterruptibly();
  38. //获取会话
  39. session=future.getSession();
  40. logger.error("连接服务端"+host+":"+port+"[成功]"+",,时间:"+newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate()));
  41. break;
  42. }catch(RuntimeIoExceptione){
  43. logger.error("连接服务端"+host+":"+port+"失败"+",,时间:"+newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate())+",连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:"+e.getMessage(),e);
  44. Thread.sleep(5000);//连接失败后,重连间隔5s
  45. }
  46. }</span>

第二种:加入空闲检测机制

空闲检测机制需要在创建客户端时,加入空闲超时,然后在处理器handler端的sessionIdle方法中加入一个预关闭连接的方法。让Session关闭传递到监听器或者拦截器的sessionClose方法中实现重连。

以拦截器方式为例,在创建客户端时,加入读写通道空闲检查超时机制。

  1. <spanstyle="font-family:'MicrosoftYaHei',微软雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">connector=newNioSocketConnector();//创建连接客户端
  2. connector.setConnectTimeoutMillis(30000);//设置连接超时
  3. //断线重连回调拦截器
  4. connector.getFilterChain().addFirst("reconnection",newIoFilterAdapter(){
  5. @Override
  6. publicvoidsessionClosed(NextFilternextFilter,IoSessionioSession)throwsException{
  7. for(;;){
  8. try{
  9. Thread.sleep(3000);
  10. ConnectFuturefuture=connector.connect();
  11. future.awaitUninterruptibly();//等待连接创建成功
  12. session=future.getSession();//获取会话
  13. if(session.isConnected()){
  14. logger.info("断线重连["+connector.getDefaultRemoteAddress().getHostName()+":"+connector.getDefaultRemoteAddress().getPort()+"]成功");
  15. break;
  16. }
  17. }catch(Exceptionex){
  18. logger.info("重连服务器登录失败,3秒再连接一次:"+ex.getMessage());
  19. }
  20. }
  21. }
  22. });
  23. connector.getFilterChain().addLast("mdc",newMdcInjectionFilter());
  24. TextLineCodecFactoryfactory=newTextLineCodecFactory(Charset.forName(encoding),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue());
  25. factory.setDecoderMaxLineLength(10240);
  26. factory.setEncoderMaxLineLength(10240);
  27. //加入解码器
  28. connector.getFilterChain().addLast("codec",newProtocolCodecFilter(factory));
  29. connector.getSessionConfig().setReceiveBufferSize(10240);//设置接收缓冲区的大小
  30. connector.getSessionConfig().setSendBufferSize(10240);//设置输出缓冲区的大小
  31. connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,30000);//读写都空闲时间:30秒
  32. connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE,40000);//读(接收通道)空闲时间:40秒
  33. connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE,50000);//写(发送通道)空闲时间:50秒
  34. //添加处理器
  35. connector.setHandler(newIoHandler());
  36. connector.setDefaultRemoteAddress(newInetSocketAddress(host,port));//设置默认访问地址
  37. for(;;){
  38. try{
  39. ConnectFuturefuture=connector.connect();
  40. //等待连接创建成功
  41. future.awaitUninterruptibly();
  42. //获取会话
  43. session=future.getSession();
  44. logger.error("连接服务端"+host+":"+port+"[成功]"+",,时间:"+newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate()));
  45. break;
  46. }catch(RuntimeIoExceptione){
  47. System.out.println("连接服务端"+host+":"+port+"失败"+",,时间:"+newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate())+",连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:"+e.getMessage());
  48. logger.error("连接服务端"+host+":"+port+"失败"+",,时间:"+newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate())+",连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:"+e.getMessage(),e);
  49. Thread.sleep(5000);//连接失败后,重连10次,间隔30s
  50. }
  51. }</span>

然后在数据处理器IoHandler中sessionIdle方法中加入Session会话关闭的代码,这样session关闭就能传递到拦截器或者监听器中,然后实现重连。

  1. <spanstyle="font-family:'MicrosoftYaHei',微软雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">importorg.apache.mina.core.service.IoHandlerAdapter;
  2. importorg.apache.mina.core.session.IdleStatus;
  3. importorg.apache.mina.core.session.IoSession;
  4. publicclassIoHandlerextendsIoHandlerAdapter{
  5. //部分代码忽略...
  6. @Override
  7. publicvoidsessionIdle(IoSessionsession,IdleStatusstatus)throwsException{
  8. logger.info("-客户端与服务端连接[空闲]-"+status.toString());
  9. if(session!=null){
  10. session.close(true);
  11. }
  12. }
  13. //部分代码忽略...
  14. }</span>

总结-最佳实践:

以上两种方式我个人认为最好是使用第二种。在实际的生产环境,对于数据量比较少的情况下,需要加一个线程专门发送心跳信息,然后在服务器端进行回应心跳,这样就保证读写通道不出现空闲。如果数据量比较大,大到24小时都有数据,那么就不需要心跳线程,可以直接在IoHandler处理器端中messageReceived方法中定时发送心跳到服务器。由于读写监控还可以处理服务器、网络、应用等等方面的不确定因素,所以建议使用第二种方式。

免责声明:文章转载自《mina学习》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Android开发人员不得不收集的代码(不断更新中...)IDEA快捷键大全下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

spring-session之二:简单配置

官方示例:https://docs.spring.io/spring-session/docs/current/reference/html5/#samples 配置Spring Session 在Web项目中配置Spring Session分为四步: 搭建用于Spring Session的数据存储 将Spring Session的jar文件添加到...

informatica 厂商培训资料

1、informatica中domain与node的理解:    domain 类似于局域网,node就是局域网中的节点或者计算机。    node应与repository在数据库中存储在不同的scheme中,此处node指informatica操作的内容。 2、informatica中Repository Service 与 Intergration S...

XAF学习笔记之 Upcasting

https://www.cnblogs.com/foreachlife/p/xpoupcasting.htmlXAF学习笔记之 Upcasting 通常,我们会定义继承层次结构,假设有类型,CustomerBase,CustomerTrialed,CustomerRegistered三个类型,并且继承结构如下: 业务对象代码定义如下: using...

jsp、javabean、el

JSP三大指令一个jsp页面中,可以有0~N个指令的定义!1. page --> 最复杂:<%@page language="java" info="xxx"...%>* pageEncoding和contentType:> pageEncoding:它指定当前jsp页面的编码,只要不说谎,就不会有乱码!在服务器要把jsp编译成.j...

informatica 参数文件配置

Informatica 中 parameter file 参数文件配置规则: 参数文件的头部内容 [Global] All Integration Services, Integration Service processes, workflows, worklets, and sessions. [Service:service name] T...

springBoot环境下创建webSocket服务端和客户端

1.pom文件导入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.0.4.RELE...