【ActiveMQ】消息重传机制

摘要:
确认当前会话中所有使用者中尚未确认的所有消息。我们通常使用基于组的CLIENT(消息分组)_确认:间歇性调用acknowledge方法,一次确认多条消息。ActiveMQMessageConumser。acknowledge()和ActiveMQSession。acknowledge()也可以确认消息。

问题:

项目在修改consumer的确认签收机制从AUTO-ACK改为CLIENT-ACK模式,但在服务优雅停机并重启之后,发现若干数据被Pending后未被重发。

过段时间再一次停启服务后,Pending的数据才能被消费成功。

此问题在数据量大时基本可以复现。

类似问题:

https://stackoverflow.com/questions/54074745/activemq-pending-messages

https://stackoverflow.com/questions/45539799/queue-consumers-have-pending-messages-but-they-are-not-getting-processed

https://stackoverflow.com/questions/18181619/consumer-is-not-receiving-messages-from-activemq

【ActiveMQ】消息重传机制第1张

思路:

1. ActiveMQ消息重传机制

2. ActiveMQ在非事务性生产者下

参考:

https://blog.csdn.net/czp11210/article/details/47022639/

https://blog.csdn.net/qq_39706128/article/details/80570577

http://codingdict.com/questions/36712

CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。在此模式下,开发者需要需要关注几个方法:1) message.acknowledge(),2) ActiveMQMessageConsumer.acknowledege(),3) ActiveMQSession.acknowledge();其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,2)只会对当前consumer中那些尚未确认的消息进行确认。开发者可以在合适的时机必须调用一次上述方法。

    我们通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理之后才能确认时,也很可以使用此ACK_MODE。

    如果开发者忘记调用acknowledge方法,将会导致当consumer重启后,会接受到重复消息,因为对于broker而言,那些尚未真正ACK的消息被视为“未消费”。

    开发者可以在当前消息处理成功之后,立即调用message.acknowledge()方法来"逐个"确认消息,这样可以尽可能的减少因网络故障而导致消息重发的个数;当然也可以处理多条消息之后,间歇性的调用acknowledge方法来一次确认多条消息,减少ack的次数来提升consumer的效率,不过这仍然是一个利弊权衡的问题。

    除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。

    无论是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。如果在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端可以继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)

    在broker端,针对每个Consumer,都会保存一个因为"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,因为Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端通过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端但是还没有“STANDARD_ACK_TYPE”的消息总量;由此可见,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;如果client端因为某种原因导致acknowledge方法未被执行,将导致大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而无法继续消费消息。我们要求client端在消费1.5*prefetchSize个消息之前,必须acknowledge()一次;通常我们总是每消费一个消息调用一次,这是一种良好的设计。

    此外需要额外的补充一下:所有ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener之前,都会首先创建一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,如果在此之前,开发者调用了acknowledge()方法,会导致消息直接被确认(STANDARD_ACK_TYPE)。broker端通常会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,如果consumer不能及时的对消息进行acknowledge而导致broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其他Consumer。

免责声明:文章转载自《【ActiveMQ】消息重传机制》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Nodejs学习笔记(十六)—Pomelo介绍&入门Windows Server2003本地用户的批量导入和导出(转)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

WebService(axis2),整合springmvc

 1.导入jar 2.在spring.xml中添加 <!-- axis2start --> <bean id="testWSService" class="com.sh.test.WsTestServer"></bean> <bean id="testWSService1" class="com....

Java高级--Java线程运行栈信息的获取 getStackTrace()

我们在Java程序中使用日志功能(JDK Log或者Log4J)的时候,会发现Log系统会自动帮我们打印出丰富的信息,格式一般如下:为了免去解析StackTrace字符串的麻烦,JDK1.4引入了一个新的类,StackTraceElement。   一、问题的引入   我们在Java程序中使用日志功能(JDK Log或者Log4J)的时候,会发现Log系统...

JSP 表单处理

JSP 表单处理 我们在浏览网页的时候,经常需要向服务器提交信息,并让后台程序处理。浏览器中使用 GET 和 POST 方法向服务器提交数据。 GET 方法 GET方法将请求的编码信息添加在网址后面,网址与编码信息通过"?"号分隔。如下所示: http://www.runoob.com/hello?key1=value1&key2=value2...

解决Mac应用程序软件不出现在Launchpad里面的方法

新装了几个软件,可是打开Lauchpad之后却在里面找不到,真是烦人!然后尝试了以下方法: 1、重启电脑,没用; 2、尝试打开“应用程序(英文名称:Applications)”并找到安装的软件,然后直接将其拖动到底部菜单的“Launchpad”图标上,当看到+号出现的时候松手,也没用; 如果有人尝试以上两种方式有用的话,也是可以的啊!可惜我的都没用,然后...

Object 中的wait和Thread中sleep的区别

一、区别 1、wait()来自于Object类而sleep来自于Thread类 2、sleep没有释放锁,但是wait释放了锁(使得其他线程可以使用同步控制块或者方法锁) 3、wait,notify和notifyAll只能在同步控制方法或者同步控制块使用,而sleep能在各个地方使用 4、sleep必须捕获异常,但是其它wait不用 5、sleep让一个线...

代理和协议区别及应用

协议与代理 一、理解协议与代理协议:协议是一个方法签名的列表,在其中可以定义若干个方法。根据配置,遵守该协议的类会去实现这个协议中规定的若干个方法。代理:代理是一个概念,很难用一个名词去定义(如我们可以说协议其实就是一个方法列表)。它更像是一种关系,我要做某一个事情,但我自己不想去做这件事,我委托其他人帮我去做这件事。这个时候,这位其他人就是我的代理。二、...