RabbitMQ重试机制

摘要:
注意:重试并不意味着RabbitMQ重新发送消息,它只是消费者的内部重试。换句话说,重试与mq无关;因此,try{}catch(){}不能添加到上述使用者代码中。在自动确认模式下,一旦捕获到异常,就相当于正确处理消息。消息被直接确认,不会触发重试;在上述MessageReCover示例的测试中,我们还发现了一个问题:在五次重试后,控制台输出异常堆栈日志,然后队列中的数据也被确认。首先,让我们看看异常日志是什么。

消费端在处理消息过程中可能会报错,此时该如何重新处理消息呢?解决方案有以下两种。

  • 在redis或者数据库中记录重试次数,达到最大重试次数以后消息进入死信队列或者其他队列,再单独针对这些消息进行处理;

  • 使用spring-rabbit中自带的retry功能;

第一种方案我们就不再详细说了,我们主要来看一下第二种方案,老规矩,先上代码:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 自动ack
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 10000   # 重试最大间隔时间
          initial-interval: 2000  # 重试初始间隔时间
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

此时我们的消费者代码如下所示:

@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
		log.info("接收到消息>>>{}",msg);
		int temp = 10/0;
		log.info("消息{}消费成功",msg);
}

此时启动程序,发送消息后可以看到控制台输出内容如下:

RabbitMQ重试机制第1张

可以看到重试次数是5次(包含自身消费的一次),重试时间依次是2s,4s,8s,10s(上一次间隔时间*间隔时间乘子),最后一次重试时间理论上是16s,但是由于设置了最大间隔时间是10s,因此最后一次间隔时间只能是10s,和配置相符合。

注意:

重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系;

因此上述消费者代码不能添加try{}catch(){},一旦捕获了异常,在自动ack模式下,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试的;

MessageReCoverer

上面的例子在测试中我们还发现了一个问题,就是经过5次重试以后,控制台输出了一个异常的堆栈日志,然后队列中的数据也被ack掉了(自动ack模式),首先我们看一下这个异常日志是什么。

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted

出现消息被消费掉并且出现上述异常的原因是因为在构建SimpleRabbitListenerContainerFactoryConfigurer类时使用了MessageRecoverer接口,这个接口有一个cover方法,用来实现重试完成之后对消息的处理,源码如下:

ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
	RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
			: RetryInterceptorBuilder.stateful();
	RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
			.createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
	builder.retryOperations(retryTemplate);
	MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
			: new RejectAndDontRequeueRecoverer(); //<1>
	builder.recoverer(recoverer);
	factory.setAdviceChain(builder.build());

注意看<1>处的代码,默认使用的是RejectAndDontRequeueRecoverer实现类,根据实现类的名字我们就可以看出来该实现类的作用就是拒绝并且不会将消息重新发回队列,我们可以看一下这个实现类的具体内容:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
	protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
	@Override
	public void recover(Message message, Throwable cause) {
		if (this.logger.isWarnEnabled()) {
			this.logger.warn("Retries exhausted for message " + message, cause);
		}
		throw new ListenerExecutionFailedException("Retry Policy Exhausted",
					new AmqpRejectAndDontRequeueException(cause), message);
	}
}

上述源码给出了异常的来源,但是未看到拒绝消息的代码,猜测应该是使用aop的方式实现的,此处不再继续深究。

MessageRecoverer接口还有另外两个实现类,分别是RepublishMessageRecovererImmediateRequeueMessageRecoverer,顾名思义就是重新发布消息和立即重新返回队列,下面我们分别测试一个这两个实现类:

RepublishMessageRecoverer

先创建一个异常交换机和异常队列,并将两者进行绑定:

@Bean
public DirectExchange errorExchange(){
	return new DirectExchange("error-exchange",true,false);
}

@Bean
public Queue errorQueue(){
	return new Queue("error-queue", true);
}

@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
	return BindingBuilder.bind(errorQueue).to(errorExchange).with("error-routing-key");
}

创建RepublishMessageRecoverer:

@Bean
public MessageRecoverer messageRecoverer(){
	return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange","error-routing-key");
}

此时启动服务,查看处理结果:

RabbitMQ重试机制第2张

通过控制台可以看到,消息重试5次以后直接以新的routingKey发送到了配置的交换机中,此时再查看监控页面,可以看原始队列中已经没有消息了,但是配置的异常队列中存在一条消息。

RabbitMQ重试机制第3张

ImmediateRequeueMessageRecoverer

再测试一下ImmediateRequeueMessageRecoverer:

@Bean
public MessageRecoverer messageRecoverer(){
	return new ImmediateRequeueMessageRecoverer();
}

RabbitMQ重试机制第4张

重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费。

总结:

通过上面的测试,对于重试之后仍然异常的消息,可以采用RepublishMessageRecoverer,将消息发送到其他的队列中,再专门针对新的队列进行处理。

死信队列

除了可以采用上述RepublishMessageRecoverer,还可以采用死信队列的方式处理重试失败的消息。

首先创建死信交换机、死信队列以及两者的绑定

/**
 * 死信交换机
 * @return
 */
@Bean
public DirectExchange dlxExchange(){
	return new DirectExchange(dlxExchangeName);
}

/**
 * 死信队列
 * @return
 */
@Bean
public Queue dlxQueue(){
	return new Queue(dlxQueueName);
}

