dubbo心跳机制 (1)

摘要:
Dubbo的心跳机制:目的:检测提供者和使用者之间的连接是否仍在连接。如果连接断开,则需要进行相应的处理。消费者:dubbo的默认心跳是在60秒内没有收到消息时发送心跳消息。如果连续三次没有收到心跳响应,消费者将重新连接。Anyhost=true&application=demo provider&bind。ip=10.10.10.10&绑定.端口=20880&通道。只读。set=true&codec=dubbo&default。服务器=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com。阿里巴巴。杜波。演示。演示服务和方法=sayHello&pid=21999&qos。port=22222&side=provider&timestamp=1520660491836处理程序:Du bboProtocol。requestHandler--˃newDecodeHandler)--˃NettyTransporter。bindlistener:--˃newNettyServer--˃ChannelHandler WrapInteralhandler的上层DecodeHandler实例:上面的DecodeHandler实例--˃doOpen()//启动netty服务--˃newHeaderExchangeServerserver:上面的NettyServer--˃startHeartbeatTimer()服务器将在启动netty服务器时调用createServer时从url的参数映射中获取心跳配置,代码如下:1 1privateExchangeServercreateServer{23…45url=url.addParameterIfAbsent;67…89ExchangeServer;10尝试{11server=Exchanges.bind;12}捕获{13thrownewRpcException;14}1516…1718returnserver;19} 其中:intDEFAULT_HEARTBEAT=60*1000,即当用户未配置心跳时,默认心跳=60s。

此文已由作者赵计刚授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。

dubbo的心跳机制:

  • 目的:检测provider与consumer之间的connection连接是不是还连接着,如果连接断了,需要作出相应的处理。

  • 原理:

    • provider:dubbo的心跳默认是在heartbeat(默认是60s)内如果没有接收到消息,就会发送心跳消息,如果连着3次(180s)没有收到心跳响应,provider会关闭channel

    • consumer:dubbo的心跳默认是在60s内如果没有接收到消息,就会发送心跳消息,如果连着3次(180s)没有收到心跳响应,consumer会进行重连

来看源码调用链。先看provider端。

一、provider端心跳机制

复制代码

-->openServer(URLurl)
url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&default.server=netty4&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider&timestamp=1520660491836
-->createServer(URLurl)
-->HeaderExchanger.bind(URLurl,ExchangeHandlerhandler)
url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider&timestamp=1520660491836handler:DubboProtocol.requestHandler
-->newDecodeHandler(newHeaderExchangeHandler(handler)))
-->NettyTransporter.bind(URLurl,ChannelHandlerlistener)
listener:上边的DecodeHandler实例
-->newNettyServer(URLurl,ChannelHandlerhandler)
-->ChannelHandler.wrapInternal(ChannelHandlerhandler,URLurl)
handler:上边的DecodeHandler实例
-->doOpen()//开启netty服务
-->newHeaderExchangeServer(Serverserver)
server:上述的NettyServer
-->startHeatbeatTimer()

复制代码

服务端在开启netty服务时, 在调用createServer时,会从url的parameters map中获取heartbeat配置,代码如下:

复制代码

1privateExchangeServercreateServer(URLurl){
2
3...
4
5url=url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT));
6
7...
8
9ExchangeServerserver;
10try{
11server=Exchangers.bind(url,requestHandler);
12}catch(RemotingExceptione){
13thrownewRpcException("Failtostartserver(url:"+url+")"+e.getMessage(),e);
14}
15
16...
17
18returnserver;
19}

复制代码

其中:int DEFAULT_HEARTBEAT = 60 * 1000,即当用户没有配置heartbeat(心跳时间)时,默认heartbeat=60s(即60s内没有接收到任何请求,就会发送心跳信息)。那么这个heartbeat到底该怎么配?

provider端:

1<dubbo:service...>
2<dubbo:parameterkey="heartbeat"value="3000"/>
3</dubbo:service>

consumer端:

1<dubbo:reference...>
2<dubbo:parameterkey="heartbeat"value="3000"/>
3</dubbo:reference>

再来看调用链,当执行到这一句。

1ChannelHandler.wrapInternal(ChannelHandlerhandler,URLurl)

会形成一个handler调用链,调用链如下:

复制代码

1MultiMessageHandler
2-->handler:HeartbeatHandler
3-->handler:AllChannelHandler
4-->url:providerUrl
5-->executor:FixedExecutor
6-->handler:DecodeHandler
7-->handler:HeaderExchangeHandler
8-->handler:ExchangeHandlerAdapter(DubboProtocol.requestHandler)

复制代码

