rabbitmq进阶

摘要:
目录消息传递过期时间(TTL)死信队列延迟队列优先级队列RPC实现持久性生产者确认使用者密钥消息传输保证消息传递强制=true。如果交换机无法根据自己的类型和路由密钥找到合格的队列,RabbitMQ将调用Basic。Return命令将消息返回给生产者,生产者调用频道。addReturnListener以添加侦听器以接收返回的结果强制=fa

目录

消息传递

mandatory

mandatory=true,如果交换器无法根据自身的类型和路由键找到一个符合条件的队列,RabbitMQ会调用 Basic.Return 命令将消息返回给生产者,生产者通过调用 channel.addReturnListener 添加监听器接收返回结果
mandatory=false,上述情形下,RabbitMQ 将消息直接丢弃

immediate

immediate=true,如果交换器在消息路由到队列时发现队列上并不存在任何消费者,该消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回生产者
和mandatory相比,mandatory如果路由不到队列则返回消息,immediate如果队列中没有消费者则返回消息

备份交换器(Alternate Exchange)

AE可以将未被路由的消息存储到 RabbitMQ 中。简化了mandatory+addReturnListener 的编程逻辑。

Map<String,Object> args = new HashMap<String,Object>();
args.put("alternate-exchange","myAe");

// 声明普通交换器(AE交换器作为备份交换器)
channel.exchangeDeclare("normalExchange","direct",true,false,args);
// 声明AE交换器
channel.exchangeDeclare("myAe","fanout",true,false,null);

// 普通队列 绑定 普通交换器
channel.queueBind("normalQueue","normalExchange","normalKey");

// 声明 未路由队列
channel.queueDeclare("unroutedQueue",true,false,false,null);
// 未路由队列 绑定 AE交换器
channel.queueBind("unroutedQueue","myAe","");

特殊情况

  • 若备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,消息丢失
  • 若备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,消息丢失
  • 若备份交换器没有匹配任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,消息丢失
  • 若备份交换器和mandatory参数一起使用,该参数无效

过期时间(TTL)

通过队列属性设置消息TTL

Map<String,Object> args = new HashMap<String,Object>();
args.put("x-message-ttl",6000); // 单位毫秒
channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
  • 不设置TTL:该消息不会过期
  • TTL为0:若直接可以投递到消费者,否则立刻被丢弃

消息过期:一旦过期,从队列中抹去。因为消息在队列头部,RabbitMQ只需要定期从头部开始扫描是否有过期消息即可。

设置每条消息TTL

channel.basicPublish 方法中加入 expiration 参数,单位毫秒

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.expiration("60000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());

消息过期:消息过期后,不会马上从队列中抹去,在即将投递到消费者之前判定。每条消息过期时间不同,删除所有过期消息势必要扫描整个队列,因此不如等到消息需要消费时再判定是否过期,若过期则删除。

设置队列的TTL

通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间

RabbitMQ 会确保在过期时间到达后将队列删除,在 RabbitMQ 重启后,过期时间会重置

死信队列

当消息在一个队列中变成死信,会被重新发送到死信交换器(Dead-Letter-Exchange, DLX),绑定DLX的队列称为死信队列

死信原因:1.消息被拒绝; 2.消息过期; 3. 队列达到最大长度

绑定死信队列:在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数为此队列添加 DLX

延迟队列

消息当被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,才能拿到消费

场景:

订单超时支付,延时队列做异常处理;

智能设备在指定时间进行工作,延时队列做指令推送;

用法:

  • 每条消息设置为10秒过期时间
  • 通过 exchange.normal 交换器把发送的消息存储到 queue.normal 队列中
  • 消费者订阅 queue.dlx 队列
  • 10秒后,消息过期转存到 queue.dlx ,消费者消费到了延迟10秒的这条消息

优先级队列

具有高优先级的队列有高的优先权,优先级高的消息优先被消费

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("message").getBytes());
  • 默认优先级为0,最高为队列设置的最大优先级
  • 如果Broker中有消息堆积,优先级高的消息可以被优先消费

