此文已由作者赵计刚授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
二、consumer端心跳机制
//创建ExchangeClient,对第一次服务发现providers路径下的相关url建立长连接 -->getClients(URLurl) -->getSharedClient(URLurl) -->ExchangeClientexchangeClient=initClient(url) -->Exchangers.connect(url,requestHandler) -->HeaderExchanger.connect(URLurl,ExchangeHandlerhandler) -->newDecodeHandler(newHeaderExchangeHandler(handler))) -->Transporters.connect(URLurl,ChannelHandler...handlers) -->NettyTransporter.connect(URLurl,ChannelHandlerlistener) -->newNettyClient(url,listener) -->newMultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler))) -->getChannelCodec(url)//获取Codec2,这里是DubboCountCodec实例 -->doOpen()//开启netty客户端 -->doConnect()//连接服务端,建立长连接 -->newHeaderExchangeClient(Clientclient,booleanneedHeartbeat)//上述的NettyClient实例,needHeartbeat:true -->startHeatbeatTimer()//启动心跳计数器
客户端在initClient(url)中设置了heartbeat参数(默认为60s,用户自己设置的方式见“一”中所讲),如下:
1/** 2*Createnewconnection 3*/ 4privateExchangeClientinitClient(URLurl){ 5... 6//enableheartbeatbydefault 7url=url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT)); 8 9... 10 11ExchangeClientclient; 12try{ 13//connectionshouldbelazy 14if(url.getParameter(Constants.LAZY_CONNECT_KEY,false)){ 15client=newLazyConnectExchangeClient(url,requestHandler);16}else{ 17client=Exchangers.connect(url,requestHandler); 18} 19}catch(RemotingExceptione){ 20thrownewRpcException("Failtocreateremotingclientforservice("+url+"):"+e.getMessage(),e); 21} 22returnclient; 23}
与provider类似,来看一下最后开启心跳检测的地方。
1publicclassHeaderExchangeClientimplementsExchangeClient{ 2privatestaticfinalScheduledThreadPoolExecutorscheduled=newScheduledThreadPoolExecutor(2,newNamedThreadFactory("dubbo-remoting-client-heartbeat",true)); 3privatefinalClientclient; 4privatefinalExchangeChannelchannel; 5//heartbeattimer 6privateScheduledFuture<?>heartbeatTimer; 7//heartbeat(ms),defaultvalueis0,won'texecuteaheartbeat. 8privateintheartbeat; 9privateintheartbeatTimeout; 10 11publicHeaderExchangeClient(Clientclient,booleanneedHeartbeat){ 12if(client==null){ 13thrownewIllegalArgumentException("client==null"); 14} 15this.client=client; 16this.channel=newHeaderExchangeChannel(client); 17Stringdubbo=client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); 18this.heartbeat=client.getUrl().getParameter(Constants.HEARTBEAT_KEY,dubbo!=null&&dubbo.startsWith("1.0.")?Constants.DEFAULT_HEARTBEAT:0); 19this.heartbeatTimeout=client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY,heartbeat*3); 20if(heartbeatTimeout<heartbeat*2){ 21thrownewIllegalStateException("heartbeatTimeout<heartbeatInterval*2"); 22} 23if(needHeartbeat){ 24startHeatbeatTimer(); 25} 26} 27 28privatevoidstartHeatbeatTimer(){ 29stopHeartbeatTimer(); 30if(heartbeat>0){ 31heartbeatTimer=scheduled.scheduleWithFixedDelay( 32newHeartBeatTask(newHeartBeatTask.ChannelProvider(){ 33publicCollection<Channel>getChannels(){ 34returnCollections.<Channel>singletonList(HeaderExchangeClient.this); 35} 36},heartbeat,heartbeatTimeout), 37heartbeat,heartbeat,TimeUnit.MILLISECONDS); 38} 39} 40 41privatevoidstopHeartbeatTimer(){ 42if(heartbeatTimer!=null&&!heartbeatTimer.isCancelled()){ 43try{ 44heartbeatTimer.cancel(true); 45scheduled.purge(); 46}catch(Throwablee){ 47if(logger.isWarnEnabled()){ 48logger.warn(e.getMessage(),e); 49} 50} 51} 52heartbeatTimer=null; 53} 54}
主要看一下startHeartbeatTimer()方法,与provider相同,只是provider是获取NettyServer的所有的NettyChannel,而consumer只是获取当前的对象。
consumer的handler处理链与provider完全相同。
最后来看一下consumer的重连机制:AbstractClient#reconnect
1publicvoidreconnect()throwsRemotingException{ 2disconnect();3connect(); 4} 5 6publicvoiddisconnect(){ 7connectLock.lock(); 8try{ 9destroyConnectStatusCheckCommand(); 10try{ 11Channelchannel=getChannel(); 12if(channel!=null){ 13channel.close(); 14} 15}catch(Throwablee){ 16logger.warn(e.getMessage(),e); 17} 18try{ 19doDisConnect(); 20}catch(Throwablee){ 21logger.warn(e.getMessage(),e); 22} 23}finally{ 24connectLock.unlock(); 25} 26} 27 28protectedvoidconnect()throwsRemotingException{ 29connectLock.lock(); 30try{ 31if(isConnected()){ 32return; 33} 34initConnectStatusCheckCommand(); 35doConnect(); 36if(!isConnected()){ 37thrownewRemotingException(this,"Failedconnecttoserver"+getRemoteAddress()+"from"+getClass().getSimpleName()+"" 38+NetUtils.getLocalHost()+"usingdubboversion"+Version.getVersion() 39+",cause:Connectwaittimeout:"+getTimeout()+"ms."); 40}else{ 41if(logger.isInfoEnabled()){ 42logger.info("Successedconnecttoserver"+getRemoteAddress()+"from"+getClass().getSimpleName()+"" 43+NetUtils.getLocalHost()+"usingdubboversion"+Version.getVersion() 44+",channelis"+this.getChannel()); 45} 46} 47reconnect_count.set(0); 48reconnect_error_log_flag.set(false); 49}catch(RemotingExceptione){ 50throwe; 51}catch(Throwablee){ 52thrownewRemotingException(this,"Failedconnecttoserver"+getRemoteAddress()+"from"+getClass().getSimpleName()+"" 53+NetUtils.getLocalHost()+"usingdubboversion"+Version.getVersion() 54+",cause:"+e.getMessage(),e); 55}finally{ 56connectLock.unlock(); 57} 58}
代码比较简单,先断连,再连接。
更多网易技术、产品、运营经验分享请点击。
相关文章:
【推荐】ApiDoc 一键生成注释