3-rocketmq-支持的消息种类

摘要:
通过MessageQueueSelector实现分区选择publicclassProducer{publicstaticvoidmain{try{DefaultMQProducerproducer=newDefaultMQProducer;producer.setNamesrvAddr;produr.setRetryTimesWhenSendFailed;produer.start();String[]tags=newString[]{“创建订单”、“付款”、“发货”、“收据”、“五星”};对于{orderId=i/5;Messagemsg=newMessage;/***实现MessageQueueSelector以确保顺序发送*实现MessageQueueSelector的三个实现:*SelectMessageQueueByHash*SelectMessageQueueByMachineRoom*SelectMessageQueue ByRandom*/SendResultsendResult=producer.send;System.out.printf;}producer。shutdown();}捕获{e.printStackTrace();}}消费者需要确保消息消费的顺序。最简单的方法是单线程消耗。使用MessageListenerOrderly实现消费监控问题:一个代理是否可以同时被多个消费者消费?这个热点问题似乎没有好的解决方案。我们只能拆分MessageQueue并优化路由方法,以尽可能均匀地将消息分发到不同的MessageQueue。

RocketMQ消息支持的模式

普通消息 NormalProducer

  • 消息同步发送

    producer.send(Message msg)

  • 消息异步发送

    producer.send(Message msg, SendCallback sendCallback)

  • 单向发送OneWay

    producer.sendOneWay(Message msg);

顺序消息 OrderProducer

简介

顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
顺序消息包含两种类型:

  • 分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费

  • 全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序
  2. 消息被存储时保持和发送的顺序一致(依赖1)
  3. 消息被消费时保持和存储的顺序一致(依赖2)

顺序发送和顺序存储由rocketmq保证,顺序消费需要由消费者业务层保证

RocketMQ顺序消息原理

img

Producer保证顺序发送,Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序

实现

生产者producer

producer只需要确保消息发送到特定的分区,也就是MessageQueue。通过MessageQueueSelector来实现分区的选择(自定义消息发送规则)

public class Producer {
    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
            producer.setNamesrvAddr("10.211.55.4:9876");
            producer.setRetryTimesWhenSendFailed(3);
            producer.start();
            String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"};
            for (int i = 5; i < 25; i++) {
                int orderId = i / 5;
                Message msg = new Message("OrderTopic1", tags[i % tags.length], "uniqueId:" + i,
                        ("order_" + orderId + " " + tags[i % tags.length]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                /**
                 * 实现MessageQueueSelector 保证顺序发送
                 * 实现MessageQueueSelector的三种实现:
                 * SelectMessageQueueByHash
                 * SelectMessageQueueByMachineRoom
                 * SelectMessageQueueByRandom
                 */
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        //此刻arg == orderId,可以保证是每个订单进入同一个队列
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.printf("%s%n", sendResult);
            }
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者consumer

消费者端需要保证消息消费的顺序性,最简单的办法就是单线程消费。使用MessageListenerOrderly实现消费监听

疑问:一个broker能否被多个consumer同时消费?

public class Consumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
        consumer.setNamesrvAddr("10.211.55.4:9876");
        try {
            //设置Consumer从哪开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("OrderTopic1", "*");
            // 实现了MessageListenerOrderly表示一个队列只会被一个线程取到, 第二个线程无法访问这个队列,MessageListenerOrderly默认单线程
//            consumer.setConsumeThreadMin(3);
//            consumer.setConsumeThreadMax(6);
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    try {
                        System.out.println("orderInfo: " + new String(msgs.get(0).getBody(), "utf-8"));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer1 Started.");
    }

}
顺序和异常的关系

顺序消息需要Producer和Consumer都保证顺序。Producer需要保证消息被路由到正确的分区,消息需要保证每个分区的数据只有一个线程消息,那么就会有一些缺陷:

  • 发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试
  • 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
  • 消费的并行读依赖于分区数量
  • 消费失败时无法跳过

不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

免责声明:文章转载自《3-rocketmq-支持的消息种类》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇(2)OLEDB数据库操作jquery下拉菜单下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

springCloud 后端使用webSocket向前端推送消息

1、webSocket webSocket长连接是一种在单个tcp连接上进行全双工通信的协议,允许双向数据推送。一般微服务提供的restful API只是对前端请求做出相应。使用webSocket可以实现后端主动向前端推送消息。 2、springboot使用webSocket 1、pom文件添加依赖 <dependency> <...

RocketMQ入门介绍

  简介 用官方的话来说,RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件,具有以下特性(ps:对于这些特性描述,大家简单过一眼就即可,深入学习之后自然就明白了): 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 支持拉(pull)和推(push)两种消息模式 单...

windows Hook 消息分类

调用SetWindowsHookEx的DLL的模块实例句柄,它可以经由DllMain入口的第一个参数得到。HHOOK SetWindowsHookEx( int idHook,HOOKPROC lpfn,HINSTANCE hMod,DWORD dwThreadId);至于SetWindowsHookEx的第四个参数dwThreadId,才是你需要借由窗口句...

Handler为什么可能会造成内存泄漏以及可用的四种解决方法

在Android系统中,Handler是一个消息发送和处理机制的核心组件之一,与之配套的其他主要组件还有Looper和Message,MessageQueue。 根据官网的描述 There are two main uses for a Handler: (1) to schedule messages and runnables to be execut...

uniapp之页面间传递和接收数组

uni-app在页面之前如何发送和传递数组?如果直接发送和传递数组,接收到的消息如下显示。不能进一步获取对象值。  要想能够接收到数组对象的参数。可以先将数组转化为JSON字符串,传递到页面后在解析为JavaScript对象。设页面1传递数据到页面2.则,页面1的关键代码: 1 /** 2 * 跳转到下一个页面,并传递参数 3 */ 4 toN...

RocketMQ的安装配置:配置jdk环境,配置RocketMQ环境,配置集群环境,配置rocketmq-console

RocketMQ的安装配置 演示虚拟机环境:Centos64-1 (D:linuxMorecentos6_64) root / itcast : 固定IP 192.168.52.128 一,配置JDK环境 1,解压jdk到指定的目录 tar -xvf jdk-8u171-linux-x64.tar.gz -C /usr/local cd /usr/loca...