RabbitMQ学习07--消息重复消费

摘要:
非幂等性,需要保证消息不会被重复消费。为了解决消费重复消费的问题,可以使用Redis,在消费者消费之前,先将消息的id放到Redis中,id-0id-1如果ack失败,在RabbitMQ将消息交给其他消费者时,先执行Redis的setnx。value值为1,表示已经有其他消费者处理了这个消息,但是没有向RabbitMQ发送ack,则但钱消费者直接发送ack。

幂等性操作 :可以重复执行的操作,可以不用保证消息重复消费。

非幂等性,需要保证消息不会被重复消费。

重复消费原因:消费者消费了消息,但并没有向rabbitmq发送ack。

为了解决消费重复消费的问题,可以使用Redis,在消费者消费之前,先将消息的id放到Redis中,

id-0(正在执行业务)

id-1(业务执行成功)

如果ack失败,在RabbitMQ将消息交给其他消费者时,先执行Redis的setnx。

如果key不存在,则当前消费者消费此消息,然后发送ack。

如果key已经存在,则判断其值:

value值为0,表示有消费者正在处理这个消息,则当前消费者不做操作。

value值为1,表示已经有其他消费者处理了这个消息,但是没有向RabbitMQ发送ack,则但钱消费者直接发送ack。

极端情况:某消费者在执行业务时出现了死锁,则需要在setnx的基础上,再设置一个生存时间。

发布者代码:

1 packagecom.yas.myreturn;
2 
3 import com.rabbitmq.client.*;
4 importcom.yas.config.RabbitMQClient;
5 importorg.junit.Test;
6 
7 importjava.io.IOException;
8 importjava.util.UUID;
9 
10 public classPublisher {
11 @Test
12     public void publish() throwsException {
13         //1.获取连接对象
14         Connection connection =RabbitMQClient.getConnection();
15         //2.创建Channel
16         Channel channel =connection.createChannel();
17 
18         //3.1 开启Confirm
19 channel.confirmSelect();
20 
21         //判断消息发送是否成功
22         //异步发送
23         channel.addConfirmListener(newConfirmListener() {
24 @Override
25             public void handleAck(long deliveryTag, boolean multiple) throwsIOException {
26                 System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量:" +multiple);
27 }
28 
29 @Override
30             public void handleNack(long deliveryTag, boolean multiple) throwsIOException {
31                 System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量:" +multiple);
32 }
33 });
34 
35         //3.发布消息到exchange,同时指定路由规则
36 
37         //开启Return机制
38         channel.addReturnListener(newReturnListener() {
39             //当消息没有送到queue时,才会执行
40 @Override
41             public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throwsIOException {
42                 System.out.println(new String(body, "UTF-8") + "没有送到Queue中");
43 }
44 });
45 
46         //参数1:指定exchange,使用空字符串,表示默认exchange。
47         //参数2:指定路由规则,使用具体的队列名称。
48         //参数3:指定传递的消息所携带的properties。
49         //参数4:指定发布的具体消息,字节数组类型byte[]
50 //channel.basicPublish("", "HelloWorld", null, msg.getBytes());
51 
52         //生产者发送消息时,指定messageid
53         AMQP.BasicProperties properties = newAMQP.BasicProperties().builder()
54                 .deliveryMode(1)//指定消息是否需要持久化,1需要,2不需要
55 .messageId(UUID.randomUUID().toString())
56 .build();
57         String msg = "Hello World";
58         //使用带return机制的发布,mandatory:true
59         channel.basicPublish("", "HelloWorld", true, properties, msg.getBytes());
60 
61         //注意:exchange是不会将消息持久化到本地的,Queue有持久化的功能。
62 
63         System.out.println("生产者发布消息成功");
64 
65         //4.释放资源
66 channel.close();
67 connection.close();
68 }
69 }

消费者代码:

