【RocketMQ】RocketMQ事务消息 Demo

摘要:
RocketMQ版本为4.3.0。如果以前使用其他版本,则需要修改系统环境变量maven项目˂!

RocketMQ为4.3.0版本(我这种写法4.2.0不行)
如果你之前用的其他版本,需要去修改下系统的环境变量

maven工程用到的jar包

<dependencies>
<!-- RocketMQ -->
<dependency> 
<groupId>org.apache.rocketmq</groupId> 
<artifactId>rocketmq-all</artifactId> 
<version>4.3.0</version> 
<type>pom</type> 
</dependency>
<dependency> 
<groupId>org.apache.rocketmq</groupId> 
<artifactId>rocketmq-client</artifactId> 
<version>4.3.0</version> 
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>

  

生产者代码

  

package cn.ebiz.rocketmq.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class Producer {
  public static void main(String[] args) throws MQClientException, InterruptedException {
  //01 new 一个有事务基因的生产者
  TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
  //02 注册
  producer.setNamesrvAddr("127.0.0.1:9876");
  //03 开启
  producer.start();
/**
 * 04 生产者设置事务监听器,匿名内部类new一个事务监听器,
 * 重写“执行本地事务”和“检查本地事务”两个方法,返回值都为
 * “本地事务状态”
 */
  producer.setTransactionListener(new TransactionListener() {
         public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
              String tag = msg.getTags();
                    if(tag.equals("Transaction1")) {
                        System.out.println("这里处理业务逻辑,比如操作数据库,失败情况下进行回滚");
                        //如果失败,再次给MQ发送消息
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                  return LocalTransactionState.COMMIT_MESSAGE;
              }
   public LocalTransactionState checkLocalTransaction(MessageExt msg) {   System.out.println("state -- "+new String(msg.getBody()));   return LocalTransactionState.COMMIT_MESSAGE;    }   });
for(int i=0;i<2;i++) { try {      // 05 准备要发送的message,名字,标签,内容 Message msg = new Message("TopicTransaction","Transaction" + i,("Hello RocketMQ "+i).getBytes("UTF-8"));       // 06 用发送事务特有的方法发送消息,而不是简单的producer.send(msg); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println(msg.getBody()); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } // 07 关闭 producer.shutdown(); } }

  


消费者代码

package cn.ebiz.rocketmq.transaction;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer0 {
        public static void main(String[] args) throws MQClientException {
        //01默认的消息消费FF者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        //02注册
            consumer.setNamesrvAddr("127.0.0.1:9876");
        //03设置获取原则
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //04订阅
            consumer.subscribe("TopicTransaction","*");
        //05注册监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    try {
        // 06接收消息并打印
                        for (MessageExt msg : msgs) {
                            String topic = msg.getTopic();
                            String msgBody = new String(msg.getBody(),"utf-8");
                            String tags = msg.getTags();
                            System.out.println("收到消息: topic:"+topic+" ,tags:"+tags+" ,msg: "+msgBody);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
        // 1s 2s 5s ... 2h
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        // 07开启
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }


//namesrv启动
start mqnamesrv.cmd
//broker启动
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true


---------------------
转自:https://blog.csdn.net/weixin_38537747/article/details/82112584

免责声明:文章转载自《【RocketMQ】RocketMQ事务消息 Demo》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇在springmvc中配置jedis(转)CAD文件图片浏览库控件CADViewX下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RocketMQ源码 — 八、 RocketMQ消息重试

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。 producer发送消息重试 producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。 private SendResult sendDefaultImpl( Message msg, final Communication...

Eclipse利用Maven快速上手搭建MyBatis

一、what is maven? Maven项目对象模型(POM),可以通过一小段描述信息来管理项目的构建,报告和文档的项目管理工具软件。 Maven 除了以程序构建能力为特色之外,还提供高级项目管理工具。由于 Maven 的缺省构建规则有较高的可重用性,所以常常用两三行 Maven 构建脚本就可以构建简单的项目。由于 Maven 的面向项目的方法,许多...

Hbase之批量数据写入

/** * Created by similarface on 16/8/16. */ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import...

Maven里头的pom.xml配置详解

正常的pom配置文件如下所示: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0...

Windows 下apache https配置(phpstudy)

1.首先获取证书,https://www.pianyissl.com/ 免费三个月的 或者 自己生成私钥、证书,然后应用到apache中。 http://blog.sina.com.cn/s/blog_58f71ef00102wvlx.html 讲解了如何在windows PHPStudy Apache 配置支持HTTPS http://www.cnbl...

【Apache】在Apache中利用ServerAlias设置虚拟主机接收多个域名和设置域名泛解析

ServerAlias:服务器别名,在Apache中可以用于设置虚拟主机接收到个域名,也可以用于接收泛解析的域名。具体的设置方法如下: 一、用于设置虚拟主机接收多个域名 一个虚拟主机常常会接收多个域名解析,比如:一个虚拟主机要同时介绍doctor-c.net, doctor-c.com两个域名,或者是两个二级域名,如:www.doctor-c.net, w...