分析dubbo心跳检测机制

摘要:
=null&&now lastRead˃heartbeat)||(lastWrite!
  • 目的: 
  • 维持provider和consumer之间的长连接
  • 实现: 
  • dubbo心跳时间heartbeat默认是60s,超过heartbeat时间没有收到消息,就发送心跳消息(provider,consumer一样),如果连着3次(heartbeatTimeout为heartbeat*3)没有收到心跳响应,provider会关闭channel,而consumer会进行重连;不论是provider还是consumer的心跳检测都是通过启动定时任务的方式实现;
  • provider绑定和consumer连接的入口:
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}
  • provider启动心跳检测:
public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        //心跳超时时间默认为心跳时间的3倍
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        //如果心跳超时时间小于心跳时间的两倍则抛异常
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeatbeatTimer();
    }
  • startHeatbeatTimer的实现 
  • 先停止已有的定时任务,启动新的定时任务:
private void startHeatbeatTimer() {
        // 停止原有定时任务
        stopHeartbeatTimer();
        // 发起新的定时任务
        if (heartbeat > 0) {
            heatbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }
  • HeartBeatTask的实现 
  • 遍历所有的channel,检测心跳间隔,如果超过心跳间隔没有读或写,则发送需要回复的心跳消息,最有判断是否心跳超时(heartbeatTimeout),如果超时,provider关闭channel,consumer进行重连
public void run() {
        try {
            long now = System.currentTimeMillis();
            for (Channel channel : channelProvider.getChannels()) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 读写的时间,任一超过心跳间隔,发送心跳
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        Request req = new Request();
                        req.setVersion("2.0.0");
                        req.setTwoWay(true); // 需要响应的心跳事件
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // 最后读的时间,超过心跳超时时间
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 客户端侧,重新连接服务端
                        if (channel instanceof Client) {
                            try {
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        // 服务端侧,关闭客户端连接
                        } else {
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }
  • consumer端的实现 
  • 默认需要心跳检测
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // 创建 HeaderExchangeChannel 对象
        this.channel = new HeaderExchangeChannel(client);
        // 读取心跳相关配置
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) { // 避免间隔太短
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        // 发起心跳定时器
        if (needHeartbeat) {
            startHeatbeatTimer();
        }
    }

我们可以看到dubbo的心跳检测,服务端会发送发送心跳包,客户端也会发送心跳包,与一般只有客户端发送心跳包,服务端接受心跳是有所不同的。

免责声明:文章转载自《分析dubbo心跳检测机制》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【小梅哥SOPC学习笔记】设置Eclipse在编译(build)前自动保存源代码文件源码分析:若依用户 user_id查询返回除了用户表,为何还带有部门dept和角色role表呢下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

DeepFacelab更新:分辨率提至640,效率翻倍!

这几天又更新了,好事儿是又有新功能可以玩了,“坏事儿”是感觉又要升级设备了。其实,总的来说这次更新,对低配高配玩家都有好消息。 1. 对于高配玩家,可以跑更高像素的模型。 2. 对于低配玩家,那些跑不起的像素也能跑起来了。 下面就来说说重点更新内容: 1.  模型训练参数 resolution 的最大值从512调到了640。 ​ 这意味着对于高配玩家来说可...

RocketMQ事务消费和顺序消费详解

一、RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。 rocketMq实现顺序消费的原理  produce在发送消息的时候,把消息...

【RocketMQ】同一个项目中,同一个topic,可以存在多个消费者么?

一、问题答案 是不可以的 而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息 /** * @date 2019/05/28 */ @Component @Slf4j public class MqConsumer implementsMessageConsumer { @Overr...

python网络编程 day35 网络编程——进程池,线程池、协程、回调函数、gevent模块、asyncio模块

一、内容回顾 面试题: 请聊聊进程队列的特点和实现原理 进程之间可以互相通信 IPC 数据安全 先进先出 实现原理 管道+锁 管道是基于文件级别的socket+pickle实现的 你了解生产者消费者模型吗? 了解 为什么了解? 工作经历: 采集图片,爬取音乐,主要是爬取大量数据,想提高爬虫效率,有用过一个生产者消费者模型,这个模型...

SAP-Function

[转]sap函数大全 ********SAP中常用函数 函数名 描述SD_VBAP_READ_WITH_VBELN 根据销售订单读取表vbap中的信息EDIT_LINES 把READ_TEXT返回的LINES中的行按照TDFORMAT=“*”重新组织VIEW_MAINTENANCE_CALL 维护表视图 函数名 描述DY_GET_FOCUS 获得屏幕焦点D...

Win32编程

    Win32编程 此资料为ITjob软件开发教程网提供,特此分享,互相学习! C/C++/VC/MFC技术交流群:95453496 一、Win32编程基本概念 1、消息驱动 在介绍Windows消息驱动概念之前,我们首先来回顾面向过程的程序结构:main()程序有明显的开始、中间过程和结束点,程序是围绕这个过程编写好相关的子过程,再把这些子过程串联...