RocketMQ(八)RocketMQ的Consumer负载均衡

摘要:
相反,当消费者人数少于排队人数时,情况会怎样?topic.startsWith){log.warn;}}}}//删除与未订阅主题this对应的消息队列。truncateMessageQueueNotMyTopic();}}2.1.RebalanceByTopic privatevalue ByTopic{switch{caseCLUSTERING:{//获取与topic对应的队列和使用者信息。例如,mqSet如下/***0={MessageQueue@2151}“MessageQueue〔topic=myTopic001,brokerName=broker-a,queueId=3〕”*1={MessageQueue@2152}“消息队列〔topic=myTopic001,brokerName=broker-a,queueId=0〕”*2={MessageQueue@2153}“消息队列〔topic=myTopic001,brokerName=broker-a,queueId=2〕”*3={MessageQueue@2154}“MessageQueue[topic=myTopic001,brokerName=broker-a,queueId=1]”*/SetmqSet=this.topicSubscribeInfoTable。获取;//所有消费者客户端cid,例如:172.16.20.246@7832ListcidAll=this.mQClientFactory。查找ConsumerIdList;如果(mqSet!
一、问题描述

RocketMQ的Consumer是如何做的负载均衡?比如:5个Consumer进程同时消费一个Topic,这个Topic只有4个queue会出现啥情况?反之Consumer数量小于queue的数据是啥情况?

二、源码剖析

1、RebalancePushImpl

public class RebalancePushImpl extends RebalanceImpl {
    public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        // 可以看到很简单,调用了父类RebalanceImpl的构造器
        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    }

2、RebalanceImpl

public abstract class RebalanceImpl {
    // 很简单,就是初始化一些东西,关键在于下面的doRebalance
    public RebalanceImpl(String consumerGroup, MessageModel messageModel,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MQClientInstance mQClientFactory) {
        this.consumerGroup = consumerGroup;
        this.messageModel = messageModel;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.mQClientFactory = mQClientFactory;
    }
    
    /**
     * 分配消息队列,命名抄袭spring,doXXX开始真正的业务逻辑
     *
     * @param isOrder:是否是顺序消息 true:是;false:不是
     */
    public void doRebalance(final boolean isOrder) {
        // 分配每个topic的消息队列
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // 这个是关键了
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        // 移除未订阅的topic对应的消息队列
        this.truncateMessageQueueNotMyTopic();
    }
}

2.1、rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case CLUSTERING: {
            // 获取topic对应的队列和consumer信息,比如mqSet如下
            /**
             * 0 = {MessageQueue@2151} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=3]"
             * 1 = {MessageQueue@2152} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=0]"
             * 2 = {MessageQueue@2153} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=2]"
             * 3 = {MessageQueue@2154} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=1]"
             */
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            // 所有的Consumer客户端cid,比如:172.16.20.246@7832
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                // 为什么要addAll到list里,因为他要排序
                mqAll.addAll(mqSet);

                // 排序消息队列和消费者数组,因为是在进行分配队列,排序后,各Client的顺序才能保持一致。
                Collections.sort(mqAll);
                Collections.sort(cidAll);

                // 默认选择的是org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                // 根据队列分配策略分配消息队列
                List<MessageQueue> allocateResult = null;
                try {
                    // 这个才是要介绍的真正C位,strategy.allocate()
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    return;
                }
            }
        }
    }
}

3、AllocateMessageQueueAveragely

3.1、allocate

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        /**
         * 参数校验的代码我删了。
         */
        
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        /**
         * 第几个Consumer,这也是我们上面为什么要排序的重要原因之一。
         * Collections.sort(mqAll);
         * Collections.sort(cidAll);
         */
        int index = cidAll.indexOf(currentCID);
        // 取模,多少消息队列无法平均分配 比如mqAll.size()是4,代表4个queue。cidAll.size()是5,代表一个consumer,那么mod就是4
        int mod = mqAll.size() % cidAll.size();
        // 平均分配
        // 4 <= 5 ? 1 : (4 > 0 && 1 < 4 ? 4 / 5 + 1 : 4 / 5)
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息队列。
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

3.2、解释

看着这算法凌乱的很,太复杂了!说实话,确实挺复杂,蛮罗嗦的,但是代数法可以得到如下表格:

假设4个queueConsumer有2个 可以整除Consumer有3个 不可整除Consumer有5个 无法都分配
queue[0]Consumer[0]Consumer[0]Consumer[0]
queue[1]Consumer[0]Consumer[0]Consumer[1]
queue[2]Consumer[1]Consumer[1]Consumer[2]
queue[3]Consumer[1]Consumer[2]Consumer[3]

所以得出如下真香定律(也是回击面试官的最佳答案):

  • queue个数大于Consumer个数,且queue个数能整除Consumer个数的话, 那么Consumer会平均分配queue。(比如上面表格的Consumer有2个 可以整除部分)
  • queue个数大于Consumer个数,且queue个数不能整除Consumer个数的话, 那么会有一个Consumer多消费1个queue,其余Consumer平均分配。(比如上面表格的Consumer有3个 不可整除部分)
  • queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue上。(比如上面表格的Consumer有5个 无法都分配部分)

