【--RocketMQ--】RocketMQ实现事务消息

摘要:
如果提交提交或回滚状态,代理将将消息写入OpTopic,默认情况下为RMQ _SYS_TRANS_OP_HALF_TOPIC。此主题主要用于记录已提交或回滚的准备消息。代理使用HalfTopic和OpTopic来计算需要检查的事务消息。调用fillOpRemoveMap方法以在OpHalfTopic中完成准备好的事务消息。在监控事务消息时,我们主要检查事务消息是否处于所需状态。事务消息生成器成功发送准备消息后,我们只能获得transactionId,它不是RocketMQ消息存储的物理偏移地址。RocketMQ只有在准备好写入commitlog文件时才会生成true msgId,这里


在RocketMQ4.3.0版本后,开放了事务消息这一特性,对于分布式事务而言,最常说的还是二阶段提交协议,那么RocketMQ的事务消息又是怎么一回事呢,这里主要带着以下几个问题来探究一下RocketMQ的事务消息:

  事务消息是如何实现的
  我们有哪些手段来监控事务消息的状态
  事务消息的异常恢复机制
  RocketMQ的事务消息是如何实现的

RocketMQ作为一款消息中间件,主要作用就是帮助各个系统进行业务解耦,以及对消息流量有削峰填谷的作用,而对于事务消息,主要是通过消息的异步处理,可以保证本地事务和消息发送同时成功执行或失败,从而保证数据的最终一致性,

事务消息从诞生到结束的整个时间线流程:

  

  生产者发送消息到broker,该消息是prepare消息,且事务消息的发送是同步发送的方式。
  broker接收到消息后,会将该消息进行转换,所有的事务消息统一写入Half Topic,该Topic默认是RMQ_SYS_TRANS_HALF_TOPIC ,写入成功后会给生产者返回成功状态。
  本地生产获取到该消息的事务Id,进行本地事务处理。
  本地事务执行成功提交Commit,失败则提交Rollback,超时提交或提交Unknow状态则会触发broker的事务回查。
  若提交了Commit或Rollback状态,Broker则会将该消息写入到Op Topic,该Topic默认是RMQ_SYS_TRANS_OP_HALF_TOPIC,该Topic的作用主要记录已经Commit或Rollback的prepare消息,Broker利用Half Topic和Op Topic计算出需要回查的事务消息。如果是commit消息,broker还会将消息从Half取出来存储到真正的Topic里,从而消费者可以正常进行消费,如果是Rollback则不进行其他操作
  如果本地事务执行超时或返回了Unknow状态,则broker会进行事务回查。若生产者执行本地事务超过6s则进行第一次事务回查,总共回查15次,后续回查间隔时间是60s,broker在每次回查时会将消息再在Half Topic写一次。回查次数和时间间隔都是可配置的。
  执行事务回查时,生产者可以获取到事务Id,检查该事务在本地执行情况,返回状态同第一次执行本地事务一样。

  

  从上述流程可以看到事务消息其实只是保证了生产者发送消息成功与本地执行事务的成功的一致性,消费者在消费事务消息时,broker处理事务消息的消费与普通消息是一样的,若消费不成功,则broker会重复投递该消息16次,若仍然不成功则需要人工介入。

事务消息的成功投递是需要经历三个Topic的

Half Topic:用于记录所有的prepare消息
Op Half Topic:记录已经提交了状态的prepare消息
Real Topic:事务消息真正的Topic,在Commit后会才会将消息写入该Topic,从而进行消息的投递


  理解清楚事务消息在这三个Topic的流转就基本理解清楚了RocketMQ的事务消息的处理。接下来我们看看在源码中是如何使用这三个Topic的。

