dubbo心跳机制 (3)

摘要:
本文由作者赵继刚授权网易云社区发布。欢迎来到网易云社区,了解更多网易科技产品运营经验。2、 消费者端心跳机制//创建一个ExchangeClient,并在第一个服务发现--˃getClients-˃getSharedClient--˃ExchangeClientexchangeClient=initClient--˃交换机的提供程序路径下建立到相关URL的长连接。连接--˃总管交换器。connect--˃newDecodeHandler)--˃传输程序。连接--˃NettyTransporter。connect--˃newNettyClient--˃newMultiMessageHandler--˃getChannelCodec//Get Codec2,这是DubboCountCodec实例--˃doOpen()//启动netty客户端--˃doConnect()//connect到服务器并建立长连接--˃newHeaderExchangeClient//上面的NettyClient实例,needHeartbeat:true--˃startHeatbeatTimer()//开始心跳计数器。客户端已在initClient中设置心跳参数,如下所示:1/**2*创建新连接3*/4privateExchangeClient{5…6//默认启用心跳7url=url.addParameterIfAbsent;89…1011Exchange客户端;12如果{15client=newLazyConnectExchangeClient;16}否则{17client=Exchanges.connect;18}19}捕获{20throuwnewRpcException;21}22returnclient;23}类似于提供者,让我们看看最后一个启用心跳检测的地方。

此文已由作者赵计刚授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。

二、consumer端心跳机制

//创建ExchangeClient,对第一次服务发现providers路径下的相关url建立长连接
-->getClients(URLurl)
-->getSharedClient(URLurl)
-->ExchangeClientexchangeClient=initClient(url)
-->Exchangers.connect(url,requestHandler)
-->HeaderExchanger.connect(URLurl,ExchangeHandlerhandler)
-->newDecodeHandler(newHeaderExchangeHandler(handler)))
-->Transporters.connect(URLurl,ChannelHandler...handlers)
-->NettyTransporter.connect(URLurl,ChannelHandlerlistener)
-->newNettyClient(url,listener)
-->newMultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
-->getChannelCodec(url)//获取Codec2,这里是DubboCountCodec实例
-->doOpen()//开启netty客户端
-->doConnect()//连接服务端,建立长连接
-->newHeaderExchangeClient(Clientclient,booleanneedHeartbeat)//上述的NettyClient实例,needHeartbeat:true
-->startHeatbeatTimer()//启动心跳计数器

客户端在initClient(url)中设置了heartbeat参数(默认为60s,用户自己设置的方式见“一”中所讲),如下:

1/**
2*Createnewconnection
3*/
4privateExchangeClientinitClient(URLurl){
5...
6//enableheartbeatbydefault
7url=url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT));
8
9...
10
11ExchangeClientclient;
12try{
13//connectionshouldbelazy
14if(url.getParameter(Constants.LAZY_CONNECT_KEY,false)){
15client=newLazyConnectExchangeClient(url,requestHandler);16}else{
17client=Exchangers.connect(url,requestHandler);
18}
19}catch(RemotingExceptione){
20thrownewRpcException("Failtocreateremotingclientforservice("+url+"):"+e.getMessage(),e);
21}
22returnclient;
23}

与provider类似,来看一下最后开启心跳检测的地方。

1publicclassHeaderExchangeClientimplementsExchangeClient{
2privatestaticfinalScheduledThreadPoolExecutorscheduled=newScheduledThreadPoolExecutor(2,newNamedThreadFactory("dubbo-remoting-client-heartbeat",true));
3privatefinalClientclient;
4privatefinalExchangeChannelchannel;
5//heartbeattimer
6privateScheduledFuture<?>heartbeatTimer;
7//heartbeat(ms),defaultvalueis0,won'texecuteaheartbeat.
8privateintheartbeat;
9privateintheartbeatTimeout;
10
11publicHeaderExchangeClient(Clientclient,booleanneedHeartbeat){
12if(client==null){
13thrownewIllegalArgumentException("client==null");
14}
15this.client=client;
16this.channel=newHeaderExchangeChannel(client);
17Stringdubbo=client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
18this.heartbeat=client.getUrl().getParameter(Constants.HEARTBEAT_KEY,dubbo!=null&&dubbo.startsWith("1.0.")?Constants.DEFAULT_HEARTBEAT:0);
19this.heartbeatTimeout=client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY,heartbeat*3);
20if(heartbeatTimeout<heartbeat*2){
21thrownewIllegalStateException("heartbeatTimeout<heartbeatInterval*2");
22}
23if(needHeartbeat){
24startHeatbeatTimer();
25}
26}
27
28privatevoidstartHeatbeatTimer(){
29stopHeartbeatTimer();
30if(heartbeat>0){
31heartbeatTimer=scheduled.scheduleWithFixedDelay(
32newHeartBeatTask(newHeartBeatTask.ChannelProvider(){
33publicCollection<Channel>getChannels(){
34returnCollections.<Channel>singletonList(HeaderExchangeClient.this);
35}
36},heartbeat,heartbeatTimeout),
37heartbeat,heartbeat,TimeUnit.MILLISECONDS);
38}
39}
40
41privatevoidstopHeartbeatTimer(){
42if(heartbeatTimer!=null&&!heartbeatTimer.isCancelled()){
43try{
44heartbeatTimer.cancel(true);
45scheduled.purge();
46}catch(Throwablee){
47if(logger.isWarnEnabled()){
48logger.warn(e.getMessage(),e);
49}
50}
51}
52heartbeatTimer=null;
53}
54}

