【RocketMQ】同一个项目中,同一个topic,可以存在多个消费者么?

摘要:
一、问题答案是不可以的而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息/***@date2019/05/28*/@Component@Slf4jpublicclassMqConsumerimplementsMessageConsumer{@Override@Transactional(rollbackFor=Throwable.class,p

一、问题答案

是不可以的

而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息

【RocketMQ】同一个项目中,同一个topic,可以存在多个消费者么?第1张

/**
 * @date 2019/05/28
 */
@Component
@Slf4j
public class MqConsumer implementsMessageConsumer {
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation =Propagation.REQUIRED)
    public voidonMessage(String msg) {
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
    }
    @Override
    publicString getTopic() {
        return "topic-1";
    }
    @Override
    publicString getTag() {
        return "tag-1";
    }
}
@Component
@Slf4j
public class MqConsumer2 implementsMessageConsumer {
    @Override
    @Transactional(rollbackFor = Throwable.class, propagation =Propagation.REQUIRED)
    public voidonMessage(String msg) {
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
    }
    @Override
    publicString getTopic() {
        return "topic-1";
    }
    @Override
    publicString getTag() {
        return "tag-2";
    }
}

二、为什么呢?

我们从源码的角度来分析下

1.订阅消息的方法 public void subscribe(String topic, String subExpression, MessageListener listener) ,其中subExpression即为tag

packagecom.aliyun.openservices.ons.api.impl.rocketmq;
....
@Generated("ons-client")
public class ConsumerImpl extends ONSConsumerAbstract implementsConsumer {
    private final ConcurrentHashMap<String, MessageListener> subscribeTable = new ConcurrentHashMap<String, MessageListener>();
    public ConsumerImpl(finalProperties properties) {
        super(properties);
        boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false"));
        this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull);
        String messageModel =properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
    }
    @Override
    public voidstart() {
        this.defaultMQPushConsumer.registerMessageListener(newMessageListenerImpl());
        super.start();
    }
    @Override
    public voidsubscribe(String topic, String subExpression, MessageListener listener) {
        if (null ==topic) {
            throw new ONSClientException("topic is null");
        }
        if (null ==listener) {
            throw new ONSClientException("listener is null");
        }
        this.subscribeTable.put(topic, listener);
        super.subscribe(topic, subExpression);
    }
.....
}

从上面的类中我们可以从this.subscribeTable.put(topic, listener);看到subscribeTable这样的一个Map,该Map与tag无关

2.我们再看super.subscribe(topic, subExpression)方法,属于ONSConsumerAbstract类中

protected voidsubscribe(String topic, String subExpression) {
        try{
            this.defaultMQPushConsumer.subscribe(topic, subExpression);
        } catch(MQClientException e) {
            throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
        }
    }

DefaultMQPushConsumer中:

@Override
    public void subscribe(String topic, String subExpression) throwsMQClientException {
        this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
    }

DefaultMQPushConsumerImpl中:

public void subscribe(String topic, String subExpression) throwsMQClientException {
        try{
//此处用来构建订阅数据,并且指定了tag SubscriptionData subscriptionData
= FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
//此处将topic和该topic的订阅数据存放到subscriptionInner这个Map中
// protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =new ConcurrentHashMap<String, SubscriptionData>();
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } } catch(Exception e) { throw new MQClientException("subscription exception", e); } }

三、总结

从上面简单的源码可以看到,有用到两个Map,

subscribeTable 和 subscriptionInner ,并且Map的key都为topic。所以我们可以笃定,RocketMQ在同一个项目中,只支持注册一个topic消费者,那么也就只能指定一个tag

免责声明:文章转载自《【RocketMQ】同一个项目中,同一个topic,可以存在多个消费者么?》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Jenkins进阶-用户权限管理(10)Visual Studio中查看和修改文件编码下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

kafka 0.8.2 消息消费者 consumer

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven...

用广播监听安卓设备电量状态

  发送通知   这次邮件我们将会讨论怎么获取电量状态在安卓设备上,为了完成这个目标,我们将会使用到广播。 What is BroadcastReceiver?A broadcast receiver is an Android component which allows you to register for system or applic...

java爬虫(四)利用Jsoup获取需要登陆的网站中的内容(无验证码的登录)

一、实现原理 登录之后进行数据分析,精确抓取数据。根据上篇文章的代码,我们不仅获取了cookies,还获取了登录之后返回的网页源码,此时有如下几种种情况:(1)若我们所需的数据就在登录之后返回的源码里面,那么我们就可以直接通过Jsoup去解析源码了,然后利用Jsoup的选择器功能去筛选出我们需要的信息;(2)若需要的数据是需要通过请求源码里的链接得到,那么...

Java几种常用JSON库性能比较

本篇通过JMH来测试一下Java中几种常见的JSON解析库的性能。 每次都在网上看到别人说什么某某库性能是如何如何的好,碾压其他的库。但是百闻不如一见,只有自己亲手测试过的才是最值得相信的。 JSON不管是在Web开发还是服务器开发中是相当常见的数据传输格式,一般情况我们对于JSON解析构造的性能并不需要过于关心,除非是在性能要求比较高的系统。 目前对于J...

Scala学习——模式匹配

scala模式匹配 1.基础match case(类似java里switch case,但功能强大些) object MatchApp { def main(args: Array[String]): Unit = { val is = Array("a","b","c","d") val i = is(Random.nextInt...

window.open()不同源页面通信

父页面 运行端口:8080 <template> <div> <el-button @click="open()">发送消息给子页面</el-button> </div> </template> <script> export default{ na...