事务消息是如何处理回查的
  在RocketMQ中,消息都是顺序写随机读的,以offset来记录消息的存储位置与消费位置,所以对于事务消息的prepare消息来说,不可能做到物理删除,broker启动时每间隔60s会开始检查一下有哪些prepare消息需要回查,从上面的分析我们知道,所有prepare消息都存储在Half Topic中,那么如何从该Topic中取出需要回查的消息进行回查呢?这就需要Op Half Topic以及一个内部的消费进度计算出需要回查的prepare消息进行回查:

  Half Topic 默认Topic是RMQ_SYS_TRANS_HALF_TOPIC,建一个队列,存储所有的prepare消息
  Op Half Topic默认是RMQ_SYS_TRANS_OP_HALF_TOPIC,建立的对列数与Half Topic相同,存储所有已经确定状态的prepare消息(rollback与commit状态),消息内容是该条消息在Half Topic的Offset
  Half Topic消费进度,默认消费者是CID_RMQ_SYS_TRANS,每次取prepare消息判断回查时,从该消费进度开始依次获取消息。
  Op Half Topic消费进度,默认消费者是CID_RMQ_SYS_TRANS,每次获取prepare消息都需要判断是否在Op Topic中已存在该消息了,若存在表示该prepare消息已结束流程,不需要再进行事务回查,每次判断都是从Op Topic中获取一定消息数量出来进行对比的,获取的消息就是从Op Topic中该消费进度开始获取的,最大一次获取32条。

获取Half Topic的所有队列,循环队列开始检测需要获取的prepare消息,实际上Half Topic只有一个队列。
获取Half Topic与Op Half Topic的消费进度。
调用fillOpRemoveMap方法,获取Op Half Topic中已完成的prepare事务消息。
从Half Topic中当前消费进度依次获取消息,与第3步获取的已结束的prepare消息进行对比,判断是否进行回查:
如果Op消息中包含该消息,则不进行回查,
如果不包含,获取Half Topic中的该消息,判断写入时间是否符合回查条件,若是新消息则不处理下次处理,并将消息重新写入Half Topic,判断回查次数是否小于15次,写入时间是否小于72h,如果不满足就丢弃消息,若满足则更新回查次数,并将消息重新写入Half Topic并进行事务回查,
在循环完后重新更新Half Topic与Op Half Topic中的消费进度,下次判断回查逻辑时,将从最新的消费进度获取信息。
生产客户端的ClientRemotingProcessor的processRequest方法会处理服务端的CHECK_TRANSACTION_STATE请求,最后会调用checkLocalTransactionState方法,该方法就是业务方可以自己实现事务消息回查逻辑的地方,并将结果最后用endTransactionOneway方法返回给Broker,该执行逻辑可以通过ClientRemotingProcessor的方法processRequest依次理解就可以了。

  

我们有哪些手段来监控事务消息的状态
事务消息主要有三个状态:

UNKNOW状态:表示事务消息未确定,可能是业务方执行本地事务逻辑时间耗时过长或者网络原因等引起的,该状态会导致broker对事务消息进行回查,默认回查总次数是15次,第一次回查间隔时间是6s,后续每次间隔60s,
ROLLBACK状态,该状态表示该事务消息被回滚,因为本地事务逻辑执行失败导致
COMMIT状态,表示事务消息被提交,会被正确分发给消费者。


那么监控事务消息时,主要是查看该事务消息是否是处于我们想要的状态,而在事务消息生产者发送prepare消息成功后只能拿到一个transactionId,该id不是的RocketMQ消息存储的物理offset地址,RocketMQ只有在准备写入commitlog文件时才会生成真正的msgId,而这里可以获取的transactionId和msgId都是客户端生成的一个消息的唯一标识符,我们在这里称为uniqId,在broker端,会把该uniqId作为一个msgKey写入消息,所以可以通过该uniqId来查找uniqId的一些状态:

通过DefaultMQAdminExt的viewMessage(String topic, String msgId)方法可以消息的信息,这里topic参数是RMQ_SYS_TRANS_HALF_TOPIC ,该topic是真正的Half Topic,msgId传发送prepare消息获取的uniqId,这样可以获取prepare消息在Half Topic真正的offsetMsgId,
通过第一步获取的offsetMsgId继续调用viewMessage(String topic, String msgId)方法,但是topic是RMQ_SYS_TRANS_OP_HALF_TOPIC,这样可以获取Op Half Topic中该事务消息的状态,如果存在说明prepare消息已处理,否则可能仍在回查中或已被丢弃
如果在第二步查到了信息可以用uniqId和事务消息真正Topic继续调用viewMessage(String topic, String msgId)方法获取消息真正的信息,如果存在说明消息已被投递,否则该事务消息已被回滚。只通过Op Half Topic是不能确定消息状态的,这里的sysFlag被设置0,sysFlag是用于确定事务消息状态。
通过上述三步就可以确定事务消息的状态。