1 packagecom.yas.myreturn;
2 
3 import com.rabbitmq.client.*;
4 importcom.yas.config.RabbitMQClient;
5 importorg.junit.Test;
6 importredis.clients.jedis.Jedis;
7 
8 importjava.io.IOException;
9 
10 public classConsumer {
11 @Test
12     public void consume() throwsException {
13         //1.获取连接对象
14         Connection connection =RabbitMQClient.getConnection();
15         //2.创建channel
16         Channel channel =connection.createChannel();
17         //3.生命队列-Hello World
18         //参数1:queue,队列名称
19         //参数2:durable,当前队列是否需要持久化
20         //参数3:exclusive,是否排外
21         //影响1:当connection.close()时,当前队列会被自动删除
22         //影响2:当前队列只能被一个消费者消费
23         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
24         //参数5:arguments,指定当前队列的其他信息
25         channel.queueDeclare("HelloWorld", true, false, false, null);
26         //4.开启监听Queue
27         DefaultConsumer consumer = newDefaultConsumer(channel) {
28 @Override
29             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throwsIOException {
30                 //super.handleDelivery(consumerTag, envelope, properties, body);
31                 //1、连接Redis
32                 Jedis jedis = new Jedis("ubu", 6379);
33 
34                 jedis.auth("123456");
35 
36                 String messageId =properties.getMessageId();
37                 //2、操作Redis - redis的命令是什么jedis对应的方法就是什么
38                 //setnx,value值0
39                 String redisResult = jedis.set(messageId, "0", "NX", "EX", 10000);
40                 //如果setnx返回不为null(返回1),表示设置成功,redis原本没有这个key
41                 if (redisResult != null && redisResult.equalsIgnoreCase("ok")) {
42                     //消费成功,set messageid
43                     System.out.println("接受到消息id:" + messageId + ",内容:" + new String(body, "UTF-8"));
44                     jedis.set(messageId, "1");//redis标记修改为1,表示已经消费完成
45                     channel.basicAck(envelope.getDeliveryTag(), false);
46                 } else{
47                     //如果setnx返回null(返回0),redis原本有这个key
48                     String s = jedis.get(messageId);//读取原来的值
49                     if ("1".equalsIgnoreCase(s)) {//如果是1 表示原来的消费者已经消费了信息,但是没有发送ack,则当前消费者补发ack
50                         System.out.println("对消息:" + s + ",补发ack");
51                         channel.basicAck(envelope.getDeliveryTag(), false);//补发ack
52                     } else{
53                         //如果是0 表示原来的消费者正在处理,现在的消费者不用做任何处理
54                         System.out.println("无需对消息" + s + "做任何处理");
55 }
56 }
57                 //3、释放资源
58 jedis.close();
59 }
60 };
61         //参数1:queue,指定消费哪个队列
62         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
63         //参数3:consumer,指定消费回调
64         channel.basicConsume("HelloWorld", true, consumer);
65         System.out.println("消费者开始监听队列");
66 System.in.read();
67         //5/释放资源
68 channel.close();
69 connection.close();
70 }
71 }

免责声明:文章转载自《RabbitMQ学习07--消息重复消费》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇vue发布IIS踩坑记ubuntu下nvm,node以及npm的安装与使用下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

调用支付宝转账接口(单笔)

下面这几个类都是支付宝demo里面的,直接拿过来用就可以 using System.Web; using System.Text; using System.IO; using System.Net; using System; using System.Collections.Generic; namespace Com.Alipay { pu...

权限认证机制

一、Form表单认证 之前的项目以MVC为主,采用的是from表单认证,Forms认证示意图如下: HTTP是一个无状态的协议,WEB服务器在处理所有传入HTTP请求时,根本就不知道某个请求是否是一个用户的第一次请求与后续请求,或者是另一个用户的请求。 WEB服务器每次在处理请求时,都会按照用户所访问的资源所对应的处理代码,从头到尾执行一遍,然后输出响应...

结对项目:一寸时光APP(日程管理)二

建立数据库 package com.example.myapplication3.db; import android.content.ContentValues;import android.content.Context;import android.database.Cursor;import android.database.sqlite.SQLi...

C# excel 常用操作

1、excel文件读取 1.  com组件操作excel 读写 2.  ado.net方式操作excel 读写 3.  开源的第三方组件npoi 4. open xml 方式读写excel 方式一使用OleDbConnection System.Data.DataTable dt =GetExcelDatatable("C:\Users\Administr...

Shiro快速入门

写在前面:   最近项目中使用了Shiro,虽然不是自己在负责这一块,但还是抽空学习了下,也可以让自己对shiro有基本的了解。毕竟Shiro安全框架在项目中还是挺常用的。   对于Apache Shiro的基本概念就不在这里一一描述了,资料网上都有,主要还是记录下代码相关的,能够先让自己快速学会使用。   这里的demo(可以测试登录认证,登出,以及授权...

[转]JAVA URL请求

使用Java发送GET、POST请求 ——节选自《疯狂Java讲义》    URL的openConnection()方法将返回一个URLConnection对象,该对象表示应用程序和 URL 之间的通信链接。程序可以通过URLConnection实例向该URL发送请求、读取URL引用的资源。 通常创建一个和 URL 的连接,并发送请求、读取此 URL 引用...