这也是netty接收到请求后的处理链路,注意其中有一个HeartbeatHandler。

最后,执行new HeaderExchangeServer(Server server),来看源码:

复制代码

1publicclassHeaderExchangeServerimplementsExchangeServer{
2/**心跳定时器*/
3privatefinalScheduledExecutorServicescheduled=Executors.newScheduledThreadPool(1,
4newNamedThreadFactory(
5"dubbo-remoting-server-heartbeat",
6true));
7/**NettyServer*/
8privatefinalServerserver;
9//heartbeattimer
10privateScheduledFuture<?>heatbeatTimer;
11//heartbeattimeout(ms),defaultvalueis0,won'texecuteaheartbeat.
12privateintheartbeat;
13privateintheartbeatTimeout;
14privateAtomicBooleanclosed=newAtomicBoolean(false);
15
16publicHeaderExchangeServer(Serverserver){
17if(server==null){
18thrownewIllegalArgumentException("server==null");
19}
20this.server=server;
21this.heartbeat=server.getUrl().getParameter(Constants.HEARTBEAT_KEY,0);
22this.heartbeatTimeout=server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY,heartbeat*3);
23if(heartbeatTimeout<heartbeat*2){
24thrownewIllegalStateException("heartbeatTimeout<heartbeatInterval*2");
25}
26startHeatbeatTimer();
27}
28
29privatevoidstartHeatbeatTimer(){
30stopHeartbeatTimer();
31if(heartbeat>0){
32heatbeatTimer=scheduled.scheduleWithFixedDelay(
33newHeartBeatTask(newHeartBeatTask.ChannelProvider(){
34publicCollection<Channel>getChannels(){
35returnCollections.unmodifiableCollection(
36HeaderExchangeServer.this.getChannels());
37}
38},heartbeat,heartbeatTimeout),
39heartbeat,heartbeat,TimeUnit.MILLISECONDS);
40}
41}
42
43privatevoidstopHeartbeatTimer(){
44try{
45ScheduledFuture<?>timer=heatbeatTimer;
46if(timer!=null&&!timer.isCancelled()){
47timer.cancel(true);
48}
49}catch(Throwablet){
50logger.warn(t.getMessage(),t);
51}finally{
52heatbeatTimer=null;
53}
54}
55}

复制代码

创建HeaderExchangeServer时,初始化了heartbeat(心跳间隔时间)和heartbeatTimeout(心跳响应超时时间:即如果最终发送的心跳在这个时间内都没有返回,则做出响应的处理)。

  • heartbeat默认是0(从startHeatbeatTimer()方法可以看出只有heartbeat>0的情况下,才会发心跳,这里heartbeat如果从url的parameter map中获取不到,就是0,但是我们在前边看到dubbo会默认设置heartbeat=60s到parameter map中,所以此处的heartbeat=60s);

  • heartbeatTimeout:默认是heartbeat*3。(原因:假设一端发出一次heartbeatRequest,另一端在heartbeat内没有返回任何响应-包括正常请求响应和心跳响应,此时不能认为是连接断了,因为有可能还是网络抖动什么的导致了tcp包的重传超时等)

  • scheduled是一个含有一个线程的定时线程执行器(其中的线程名字为:"dubbo-remoting-server-heartbeat-thread-*")

之后启动心跳定时任务:

  • 首先如果原来有心跳定时任务,关闭原来的定时任务

  • 之后启动scheduled中的定时线程,从启动该线程开始,每隔heartbeat执行一次HeartBeatTask任务(第一次执行是在启动线程后heartbeat时)

来看一下HeartBeatTask的源码:

复制代码

