RabbitMQ消息可靠性传输

摘要:
首先,让我们看一下RabbitMQ消息传递的流程图。从上图中,我们可以看到消息传递涉及三个对象:生产者RabbitMQ消费者。然后,针对以上三个目标,重点分析了消息传输的可靠性。首先是制片人。这些优点确保消息可以发送到RabbitMQ,而不会在发送端丢失消息;缺点:事务机制被阻止(同步)。每次发送消息时,必须等待mq响应继续发送消息。这会消耗性能并导致吞吐量下降。确认模式基于交易特征。作为补偿,RabbitMQ添加了消息确认机制,即确认机制。

消息的可靠性投递是使用消息中间件不可避免的问题,不管是使用kafka、rocketMQ或者rabbitMQ,那么在RabbitMQ中如何保证消息的可靠性投递呢?

先再看一下RabbitMQ消息传递的流程图:

RabbitMQ消息可靠性传输第1张

从上面的图可以看到,消息的投递有三个对象参与:

  • 生产者
  • RabbitMQ(broker)
  • 消费者

那么消息的可靠性传输也主要是针对以上三个对象来分析,首先是生产者。

生产者丢失消息

生产者发送消息到broker时,要保证消息的可靠性,主要的方案有以下2种:

1.事务

2.confirm机制

事务

RabbitMQ提供了事务功能,也即在生产者发送数据之前开启RabbitMQ事务,然后再发送消息,如果消息没有成功发送到RabbitMQ,那么就抛出异常,然后进行事务回滚,回滚之后再重新发送消息,如果RabbitMQ接收到了消息,那么进行事务提交,再开始发送下一条数据。

优点

保证消息一定能够发送到RabbitMQ中,发送端不会出现消息丢失的情况;

缺点

事务机制是阻塞(同步)的,每次发送消息必须要等到mq回应之后才能继续发送消息,比较耗费性能,会导致吞吐量降下来

confirm模式

基于事务的特性,作为补偿,RabbitMQ添加了消息确认机制,也即confirm机制。

confirm机制和事务机制最大的不同就是事务是同步的,confirm是异步的,发送完一个消息后可以继续发送下一个消息,mq接收到消息后会异步回调接口告知消息接收结果。

生产者开启confirm模式后,每次发送的消息都会分配一个唯一id,如果消息成功发送到了mq中,那么就会返回一个ack消息,表示消息接收成功,反之会返回一个nack,告诉你消息接收失败,可以进行重试。依据这个机制,我们可以维护每个消息id的状态,如果超过一定时间还是没有接收到mq的回调,那么就重发消息。

代码实现

配置文件

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true

此处省略了mq的其他配置项,只留下了开启confirm机制的三个配置项

publisher-confirm: 开启消息到达exchange的回调,发送成功失败都会触发回调

publisher-returns: 开启消息从exhcange路由到queue的回调,只有路由失败时才会触发回调

mandatory: 为true时,如果exchange根据routingKey将消息路由到queue时找不到匹配的queue,触发return回调,为false时,exchange直接丢弃消息。

创建交换机、队列以及绑定

@Bean
public FanoutExchange fanoutExchange(){
	return new FanoutExchange(exchangeName,true,false);
}
@Bean
public Queue queue(){
	return new Queue(queueName,true);
}

@Bean
public Binding binding(Queue queue, FanoutExchange fanoutExchange){
	return  BindingBuilder.bind(queue).to(fanoutExchange);
}

实现接口

实现RabbitTemplate.ConfirmCallbackRabbitTemplete.ReturnCallback接口,并且重写confirm和returnedMessage方法,并将其添加到RabbitTemplate的回调中,完整的生产者如下所示:

@Component
@Slf4j
public class MyProducer {

    @Value("${platform.exchange-name}")
    private String exchangeName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(){
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
            if(ack){
                log.info("消息{}接收成功",correlationData.getId());
            }else{
                log.info("消息{}接收失败,原因{}",correlationData.getId(),cause);
            }
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            log.info("消息{}发送失败,应答码{},原因{},交换机{},路由键{}",message.toString(),replyCode,replyText,exchange,routingKey);
        });

        for (int i = 0; i < 10; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,correlationData);
        }
    }
}

此时调用生产者发送消息,可以看到控制台输出以下内容:
RabbitMQ消息可靠性传输第2张

