RocketMQ消息至少一次(At least Once)投递和消费

摘要:
至少一次意味着每条消息必须传递一次。消费者首先将消息拉到本地,然后在消费完成后将ack返回给服务器。如果没有消费,则不会有确认消息。因此RocketMQ可以很好地支持此功能。上述措施难以实施。事实上,重复的业务记录(如订单ID)可以通过密钥记录。

至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。

生产者

在同步非顺序投递的时候,每次都是轮询到不同的队列:

        Message message = new Message("topic_family", ("  同步发送  ").getBytes());
        SendResult sendResult = producer.getProducer().send(message);

结果:

Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C3BBE0000, offsetMsgId=C0A80A0B00002A9F00000000007B4A5C, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11014]
Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C6A3D0002, offsetMsgId=C0A80A0B00002A9F00000000007B4BB0, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=3], queueOffset=11012]
Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CAEE20004, offsetMsgId=C0A80A0B00002A9F00000000007B4D04, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11004]
Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CEA6D0006, offsetMsgId=C0A80A0B00002A9F00000000007B4E58, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11005]

异步

写入补偿log,进行重发
@GetMapping("/async")
    private void async() throws Exception {
        //创建消息
        Message message = null;
        for (int i=0;i<100;i++){
            if (i==90) {
                new RuntimeException("");
            }
            message = new Message("topic_family", ("异步发送:" + i).getBytes());
            System.out.println("异步发送:"+ i);

            //异步发送消息
            producer.getProducer().send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    //log.info("Product-异步发送-输出信息={}", sendResult);
                    System.out.println("Product-异步发送-输出信息={}" + sendResult);
                }
                @Override
                public void onException(Throwable e) {
                    //e.printStackTrace();
                    System.out.println("Product-异步发送-异常" + e.getMessage());
                    //写入补偿log,进行重发
                }
            });
        }
    }

重发带来的重复消息问题-上半场幂等

1,发送端MQ-client将消息发给服务端MQ-server
2,服务端MQ-server将消息落地
3,服务端MQ-server回ACK给发送端MQ-client
如果3丢失,发送端MQ-client超时后会重发消息,可能导致服务端MQ-server收到重复消息。
此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免步骤2落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:
(1)全局唯一
(2)MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽
有了这个inner-msg-id,就能保证上半场重发,也只有1条消息落到MQ-server的DB中,实现上半场幂等。
以上的措施实施比较麻烦,实际上可以通过key来记录重复的业务记录,比如订单id。
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProduceOnce {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("192.168.10.11:9876");

        producer.start();
        Message sendMessage = new Message(
                JmsConfig.TOPIC,
                "订单001".getBytes());

        sendMessage.setKeys("OD0000000001");//模拟同一个ID
        SendResult sendResult1 = producer.send(sendMessage);
        SendResult sendResult2 = producer.send(sendMessage);
        System.out.println("Product1-同步发送-Product信息={}" + sendResult1);
        System.out.println("Product2-同步发送-Product信息={}" + sendResult2);
        producer.shutdown();
    }
    }

结果:

Product1-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE870, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=0], queueOffset=11258]
Product2-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE926, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11262]

消费端

