此文已由作者赵计刚授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
dubbo的心跳机制:
目的:检测provider与consumer之间的connection连接是不是还连接着,如果连接断了,需要作出相应的处理。
原理:
provider:dubbo的心跳默认是在heartbeat(默认是60s)内如果没有接收到消息,就会发送心跳消息,如果连着3次(180s)没有收到心跳响应,provider会关闭channel。
consumer:dubbo的心跳默认是在60s内如果没有接收到消息,就会发送心跳消息,如果连着3次(180s)没有收到心跳响应,consumer会进行重连。
来看源码调用链。先看provider端。
一、provider端心跳机制
-->openServer(URLurl) url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&default.server=netty4&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider×tamp=1520660491836 -->createServer(URLurl) -->HeaderExchanger.bind(URLurl,ExchangeHandlerhandler) url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider×tamp=1520660491836handler:DubboProtocol.requestHandler -->newDecodeHandler(newHeaderExchangeHandler(handler))) -->NettyTransporter.bind(URLurl,ChannelHandlerlistener) listener:上边的DecodeHandler实例 -->newNettyServer(URLurl,ChannelHandlerhandler) -->ChannelHandler.wrapInternal(ChannelHandlerhandler,URLurl) handler:上边的DecodeHandler实例 -->doOpen()//开启netty服务 -->newHeaderExchangeServer(Serverserver) server:上述的NettyServer -->startHeatbeatTimer()
服务端在开启netty服务时, 在调用createServer时,会从url的parameters map中获取heartbeat配置,代码如下:
1privateExchangeServercreateServer(URLurl){ 2 3... 4 5url=url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT)); 6 7... 8 9ExchangeServerserver; 10try{ 11server=Exchangers.bind(url,requestHandler); 12}catch(RemotingExceptione){ 13thrownewRpcException("Failtostartserver(url:"+url+")"+e.getMessage(),e); 14} 15 16... 17 18returnserver; 19}
其中:int DEFAULT_HEARTBEAT = 60 * 1000,即当用户没有配置heartbeat(心跳时间)时,默认heartbeat=60s(即60s内没有接收到任何请求,就会发送心跳信息)。那么这个heartbeat到底该怎么配?
provider端:
1<dubbo:service...> 2<dubbo:parameterkey="heartbeat"value="3000"/> 3</dubbo:service>
consumer端:
1<dubbo:reference...> 2<dubbo:parameterkey="heartbeat"value="3000"/> 3</dubbo:reference>
再来看调用链,当执行到这一句。
1ChannelHandler.wrapInternal(ChannelHandlerhandler,URLurl)
会形成一个handler调用链,调用链如下:
1MultiMessageHandler 2-->handler:HeartbeatHandler 3-->handler:AllChannelHandler 4-->url:providerUrl 5-->executor:FixedExecutor 6-->handler:DecodeHandler 7-->handler:HeaderExchangeHandler 8-->handler:ExchangeHandlerAdapter(DubboProtocol.requestHandler)
这也是netty接收到请求后的处理链路,注意其中有一个HeartbeatHandler。
最后,执行new HeaderExchangeServer(Server server),来看源码:
1publicclassHeaderExchangeServerimplementsExchangeServer{ 2/**心跳定时器*/ 3privatefinalScheduledExecutorServicescheduled=Executors.newScheduledThreadPool(1, 4newNamedThreadFactory( 5"dubbo-remoting-server-heartbeat", 6true)); 7/**NettyServer*/ 8privatefinalServerserver; 9//heartbeattimer 10privateScheduledFuture<?>heatbeatTimer; 11//heartbeattimeout(ms),defaultvalueis0,won'texecuteaheartbeat. 12privateintheartbeat; 13privateintheartbeatTimeout; 14privateAtomicBooleanclosed=newAtomicBoolean(false); 15 16publicHeaderExchangeServer(Serverserver){ 17if(server==null){ 18thrownewIllegalArgumentException("server==null"); 19} 20this.server=server; 21this.heartbeat=server.getUrl().getParameter(Constants.HEARTBEAT_KEY,0); 22this.heartbeatTimeout=server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY,heartbeat*3); 23if(heartbeatTimeout<heartbeat*2){ 24thrownewIllegalStateException("heartbeatTimeout<heartbeatInterval*2"); 25} 26startHeatbeatTimer(); 27} 28 29privatevoidstartHeatbeatTimer(){ 30stopHeartbeatTimer(); 31if(heartbeat>0){ 32heatbeatTimer=scheduled.scheduleWithFixedDelay( 33newHeartBeatTask(newHeartBeatTask.ChannelProvider(){ 34publicCollection<Channel>getChannels(){ 35returnCollections.unmodifiableCollection( 36HeaderExchangeServer.this.getChannels()); 37} 38},heartbeat,heartbeatTimeout), 39heartbeat,heartbeat,TimeUnit.MILLISECONDS); 40} 41} 42 43privatevoidstopHeartbeatTimer(){ 44try{ 45ScheduledFuture<?>timer=heatbeatTimer; 46if(timer!=null&&!timer.isCancelled()){ 47timer.cancel(true); 48} 49}catch(Throwablet){ 50logger.warn(t.getMessage(),t); 51}finally{ 52heatbeatTimer=null; 53} 54} 55}
创建HeaderExchangeServer时,初始化了heartbeat(心跳间隔时间)和heartbeatTimeout(心跳响应超时时间:即如果最终发送的心跳在这个时间内都没有返回,则做出响应的处理)。
heartbeat默认是0(从startHeatbeatTimer()方法可以看出只有heartbeat>0的情况下,才会发心跳,这里heartbeat如果从url的parameter map中获取不到,就是0,但是我们在前边看到dubbo会默认设置heartbeat=60s到parameter map中,所以此处的heartbeat=60s);
heartbeatTimeout:默认是heartbeat*3。(原因:假设一端发出一次heartbeatRequest,另一端在heartbeat内没有返回任何响应-包括正常请求响应和心跳响应,此时不能认为是连接断了,因为有可能还是网络抖动什么的导致了tcp包的重传超时等)
scheduled是一个含有一个线程的定时线程执行器(其中的线程名字为:"dubbo-remoting-server-heartbeat-thread-*")
之后启动心跳定时任务:
首先如果原来有心跳定时任务,关闭原来的定时任务
之后启动scheduled中的定时线程,从启动该线程开始,每隔heartbeat执行一次HeartBeatTask任务(第一次执行是在启动线程后heartbeat时)
来看一下HeartBeatTask的源码:
1finalclassHeartBeatTaskimplementsRunnable{ 2//channel获取器:用于获取所有需要进行心跳检测的channel 3privateChannelProviderchannelProvider; 4privateintheartbeat; 5privateintheartbeatTimeout; 6 7HeartBeatTask(ChannelProviderprovider,intheartbeat,intheartbeatTimeout){ 8this.channelProvider=provider; 9this.heartbeat=heartbeat; 10this.heartbeatTimeout=heartbeatTimeout; 11} 12 13publicvoidrun(){ 14try{ 15longnow=System.currentTimeMillis(); 16for(Channelchannel:channelProvider.getChannels()){ 17if(channel.isClosed()){ 18continue; 19} 20try{ 21//获取最后一次读操作的时间 22LonglastRead=(Long)channel.getAttribute( 23HeaderExchangeHandler.KEY_READ_TIMESTAMP); 24//获取最后一次写操作的时间 25LonglastWrite=(Long)channel.getAttribute( 26HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);27//如果在heartbeat内没有进行读操作或者写操作,则发送心跳请求 28if((lastRead!=null&&now-lastRead>heartbeat) 29||(lastWrite!=null&&now-lastWrite>heartbeat)){ 30Requestreq=newRequest(); 31req.setVersion("2.0.0"); 32req.setTwoWay(true); 33req.setEvent(Request.HEARTBEAT_EVENT); 34channel.send(req); 35if(logger.isDebugEnabled()){ 36logger.debug("Sendheartbeattoremotechannel"+channel.getRemoteAddress() 37+",cause:Thechannelhasnodata-transmissionexceedsaheartbeatperiod:"+heartbeat+"ms"); 38} 39} 40//正常消息和心跳在heartbeatTimeout都没接收到 41if(lastRead!=null&&now-lastRead>heartbeatTimeout){ 42logger.warn("Closechannel"+channel 43+",becauseheartbeatreadidletimeout:"+heartbeatTimeout+"ms"); 44//consumer端进行重连 45if(channelinstanceofClient){ 46try{ 47((Client)channel).reconnect(); 48}catch(Exceptione){ 49//donothing 50} 51}else{//provider端关闭连接 52channel.close(); 53} 54} 55}catch(Throwablet){ 56logger.warn("Exceptionwhenheartbeattoremotechannel"+channel.getRemoteAddress(),t); 57} 58} 59}catch(Throwablet){ 60logger.warn("Unhandledexceptionwhenheartbeat,cause:"+t.getMessage(),t); 61} 62} 63 64interfaceChannelProvider{ 65Collection<Channel>getChannels(); 66} 67}
HeartBeatTask首先获取所有的channelProvider#getChannels获取所有需要心跳检测的channel,channelProvider实例是HeaderExchangeServer中在启动线程定时执行器的时候创建的内部类。
1newHeartBeatTask.ChannelProvider(){ 2publicCollection<Channel>getChannels(){ 3returnCollections.unmodifiableCollection( 4HeaderExchangeServer.this.getChannels()); 5} 6}
更多网易技术、产品、运营经验分享请点击。
相关文章:
【推荐】一个小白的测试环境docker化之路
【推荐】移动端推广APP防作弊机制之依我见
【推荐】大中型 UGC 平台的反垃圾(anti-spam)工作