从mq的可视化管理界面上也可以看到,消息成功进入了队列中
RabbitMQ消息可靠性传输第3张

上述情况是消息正确发送到交换机的情况,那么如果我在发送消息时,故意写错交换机的名称会有什么情况呢,假设我们把生产者代码改为如下所示:

for (int i = 0; i < 10; i++) {
	CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
	//rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,correlationData);
	rabbitTemplate.convertAndSend("test-confirm","","消息==>"+i,correlationData);
}

此时再次启动生产者,得到的结果如下:

消息f2214022-b3c0-462e-8a71-b0fb209bb784处理失败,失败原因channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test-confirm' in vhost 'test', class-id=60, method-id=40)

消息90046cf8-9f84-4aba-b2e5-e667e8466e8c处理失败,失败原因channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test-confirm' in vhost 'test', class-id=60, method-id=40)

通过上述的两个例子我们可以知道,当消息成功发送到交换机的时候,返回的是ack,当消息没有成功发送到交换机的时候,返回的是nack,并且会将异常的原因一起返回回来,通过分析异常原因可以知道消息没有正确发送的原因进而进行修改后重新发送消息。

上述的两个例子主要是针对ConfirmCallback的,什么情况下会触发ReturnCallback呢?前面也说过,就是当消息已经成功到达交换机,当交换机根据routingKey将消息路由到队列时,发现没有匹配的队列或者交换机根本就没有绑定队列,此时就会触发RetrunCallback,但是如果消息成功路由到队列中,Returncallback是不会触发的。

还是上面的例子,我们把交换机绑定的队列给删除掉,让交换机不绑定任何队列
RabbitMQ消息可靠性传输第4张

此时再执行上述的生产者代码,可以看到控制台的输出结果
RabbitMQ消息可靠性传输第5张

通过控制台信息我们可以看出来,消息成功到达了交换机,触发了ConfirmCallback回调,但是从交换机路由到队列时由于找不到匹配的队列,因此触发了ReturnCallback回调。

此外,要想在消息不能正确路由到队列时触发ReturnCallback回调,还必须设置rabbitmq.template.mandary=true,否则,消息直接被交换机丢弃,不会触发ReturnCallback回调。

confirm总结

confirm机制通过异步回调的方式来确认消息是否到达交换机以及消息是否正确路由到队列,主要可以总结为以下4点:

消息正确到达交换机,触发ConfirmCallback回调,返回ack;

消息没有正确到达交换机,触发ConfirmReturnCallback回调,返回nack;

消息正确的从交换机路由到队列,不触发ReturnCallback回调;

消息没有正确的从交换机路由到队列,设置mandory=true的情况下,触发ReturnCallback回调;

RabbitMQ(broker)丢失消息

前面我们从生产者的角度分析了消息可靠性传输的原理和实现,这一部分我们从broker的角度来看一下如何能保证消息的可靠性传输?

假设有现在一种情况,生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到了队列中,但是在消费者还未进行消费时,mq挂掉了,那么重启mq之后消息还会存在吗?如果消息不存在,那就造成了消息的丢失,也就不能保证消息的可靠性传输了。

也就是现在的问题变成了如何在mq挂掉重启之后还能保证消息是存在的?

解决方案:

开启RabbitMQ的持久化,也即消息写入后会持久化到磁盘,此时即使mq挂掉了,重启之后也会自动读取之前存储的额数据

开启持久化的步骤:

  • 创建交换机时,设置durable=true
    RabbitMQ消息可靠性传输第6张

  • 创建queue时,设置durable=true

这只会持久化当前队列的元数据,不会持久化消息数据
RabbitMQ消息可靠性传输第7张

  • 发送消息时,设置消息的deliveryMode=2

此时才会将消息持久化到磁盘上去,如果使用SpringBoot的话,发送消息时自动设置deliveryMode=2,不需要人工再去设置

使用可视化管理页面,可以看到队列中的数据的deliveryMode=2
RabbitMQ消息可靠性传输第8张

通过以上方式,可以保证大部分消息在broker不会丢失,但是还是有很小的概率会丢失消息,什么情况下会丢失呢?

假如消息到达队列之后,还未保存到磁盘mq就挂掉了,此时还是有很小的几率会导致消息丢失的。

这就要mq的持久化和前面的confirm进行配合使用,只有当消息写入磁盘后才返回ack,那么就是在持久化之前mq挂掉了,但是由于生产者没有接收到ack信号,此时可以进行消息重发。