1finalclassHeartBeatTaskimplementsRunnable{
2//channel获取器:用于获取所有需要进行心跳检测的channel
3privateChannelProviderchannelProvider;
4privateintheartbeat;
5privateintheartbeatTimeout;
6
7HeartBeatTask(ChannelProviderprovider,intheartbeat,intheartbeatTimeout){
8this.channelProvider=provider;
9this.heartbeat=heartbeat;
10this.heartbeatTimeout=heartbeatTimeout;
11}
12
13publicvoidrun(){
14try{
15longnow=System.currentTimeMillis();
16for(Channelchannel:channelProvider.getChannels()){
17if(channel.isClosed()){
18continue;
19}
20try{
21//获取最后一次读操作的时间
22LonglastRead=(Long)channel.getAttribute(
23HeaderExchangeHandler.KEY_READ_TIMESTAMP);
24//获取最后一次写操作的时间
25LonglastWrite=(Long)channel.getAttribute(
26HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);27//如果在heartbeat内没有进行读操作或者写操作,则发送心跳请求
28if((lastRead!=null&&now-lastRead>heartbeat)
29||(lastWrite!=null&&now-lastWrite>heartbeat)){
30Requestreq=newRequest();
31req.setVersion("2.0.0");
32req.setTwoWay(true);
33req.setEvent(Request.HEARTBEAT_EVENT);
34channel.send(req);
35if(logger.isDebugEnabled()){
36logger.debug("Sendheartbeattoremotechannel"+channel.getRemoteAddress()
37+",cause:Thechannelhasnodata-transmissionexceedsaheartbeatperiod:"+heartbeat+"ms");
38}
39}
40//正常消息和心跳在heartbeatTimeout都没接收到
41if(lastRead!=null&&now-lastRead>heartbeatTimeout){
42logger.warn("Closechannel"+channel
43+",becauseheartbeatreadidletimeout:"+heartbeatTimeout+"ms");
44//consumer端进行重连
45if(channelinstanceofClient){
46try{
47((Client)channel).reconnect();
48}catch(Exceptione){
49//donothing
50}
51}else{//provider端关闭连接
52channel.close();
53}
54}
55}catch(Throwablet){
56logger.warn("Exceptionwhenheartbeattoremotechannel"+channel.getRemoteAddress(),t);
57}
58}
59}catch(Throwablet){
60logger.warn("Unhandledexceptionwhenheartbeat,cause:"+t.getMessage(),t);
61}
62}
63
64interfaceChannelProvider{
65Collection<Channel>getChannels();
66}
67}

HeartBeatTask首先获取所有的channelProvider#getChannels获取所有需要心跳检测的channel,channelProvider实例是HeaderExchangeServer中在启动线程定时执行器的时候创建的内部类。

1newHeartBeatTask.ChannelProvider(){
2publicCollection<Channel>getChannels(){
3returnCollections.unmodifiableCollection(
4HeaderExchangeServer.this.getChannels());
5}
6}

免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击

相关文章:
【推荐】一个小白的测试环境docker化之路
【推荐】移动端推广APP防作弊机制之依我见
【推荐】大中型 UGC 平台的反垃圾(anti-spam)工作

免责声明:文章转载自《dubbo心跳机制 (1)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇线程.Qt更新界面Power BI 图标设置下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

yii2框架随笔35

今天来看vendor/yiisoft/yii2/base/Event.php <?php namespace yiibase; //事件是所有事件类的基类。它封装了参数与事件相关联。 //如果一个事件处理程序集[[进行]]是真的,其余的,uninvoked处理程序将不再被称为处理事件。 //另外,添加一个事件处理程序时,额外的数据可能被传递...

dubbo 配置文件详解

  一、dubbo常用配置   <dubbo:service/> 服务配置,用于暴露一个服务,定义服务的元信息,一个服务可以用多个协议暴露,一个服务也可以注册到多个注册中心。 eg、<dubbo:service ref="demoService" interface="com.unj.dubbotest.provider.DemoSer...

Handler处理长时间事件

当我们在处理一些比较长时间的事件时候,比如读取网络或者数据库的数据时候,就要用到Handler,有时候为了不影响用户操作应用的流畅还要开多一个线程来区别UI线程,在新的线程里面处理长时间的操作。开发的时候遇到数据处理都可以这样用handler,灵活修改一下就可以做各种效果了。 第一步:ProgressDialog弹出旋转框来提示长时间操作。 第二步:开多一...

Android学习笔记十:异步处理

转载请注明原文地址:http://www.cnblogs.com/ygj0930/p/7520700.html 一:基础概念 UI线程:当Android程序第一次启动时,Android会同时启动一条主线程(Main Thread),主线程主要负责处理与UI相关的事件,如用户的按键事件、屏幕绘图事件,并把相关的事件分发到对应的组件进行处理。主线程通常又被称为...

android 在子线程中使用handler更新界面

1. 在子线程中创建一个handler对象,让这个handler对象获取主线程的looper,这样才能把这个handler中的消息发送到ui线程的消息队列中 下面这个界面当点击updateui按钮就会创建一个对象然后调用它的更新图片和文字的方法,这两个设置方法在子线程中执行。 在更新界面的对象的类中创建一个handler对象,在初始化的时候给他赋值为Lo...

启动dubbo消费端过程提示No provider available for the service的问题定位与解决

文/朱季谦 某次在启动dubbo消费端时,发现无法从zookeeper注册中心获取到所依赖的消费者API,启动日志一直出现这样的异常提示 Failed to check the status of the service com.fte.zhu.api.testService. No provider available for the service c...