RPC实现

远程过程调用(Remote Procedure Call),通过网络从远程计算上请求服务。应用部署在A服务器上,想要调用B服务器上提供的函数或者方法,需要通过网络表达调用的语义和传达调用的数据。

RPC的协议包括:Java RMI、WebService的RPC、THrift、RestfulAPI等。

String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());

RPC 处理流程:

  • 客户端启动时,创建一个匿名的回调队列
  • 客户端为 RPC 请求设置2个属性:replyTo 告知 RPC 服务端回复请求时的目的队列;correlationId 标记一个请求
  • 请求被发送到 rpc_queue 队列中
  • RPC 服务端监听 rpc_queue 队列中的请求,当请求到来时,服务端会处理并且把带有结果的消息发送给客户端。接收队列是 replyTo 设定的回调队列'';
  • 客户端监听回调队列,有消息时,检查 correlationId 属性,如果与请求匹配,就是返回结果

持久化

持久化可以提高 RabbitMQ 的可靠性,防止在异常情况(重启、关闭、宕机)下数据丢失

持久化的各种情况

RabbitMQ 持久化分为3个部分:交换器的持久化、队列的持久化、消息的持久化

  • 若交换器不设置持久化,服务重启后,交换器元数据丢失,但消息存在,不能将消息发送到该交换器中。长期使用的交换器,建议持久化。
  • 若队列不设置持久化,服务重启后,队列元数据丢失,消息也会丢失;若队列设置持久化,消息不设置,服务重启后,队列元数据存在,但消息丢失
  • 若所有消息设置持久化,会严重RabbitMQ性能,需要在吞吐量和可靠性之间做权衡
  • 生产环境会设置镜像队列保证系统的高可用性

生产者确认

默认情况下,生产者不知道消息有没有正确到达服务器。因此引入事务和发送方确认机制。

事务机制

事务方法:

  • channel.txSelect: 用于将当前信道设置成事务模式
  • channel.txCommit:用于提交事务
  • channel.txRollback:用于事务回滚

事务流程:

  • 客户端发送 Tx.Select,将信道置为事务模式
  • Broker回复 Tx.Select-Ok,确认已将信道置为事务模式
  • Basic.Publish发完消息后,客户端发送 Tx.Commit 提交事务
  • Broker 回复 Tx.Commit,确认事务提交

事务问题:事务机制会耗尽 RabbitMQ 的性能

发送方确认机制

  1. 生产者将信道设置成 confirm 模式
  2. 信道进入 confirm 模式后,所有在信道上发布的消息都会被指派一个唯一ID
  3. 消息被投递到所有匹配的队列后,RabbitMQ 发送一个确认给生产者

发送方确认机制好处:相比于事务,它是异步非阻塞的。可以在等待信道返回确认的同时,继续发送下一条消息,当消息得到确认后,生产者可以通过回调方法处理确认消息。

发送方确认机制的优势在于不一定需要同步确认:

  • 批量确认:每发送一批消息后,调用channel.waitForCOnfirms方法,等待服务器的确认返回
  • 异步confirm方法:提供一个回调方法,服务器确认一条或者多条消息后客户端会回调这个方法进行处理。

注意:批量确认提升了confirm效率,但是返回Basic.Nack或者超时,客户端需要将这一个批次的消息全部重发,会带来明显的重复消息数量。消息经常丢失时,批量confirm性能应该不升反降。

消费端要点

消息分发

  • 当 RabbitMQ 队列有多个消费者,消息会以轮询方式分发给消费者
  • 但是这样会造成因为各机器性能不同而引起负载不均
  • 消费端通过调用 channel.basicQos 方法,设置允许限制信道上的消费者保持最大未确认消息数量
  • 一旦达到未确认消息数量上限,则停止向这个消费者发送消息,实现了“滑动窗口”效果
  • Basic.Qos的使用对于拉模式消费方式无效

消息顺序性