/**
 * 死信队列绑定死信交换机
 * @param dlxQueue
 * @param dlxExchange
 * @return
 */
@Bean
public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){
	return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
}

业务队列的创建需要做一些修改,添加死信交换机以及死信路由键的配置

/**
 * 业务队列
 * @return
 */
@Bean
public Queue queue(){
	Map<String,Object> params = new HashMap<>();
	params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机
	params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键
	return QueueBuilder.durable(queueName).withArguments(params).build();
    //return new Queue(queueName,true);
}

此时启动服务,可以看到同时创建了业务队列以及死信队列

RabbitMQ重试机制第5张

在业务队列上出现了DLX以及DLK的标识,标识已经绑定了死信交换机以及死信路由键,此时调用生产者发送消息,消费者在重试5次后,由于MessageCover默认的实现类是RejectAndDontRequeueRecoverer,也就是requeue=false,又因为业务队列绑定了死信队列,因此消息会从业务队列中删除,同时发送到死信队列中。

注意:

如果ack模式是手动ack,那么需要调用channe.nack方法,同时设置requeue=false才会将异常消息发送到死信队列中

retry使用场景

上面说了什么是重试,以及如何解决重试造成的数据丢失,那么怎么来选择重试的使用场景呢?

是否是消费者只要发生异常就要去重试呢?其实不然,假设下面的两个场景:

  • http下载视频或者图片或者调用第三方接口
  • 空指针异常或者类型转换异常(其他的受检查的运行时异常)

很显然,第一种情况有重试的意义,第二种没有。

对于第一种情况,由于网络波动等原因造成请求失败,重试是有意义的;

对于第二种情况,需要修改代码才能解决的问题,重试也没有意义,需要的是记录日志以及人工处理或者轮询任务的方式去处理。

retry最佳实践

对于消费端异常的消息,如果在有限次重试过程中消费成功是最好的,如果有限次重试之后仍然失败的消息,不管是采用RejectAndDontRequeueRecoverer还是使用私信队列都是可以的,同时也可以采用折中的方法,先将消息从业务队列中ack掉,再将消息发送到另外的一个队列中,后续再单独处理异常数据的队列。

另外,看到有人说retry只能在自动ack模式下使用,经过测试在手动ack模式下retry也是生效的,只不过不能使用catch捕获异常,即使在自动ack模式下使用catch捕获异常也是会导致不触发重试的。当然,在手动ackm模式下要记得确认消息,不管是确认消费成功还是确认消费失败,不然消息会一直处于unack状态,直到消费者进程重启或者停止。

如果一定要在手动ack模式下使用retry功能,最好还是确认在有限次重试过程中可以重试成功,否则超过重试次数,又没办法执行nack,就会出现消息一直处于unack的问题,我想这也就是所说的retry只能在自动ack模式下使用的原因,测试代码如下:

@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
	log.info("接收到消息>>>{}",msg);
	if(msg.indexOf("0")>-1){
		throw new RuntimeException("抛出异常");
	}
	log.info("消息{}消费成功",msg);
	channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

免责声明:文章转载自《RabbitMQ重试机制》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇table表单打印添加页码和自动分页Free Pascal IDE 下载、安装、配置下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

java加解密(一)

1、杂谈   1、古典密码学     核心:替换法/位移法(凯撒加密)     破解方法:频率分析法,即研究字母和字母组合在文本中出现的概率。   2、近代密码学:     恩尼格玛机     被图灵破解   3、现代密码学:     1、散列函数:散列函数,也叫杂凑函数、摘要函数或哈希函数,可将任意长度的消息经过运算,变成固定长度数值,常...

从未如此简单:10分钟带你逆袭Kafka!【转】

【51CTO.com原创稿件】Apache Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统, 使用 Scala 与 Java 语言编写,能够将消息从一个端点传递到另一个端点。 较之传统的消息中间件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、内置分区、支持消息副本和高容错的特性,非常适合大规模消息...

Modbus消息帧

  两种传输模式中(ASCII和RTU),传输设备以将Modbus消息转为有起点和终点的帧,这就允许接收的设备在消息起始处开始工作,读地址分配信息,判断哪一个设备被选中(广播方式则传给所以设备),判知何时信息已完成。部分的消息也能侦测到并且错误能设置为返回结果。   1、ASCII帧   使用ASCII模式,消息以冒号(:)字符(ASCII 3AH)开始,...

Samba 简介

SMB 代表的是服务器消息块 (Server Message Block),它是用于在 Windows 上共享文件的协议的原始名称。 CIFS 代表公共 Internet 文件系统 (Common Internet File System),它是 Microsoft 描述该协议最近一个版本的新字首组合词。 samba认证  Samba 有它自己独特的口令数...

celery使用

Celery 1.什么是Clelery Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统 专注于实时处理的异步任务队列 同时也支持任务调度 Celery架构 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。 消息中间件 Ce...

ucGUI的视窗管理回调机制学习

要熟悉窗口的回调机制,重点理解回调函数作用,消息传递机制。 uC/GUI的窗口管理是个单独的软件,不是uC/GUI的基本组成部分。代码见\uCGUI\GUI\WM。当使用uC/GUI窗口管理时,任何能显示在显示终端上的内容都包含在一个窗口里面,这个窗口是LCD屏幕上的一个给用户画图或显示目标的区域。窗口能够是任何尺寸的,能够一次在屏幕上显示多个窗口,也能够...