消费者丢失消息

消费者什么情况下会丢失消息呢?

消费者接收到消息,但是还未处理或者还未处理完,此时消费者进程挂掉了,比如重启或者异常断电等,此时mq认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。

那该如何避免这种情况呢?这就要用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。

代码实现

配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack
        prefetch: 5

消费者实现

@Component
@Slf4j
public class MyConsumer {
    @RabbitHandler
    @RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
    public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
        try {
            //int temp = 10/0;
            log.info("消息{}消费成功",msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("接收消息过程中出现异常,执行nack");
            //第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("消息{}异常",message.getMessageProperties().getHeaders());
        }
    }
}

acknowledge-mode: manual就表示开启手动ack,该配置项的其他两个值分别是none和auto

auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack,不要把none和auto搞混了

manual: 手动ack,用户必须手动提交ack或者nack

none: 没有ack机制

默认值是auto,如果将ack的模式设置为auto,此时如果消费者执行异常的话,就相当于执行了nack方法,消息会被放置到队列头部,消息会被无限期的执行,从而导致后续的消息无法消费。

对于channel.basicNack方法的第三个参数,表示消息nack后是否返回队列,如果设置为true,表示返回队列,此时消息处于队列头部,消费者会一直处理该消息,影响后续消息的消费,设置为false时表示不返回队列,此时如果设置有DLX(死信队列),那么消息会进入DLX中,后续再对该消息进行相应的处理,如果没有设置DLX,此时消息就会被丢弃。关于私信队列后续再单独来说。

根据以上分析,一般情况下,为了保证消息不丢失,还是建议使用手动ack的方式。

总结

本文主要从生产者、broker以及消费者三个层面分析了消息可能丢失的原因以及相应的解决方案,也就是生产者要确认消息发送到交换机,交换机要确认消息路由到队列,队列要对存储的消息进行持久化存储,消费者在消费时使用手动ack的方式确认消息完成消息,进过上述一系列的步骤达到消息可靠性传输的目的,下一篇是RabbitMQ的重试机制。

免责声明:文章转载自《RabbitMQ消息可靠性传输》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Maven 多模块父子工程 (含Spring Boot示例)3D局部光照模型 (转)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Qt调试信息重定向输出(qInstallMessageHandler)

由于工具需要,做了一小段Qt5测试代码,参考了网友的案例测试了以下功能 1 qDebug()重定向输出QT窗口 2 qDebug()信息保存到本地文件 QtMessageHandler qInstallMessageHandler(QtMessageHandler handler)   此函数在使用Qt消息处理程序之前已定义。返回一个指向前一个消息处理程序...

基于Abp VNext框架设计

abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅。 IEventBus在什么时机触发PublishAsync? 当前UnitOfWork完成时,触发IEventBus的PublishAsync 在没有事务环境下,同步调用IEventBus的PublishAsync abp 默认实现基于Ra...

ES6+转ES5

  本人近期接到一个天大的“好消息”:zxbc项目某些客户为保险业等种种原因要支持IE……  2013年,ES6草案冻结,2015年6月,ES6正式通过,成为国际标准。都9102啦,Chrome还好啦,升级到最新版本,大部分ES6还是ok的,但是万恶之源IE呢?作为一个前端开发者,兼容万恶的IE,顿时,胸中万马奔腾,此处省略十万字……  无奈之举,撸起袖子...

SQL 存储过程入门(事务)(四)

SQL 存储过程入门(事务)(四)  本篇我们来讲一下事务处理技术。 为什么要使用事务呢,事务有什么用呢,举个例子。 假设我们现在有个业务,当做成功某件事情的时候要向2张表中插入数据,A表,B表,我们插入的顺序是先插入A,再插入B表,如果都顺利插入成功了,当然没有问题,如果任意一张表插入失败了,而另一张表插入成功了,插入成功的表就是垃圾数据了。我们要判断...

阿里消息队列中间件 RocketMQ源码解析:Message发送&amp;amp;接收

关注微信公众号:【芋艿的后端小屋】有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。 新的源码解析文章实时收到通知。每周更新...

rabbitmq坑点与异常处理

一、None of the specified endpoints were reachable 这个异常在创建连接时抛出(CreateConnection()),原因一般是ConnectionFactory参数设置不对,比如HostName、UserName、Password 标准设置: var factory = new ConnectionFact...