顺序性指的是消费者消费的消息和发布者发布的消息的顺序是一致的

顺序性打破的情况:

  • 生产者使用了事务机制,事务回滚后,补发信息可能在其它线程实现
  • 启用 publiser confirm时,发生发生超时、中断,导致错序
  • 生产者设置了延迟队列,但是超时时间设置的不一样
  • 消息设置了优先级,消费端收到的消息必然不是顺序性的

弃用 QueueingConsumer

  • 队列中有大量消息,可能导致内存溢出或假死,可以使用 Basic.Qos 方法得到有效解决
  • QueueingConsumer会拖累一个 Connection 下的所有信道,使性能降低
  • 同步调用 QueueingConsumer 会产生死锁

消息传输保障

消息传输保障等级

At most once:最多一次。消息可能丢失,但绝不会重复传输
At least once:最少一次。消息绝不会丢失,但可能重复传输
Exactly once:恰好一次。每条消息肯定会,有且传输一次
最少一次:需要考虑 事务、mandatory、持久化处理、autoAck
最多一次:无须考虑以上问题,随便发送与接收
恰好一次:RabbitMQ 目前无法保障。比如消费完Ack闪断,或者生产者发送消息到RabbitMQ,返回确认消息时网络闪断。

去重一般是通过业务客户端引入GUID实现

免责声明:文章转载自《rabbitmq进阶》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇C#获取URL参数值JavaScript插件——弹出框下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RocketMQ消息至少一次(At least Once)投递和消费

至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。 生产者 在同步非顺序投递的时候,每次都是轮询到不同的队列: Message message = new Message("topic...

手把手教你把苹果手机微信聊天内容存为长图的2种方法

第一种:把聊天内容全部截图,然后拼凑成一张长图 ①:打开微信,点击右下角的我 ②:点击收藏 ③:点击右上角的加号 ④:点击左下角的打开本地相册图标 ⑤:选择要拼成长图的聊天截图(这里我随便截了3张图,没有截聊天记录) ⑥:点击右上角的图标后,在弹出的界面选择保存为图片即可 第二种:选择多条聊天消息后合并,然后转为文字 ①:任意打开一个微信好友...

Kafka网络模型

摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型,通过对Kafka源码的分析来简述其Reactor的多线程网络通信模型和总体框架结构,同时简要介绍Kafka网络通信层的设计与具体实现。 一、Kafka网络通信模型的整体框架概述 Kafka的网络通信模型...

高仿微信新消息提示音功能

近期公司在做一个项目。有一个切换消息提示音的功能,能够切换本应用收到消息的提示音,而不影响系统提示音。我就依照微信的那个样式进行了编程,终于得到想要的效果。 转载请注明出处。谢谢:http://blog.csdn.net/harryweasley/article/details/46408037 怕有些人不知道怎么进入微信的新消息提示音功能,我这里说...

看似复杂炫酷的数据可视化大屏,学会这个工具轻松搞定

“今朝有酒今朝醉,报表不做不能睡,借问酒家何处有,报表还得编一宿”,这句带有些许幽默感的打油诗背后,却是我从业多年的心酸历程,没错,我就是你们口中做报表的哥哥——表哥。 前些日子在和别人交流的过程中发现,现在市场变化太快,不仅用Excel做报表已经落后了,就连最后的数据都要以美观、直接、酷炫的方式展现出来,我这个什么都不懂又不想学而且只会用Excel的老油...

ios--进程/多线程/同步任务/异步任务/串行队列/并行队列(对比分析)

现在先说两个基本的概念,啥是进程,啥是线程,啥又是多线程;先把这两个总是给弄清再讲下面的 进程:正在进行的程序,我们就叫它进程. 线程:线程就是进程中的一个独立的执行路径.这句话怎么理解呢! 一个程序它是按顺序从上往下执行的, 这个执行顺序我们可以把它看成是一条线,把这条线就叫做线程(个人理解,错了勿喷);每一个程序中至少包含一条线程, 这条线程,我们叫它...