主要看一下startHeartbeatTimer()方法,与provider相同,只是provider是获取NettyServer的所有的NettyChannel,而consumer只是获取当前的对象。

consumer的handler处理链与provider完全相同。

最后来看一下consumer的重连机制:AbstractClient#reconnect

1publicvoidreconnect()throwsRemotingException{
2disconnect();3connect();
4}
5
6publicvoiddisconnect(){
7connectLock.lock();
8try{
9destroyConnectStatusCheckCommand();
10try{
11Channelchannel=getChannel();
12if(channel!=null){
13channel.close();
14}
15}catch(Throwablee){
16logger.warn(e.getMessage(),e);
17}
18try{
19doDisConnect();
20}catch(Throwablee){
21logger.warn(e.getMessage(),e);
22}
23}finally{
24connectLock.unlock();
25}
26}
27
28protectedvoidconnect()throwsRemotingException{
29connectLock.lock();
30try{
31if(isConnected()){
32return;
33}
34initConnectStatusCheckCommand();
35doConnect();
36if(!isConnected()){
37thrownewRemotingException(this,"Failedconnecttoserver"+getRemoteAddress()+"from"+getClass().getSimpleName()+""
38+NetUtils.getLocalHost()+"usingdubboversion"+Version.getVersion()
39+",cause:Connectwaittimeout:"+getTimeout()+"ms.");
40}else{
41if(logger.isInfoEnabled()){
42logger.info("Successedconnecttoserver"+getRemoteAddress()+"from"+getClass().getSimpleName()+""
43+NetUtils.getLocalHost()+"usingdubboversion"+Version.getVersion()
44+",channelis"+this.getChannel());
45}
46}
47reconnect_count.set(0);
48reconnect_error_log_flag.set(false);
49}catch(RemotingExceptione){
50throwe;
51}catch(Throwablee){
52thrownewRemotingException(this,"Failedconnecttoserver"+getRemoteAddress()+"from"+getClass().getSimpleName()+""
53+NetUtils.getLocalHost()+"usingdubboversion"+Version.getVersion()
54+",cause:"+e.getMessage(),e);
55}finally{
56connectLock.unlock();
57}
58}

代码比较简单,先断连,再连接。

免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击

相关文章:
【推荐】ApiDoc 一键生成注释

免责声明:文章转载自《dubbo心跳机制 (3)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【Scala】Scala之Packaging and ImportsPIE-SDK For C++打开静止卫星数据下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Dubbo多注册中心配置

Dubbo常用的注册中心有Zookeeper、Nacos、Redis,目前项目中使用Zookeeper作为注册中心。service-xxx-dubbo.xml配置如下: <dubbo:application name="${dubbo.applicationName}" /> <dubbo:registry protocol="${dub...

微信小程序设置全局请求URL 封装wx.request请求

app.js: App({ //设置全局请求URL globalData:{ URL: 'https://www.oyhdo.com', }, /** * 封装wx.request请求 * method: 请求方式 * url: 请求地址 * data: 要传递的参数 * callback: 请求...

『AngularJS』$location 服务

参考: ng.$location Developer Guide: Angular Services: Using $location 简介 $location服务解析在浏览器地址栏中的URL(基于window.location)并且让URL在你的应用中可用。改变在地址栏中的URL会作用到$location服务,同样的,改变$location服务也会改...

dubbo问题

1.dubbo启动异常,提供者未暴露服务 provider未提供配置Related cause: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'loanQuotaRecoverOrdersInterface': Cannot r...

js正则表达式 URL格式匹配详解

0、URL格式 protocol :// hostname[:port] / path / [;parameters][?query]#fragment [;parameters]没见过 这里就不做相关匹配了 1、代码及运行结果 'use strict'; { // URL地址匹配格式: protocol :// hostname[:port...

[转]Windows 注册自定义的协议

[转自] http://blog.sina.com.cn/s/blog_86e4a51c01010nik.html 1、注册应用程序来处理自定义协议          你必须添加一个新的key以及相关的value到HKEY_CLASSES_ROOT中,来使应用程序可以处理特殊的URL协议。          新注册的key必须与协议scheme相匹配才可以...