在非去重对的消费情况下,会产生重复消费
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerOnce {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("192.168.10.11:9876");

        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

结果,消费了2次

ConsumeMessageThread_1 Receive New Messages: [properties={MIN_OFFSET=0, MAX_OFFSET=11260, KEYS=OD0000000001, CONSUME_START_TIME=1591515973997, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages:  properties={MIN_OFFSET=0, MAX_OFFSET=11264, KEYS=OD0000000001, CONSUME_START_TIME=1591515974001, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]] 

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。(具体如何ACK见后面章节)

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

注:

  1. 如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
  2. 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

我们可以使用db的唯一键,或者缓存的唯一Id来记录需要消费一次的id。

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

   // 订单Id   
   String orderId = "20034568923546";   
   message.setKeys(orderId);   

模拟代码:

package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.ArrayList;
import java.util.List;

public class ConsumerOnce {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        List<String> redisKeyList = new ArrayList<String>();
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("192.168.10.11:9876");

        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    // 默认msgs只有一条消息
                    String repeatID = "";
                    for (MessageExt msg : msgs) {
                        String key = msg.getKeys();
                        return noRepeat(key);
                        //return noRepeatConsume(repeatID,key);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            private ConsumeConcurrentlyStatus noRepeat(String key) {
                for (String item : redisKeyList){
                    System.out.println("Redis 缓存中的keys:" + item);
                }
               if (!redisKeyList.contains(key)) {
                   redisKeyList.add(key);
                   System.out.println("Redis redisKeyList.size():" + redisKeyList.size());
                   System.out.println("Redis 缓存插入:" + key);
                   System.out.printf("%s Receive Messages: %s %n", Thread.currentThread().getName(), key);
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
               else{
                   System.out.printf("%s 重复Messages: %s %n", Thread.currentThread().getName(), key);
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;
               }
            }

        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。

结果没有重复消费了

Redis redisKeyList.size():1
Redis 缓存插入:OD0000000001
ConsumeMessageThread_2 Receive Messages: OD0000000001 
Redis 缓存中的keys:OD0000000001
ConsumeMessageThread_1 重复Messages: OD0000000001 
Redis 缓存中的keys:OD0000000001
ConsumeMessageThread_3 重复Messages: OD0000000001 
Redis 缓存中的keys:OD0000000001
ConsumeMessageThread_4 重复Messages: OD0000000001 
Redis 缓存中的keys:OD0000000001
ConsumeMessageThread_5 重复Messages: OD0000000001 
Redis 缓存中的keys:OD0000000001
ConsumeMessageThread_6 重复Messages: OD0000000001 
Redis 缓存中的keys:OD0000000001
ConsumeMessageThread_7 重复Messages: OD0000000001 
Redis 缓存中的keys:OD0000000001
ConsumeMessageThread_8 重复Messages: OD0000000001 

结果是不断地重复尝试消费,该怎么处理?可以删除重复的记录。

消费端常见的幂等操作总结
  1. 业务操作之前进行状态查询 消费端开始执行业务操作时,通过幂等id首先进行业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不需要再进行处理。那么我们只需要在消费逻辑执行之前通过订单号进行订单状态查询,一旦获取到确定的订单状态则对消息进行提交,通知broker消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。
  2. 业务操作前进行数据的检索 逻辑和第一点相似,即消费之前进行数据的检索,如果能够通过业务唯一id查询到对应的数据则不需要进行再后续的业务逻辑。如:下单环节中,在消费者执行异步下单之前首先通过订单号查询订单是否已经存在,这里可以查库也可以查缓存。如果存在则直接返回消费成功,否则进行下单操作。
  3. 唯一性约束保证最后一道防线 上述第二点操作并不能保证一定不出现重复的数据,如:并发插入的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插入操作,因此我们务必要对幂等id添加唯一性索引,这样就能够保证在并发场景下也能保证数据的唯一性。
  4. 引入锁机制 上述的第一点中,如果是并发更新的情况,没有使用悲观锁、乐观锁、分布式锁等机制的前提下,进行更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。 高并发下,建议通过状态机的方式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提高因此不建议使用。
  5. 消息记录表 这种方案和业务层做的幂等操作类似,由于我们的消息id是唯一的,可以借助该id进行消息的去重操作,间接实现消费的幂等。

首先准备一个消息记录表,在消费成功的同时插入一条已经处理成功的消息id记录到该表中,注意一定要 与业务操作处于同一个事物 中,当新的消息到达的时候,根据新消息的id在该表中查询是否已经存在该id,如果存在则表明消息已经被消费过,那么丢弃该消息不再进行业务操作即可。

肯定还有更多的场景我没有涉及到,这里说到的操作均是互相之间有关联的,将他们配合使用更能够保证消费业务的幂等性。

不论怎样,请牢记:缓存是不可靠的,在享受异步化、削峰、消息堆积等的好处之外,增加了业务复杂性,需要谨慎处理幂等操作 

免责声明:文章转载自《RocketMQ消息至少一次(At least Once)投递和消费》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇水晶报表开发之常用代码以及注意事项Nginx配置文件中文注释详解下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

SpringBoot入门-Redis(六)

依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId>...

JAVA UUID 生成唯一密钥(可随机选择长度)

/*** 获得指定数目的UUID * @param number int 需要获得的UUID数量 * @return String[] UUID数组 */public static String[] getUUID(int number){if(number < 1){return null;}String[] retArray = new Stri...

String源码详解

一、基本概念。     1、继承实现关系。因为被final修饰,因此是不可继承的String类,避免被他人继承后修改。实现了三个接口。可序列、可比较,有序。几个String兄弟类     2、本质就是字符数组,同时,它是不可变的。 二、成员变量。      1、字符数组value。访问权限私有,因此String类外具有不可访问特点,因为具有final...

序列化 反序列化 MessagePack for C#

阅读目录 快速序列化组件MessagePack介绍 简介 使用 快速开始 分析器 内置的支持类型 对象序列化 DataContract兼容性 序列化不可变对象(序列化构造器) 序列化回调 Union Dynamic(Untyped)反序列化 Object 类型序列化 Typeless 性能 反序列化中每个方法的性能 LZ4压缩 与protobuf,JS...

使用filebeat 收集日志到logstash 收集日志redis再到logstash到es

大型场合的工作流程图 filebeat -->logstash ---> redis ---> logstash --->es 工作环境: 需要两台logstash, 安装jdk8 [root@es-web1]# apt install openjdk-8-jdk -y 这里已经安装filebeat 配置filebeat(这里的...

浏览器的缓存机制

浏览器的缓存机制,其实主要就是HTTP协议定义的缓存机制(expires,Cache-control等),但是也有非HTTP协议定义的缓存机制,如使用HTML的meta标签,虽然使用上很简单,但是只有部分浏览器支持,所以并不建议使用。本文主要讲的是HTTP协议定义的缓存机制,一般将缓存定为两大类,分别是强缓存和协商缓存。 强缓存:用户发送的请求直接从客户端...