事务消息的异常恢复机制
  事务消息的异常状态主要有:

生产者提交prepare消息到broker成功,但是当前生产者实例宕机了
生产者提交prepare消息到broker失败,可能是因为提交的broker已宕机
生产者提交prepare消息到broker成功,执行本地事务逻辑成功,但是broker宕机了未确定事务状态
生产提交prepare消息到broker成功,但是在进行事务回查的过程中broker宕机了,未确定事务状态

  异常解决:

对于1:事务消息会根据producerGroup搜寻其他的生产者实例进行回查,所以transactionId务必保存在中央存储中,并且事务消息的pid不能跟其他消息的pid混用。
对于2:当前实例会搜寻其他的可用的broker-master进行提交,因为只有提交prepare消息后才会执行本地事务,所以没有影响,注意生产者报的是超时异常时,是不会进行重发的。
对于3:因为返回状态是oneway方式,此时如果消费者未收到消息,需要用手段确定该事务消息的状态,尽快将broker重启,broker重启后会通过回查完成事务消息。
对于4:同3,尽快重启broker。

---------------------

转自:https://blog.csdn.net/qq_28632173/article/details/83790243 

免责声明:文章转载自《【--RocketMQ--】RocketMQ实现事务消息》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇SDWebImage缓存机制如何在idea中使用Mybatis-generator插件快速生成代码下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

hook千牛 千牛破解发消息 千牛机器人 千牛发消息组件 调用千牛发消息 实时获取千牛聊天记录 可以提供代码

由于开发的时候,需要调用千牛发消息,所以研究了如何调用千牛发消息的组件,非协议破解,需要挂机,基本不弹发消息的窗体,非模拟发送,直接调用千牛的某个方法直接发送的,挂机后还能获取订单,实时获取聊天记录,能拿来做机器人等,需要的私聊我。可以提供源代码或组件...

Xamarin Mono For Android 4.6.07004 完整离线安装破解版(C#开发Android、IOS工具)

Xamarin Mono For Android 常见问题解决方法/工具/教程大全:http://www.wuleba.com/tag/Mono 2013-05-24 附加消息,吾乐吧软件站提醒大家: Xamarin 4.6.07004 压缩包里面的 Xamarin iOS 插件最新版(mtvs-1.1.200.0)“不能”破解,请不要安装最新这个。与此同时...

MFC避免窗口闪烁的方法(OnEraseBkgnd) .

在图形图象处理编程过程中,双缓冲是一种基本的技术。我们知道,如果窗体在响应WM_PAINT消息的时候要进行复杂的图形处理,那么窗体在重绘时由于过频的刷新而引起闪烁现象。解决这一问题的有效方法就是双缓冲技术。因为窗体在刷新时,总要有一个擦除原来图象的过程OnEraseBkgnd,它利用背景色填充窗体绘图区,然后在调用新的绘图代码进行重绘,这样一擦一写造成了图...

RocketMQ(八)RocketMQ的Consumer负载均衡

一、问题描述 RocketMQ的Consumer是如何做的负载均衡?比如:5个Consumer进程同时消费一个Topic,这个Topic只有4个queue会出现啥情况?反之Consumer数量小于queue的数据是啥情况? 二、源码剖析 1、RebalancePushImpl public class RebalancePushImpl extends R...

(2356)SQLite多线程下的并发操作_飞翔的种子_百度空间

(2356)SQLite多线程下的并发操作_飞翔的种子_百度空间 SQLite多线程下的并发操作 这两天一直在捣鼓SQLite数据库,基本的操作就不说了,比较简单,打算有空的话另起一篇博文简单总结一下。 这里主要想探讨一下多路并发下的数据库操作 SQLite作为一款小型的嵌入式数据库,本身没有提供复杂的锁定机制,无法内部管理多路并发下的数据操作同步问题,...

MySQL学习笔记——〇一

这里不讲MySQL的原理和连接的方法了,就讲一下如何对数据库进行操作。 用户操作 创建用户 创建用户的方法:我们可以用下面的代码进行用户的创建 create user 'username'@'ip' identified by 'password'; 在上面的代码中,表示创建了用户名为username的用户,用户登录ip限制为ip,密码为password。...