4、补充

queue选择算法也就是负载均衡算法有很多种可选择:

  • AllocateMessageQueueAveragely:是前面讲的默认方式
  • AllocateMessageQueueAveragelyByCircle:每个消费者依次消费一个partition,环状。
  • AllocateMessageQueueConsistentHash:一致性hash算法
  • AllocateMachineRoomNearby:就近元则,离的近的消费
  • AllocateMessageQueueByConfig:是通过配置的方式
三、何时Rebalance

那就得从Consumer启动的源码开始看起,先看Consumer的启动方法start()

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private MQClientInstance mQClientFactory;
    
    // 启动Consumer的入口函数
 public synchronized void start() throws MQClientException {
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(
            this.defaultMQPushConsumer, this.rpcHook);
        // 调用MQClientInstance的start方法,追进去看看。
        mQClientFactory.start();
    }
}

看看mQClientFactory.start();都干了什么

public class MQClientInstance {
    private final RebalanceService rebalanceService;
    
    public void start() throws MQClientException {
        synchronized (this) {
            // 调用RebalanceService的start方法,别慌,继续追进去看看
   this.rebalanceService.start();
        }
    }
}

看看rebalanceService.start();都干了什么,先看下他的父类ServiceThread

/*
 * 首先可以发现他是个线程的任务,实现了Runnable接口
 * 其次发现上步调用的start方法居然就是thread.start(),那就相当于调用了RebalanceService的run方法
 */
public abstract class ServiceThread implements Runnable {
 public void start() {
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }

最后来看看RebalanceService.run()

public class RebalanceService extends ServiceThread {
    /**
     * 等待时间的间隔,毫秒,默认是20s
     */
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));

    @Override
    public void run() {
        while (!this.isStopped()) {
            // 等待20s,然后超时自动释放锁执行doRebalance
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}

到这里真相大白了。

当一个consumer出现宕机后,默认最多20s,其它机器将重新消费已宕机的机器消费的queue,同样当有新的Consumer连接上后,20s内也会完成rebalance使得新的Consumer有机会消费queue里的msg。

等等,好像有问题:新上线一个Consumer要等20s才能负载均衡?这不是搞笑呢吗?肯定有猫腻。

确实,新启动Consumer的话会立即唤醒沉睡的线程, 让他立马进行this.mqClientFactory.doRebalance();,源码如下

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    // 启动Consumer的入口函数
 public synchronized void start() throws MQClientException {        
        // 看到了没!!!, 见名知意,立即rebalance负载均衡
        this.mQClientFactory.rebalanceImmediately();
    }
}

免责声明:文章转载自《RocketMQ(八)RocketMQ的Consumer负载均衡》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇vue基础(七),同源策略以及跨域,vuex超简单,安卓模拟器手动root下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Kafka — 高吞吐量的分布式发布订阅消息系统【转】

1.Kafka独特设计在什么地方?2.Kafka如何搭建及创建topic、发送消息、消费消息?3.如何书写Kafka程序?4.数据传输的事务定义有哪三种?5.Kafka判断一个节点是否活着有哪两个条件?6.producer是否直接将数据发送到broker的leader(主节点)?7.Kafa consumer是否可以消费指定分区消息?8.Kafka消息是采...

Java 代码实现rar解压最全攻略操作

最全Java 代码实现rar解压操作首先,非常感谢下面几位链接上的支持,之所以写这篇博文,主要在于总结,同时给第一次实现该功能的同学提供完整的参考。因为第一次遇到需要在代码中实现rar和zip的解压操作。而zip其实很简单,jdk自带的ZipUtil就可以实现,这里不做赘述。但是rar的解压,特别是5.0及其以上版本的解压,折腾了我很久。根据这几位博主的思...

java 调用apache.commons.codec的包简单实现MD5加密

转自:https://blog.csdn.net/mmd1234520/article/details/70210002/ 1 importjava.security.MessageDigest; 2 importjava.security.NoSuchAlgorithmException; 3 4 import org.apache....

关于配置文件Web.config文件的家常事

1. 在Web.config文件中数据库连接字符串的运用      a.将web.config文件中<system.web>标签之上的<connectionStrings />更改如下:          <connectionStrings>            <add  name="ConnStr"  co...

Unity3D中使用委托和事件

Unity3D中使用委托和事件  c#语言规范 阅读目录 1.C#中的委托、事件引入 2.方法的参数是方法 前言: 本来早就想写写和代码设计相关的东西了,以前做2DX的时候就有过写写观察者设计模式的想法,但是实践不多。现在转到U3D的怀抱中,倒是接触了不少委托事件的写法,那干脆就在此总结一下吧。 回到目录 1.C#中的委托、事件引入 本想去找...

调用钉钉接口发送消息

1.首先登陆钉钉开发者后台 https://ding-doc.dingtalk.com/ 2.选择H5微应用,创建应用 4.创建好之后,查看所建好的应用信息 其中AgentId,AppKey,AppSecret很重要,调用时需要用到 5.直接上代码看效果 1 string appkey = "dingv0cab6brl1ax6exd"; 2...