RabbitMQ双活实践(转)

摘要:
一些队列消息需要在中心之间共享。Publisher通过发送确认来启动确认模式。select命令。消息消耗可靠性说明了如何确保队列中的消息至少消耗一次。接收RabbitMQ服务器发送的ACK消息,确认消息已到达;数据包丢失可能意味着中断的TCP连接需要很长时间才能被操作系统检测到。为消息实施延迟重试机制(重试队列)方案2:

有货RabbitMQ双活实践

 
RabbitMQ双活实践(转)第1张

消息服务中间件在日常工作中用途很多,如业务之间的解耦,其中 RabbitMQ 是比较容易上手且企业使用比较广泛的一种,本文主要介绍有货在使用 RabbitMQ 的一些实践与尝试。

有货的 RabbitMQ 部署架构采用双中心模式,在两套数据中心中各部署一套 RabbitMQ 集群,各中心的 RabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。

RabbitMQ双活实践(转)第2张

消息传递的可靠性

场景 1:生产者与消费者互不感知,怎么确认生产者已将消息投递到 RabbitMQ 服务端,又如何确认消费者已经消费了该消息?

 消息 Publish 可靠性

首先来谈谈 Publisher 的可靠发送,如果使用标准 AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务,但使用事务模式会导致服务端吞吐量急剧下降。为了弥补这一点,AMQP 引入了确认机制。它模仿了协议中已经存在的消费者 ACK 确认机制。Publisher 通过发送 confirm.select 命令开启确认模式,随后 RabbitMQ 服务端在收到消息后会进行确认,Publisher 会收到服务端发送的确认回复。要注意:无法在通道中同时使用确认模式与事务模式,只可二选一。

 消息 Consume 可靠性

再说说如何保证队列中消息至少被消费一次。当 RabbitMQ 交付消息给 Consumer 时,需要确认 Message 已被投递到 Consumer。Acknowledgements 作用,Consumer 发送确认消息通知 RabbitMQ 服务端已收到消息或已成功消费消息。看下消息生产、消费的流程图:

RabbitMQ双活实践(转)第3张

在 1 号的位置需要开启 Channel 的 Confirm 模式,接收 RabbitMQ 服务端发送的确认消息已到达的 Ack 信息;在 3 号的位置,消费者在成功消费或者业务处理失败后,需要告诉 RabbitMQ 服务端,消息已被消费成功或者失败;当然在某些网络故障中,数据包丢失可能意味着中断的 TCP 连接需要较长时间才能够被操作系统检测到。通过心跳功能,确保应用程序层及时发现连接中断。

在我们的部署架构中,ELB 与 RabbitMQ 之间就是通过此机制来判断服务是否存活,通知消息生产者服务端已挂,异步等待 Confirm 的消息直接进入 Unconfirm 的处理环节。另外为了避免在代理中丢失消息,AMQP 标准具有交换、队列和持久消息的耐久性概念,要求持久对象或持久消息将在重新启动后生存,这些特性同样也是可靠性的基础。

实现消息的延迟重试机制(重试队列)

场景 2:在某些情况下,业务系统在处理消息时可能会失败,此时需要做的是重试,而不是直接丢弃;当然重试也不能直接重试,一旦有任务长时间失败,会导致后面的消息无法被正常处理,此时可以借助死信机制(消息在队列中存活时间超出队列 ttl 的设定)转发投递到特定的重试队列后,随后再尝试重新处理该消息。

下面介绍具体操作:

首先创建两个队列,工作队列命名为“yoho_test_retry”,重试队列命名为“yoho_test_retry.retry“。

再看下工作队列的参数配置:

  • x-dead-letter-exchange:死信转发的 Exchange

  • x-dead-letter-routing-key:死信转发时的 Routing-key

  • yoho_test_retry 绑定到名为“amp.topic”的 topic 类型 Exchange,接收 Routing-key 为“yoho_test_retry”的消息

RabbitMQ双活实践(转)第4张

再看下重试队列的参数配置:

  • 死信转发到“amp.topic”的 Exchange

  • Routing-key 为“yoho_test_retry”(即工作队列 yoho_test_retry 接收该主题消息)

  • x-message-ttl:message 在重试队列中存活的时间,也就是延迟多久重试。该队列绑定到“amp.topic”的 Exchange,接收 Routing-key 为“retry.yoho_test_retry”的消息(即接收工作队列的死信),这样就可以实现消息重试队列的机制了

RabbitMQ双活实践(转)第5张

当然还有别的方式,如通过声明 Retry 的 Exchange 来中转到 Retry 队列中,不需要指定 x-dead-letter-routing-key,再指定 Retry 队列的 dead-letter-exchange 为“amp.topic”即可,这种方式不需要每个队列都生成一个 Retry 队列,大家可以自己动手尝试下。

实现消息的延时消费(延时队列)

场景 3:如何实现消息的延时消费也是一种常见的需求,可以让某些任务延时执行,其实同样也可以借助死信机制来实现。

队列 A 用于接收暂存 Producer 的消息,队列 B 用于 Consumer 的消费,在队列 A 中指定消息的 ttl 即生命周期时长,同时指定其死信交换机 DLXs,一旦消息在队列中存活时长超过 ttl 的设定值,那么消息会被转发到 DLXs,将队列 B 绑定到 DLXs,即可接收到队列 A 的死信。

具体操作流程,与场景 2 一样,首先创建两个队列:工作队列名为“yoho_test_delay” ,延迟队列名为“yoho_test_delay.delay“。

再看下工作队列的配置参数:

  • 从“amp.topic”的 Exchange 中接收 Routing-key 为“delay.yoho_test_delay”的消息。

RabbitMQ双活实践(转)第6张

延迟队列“yoho_test_delay.delay”的配置:

  • x-dead-letter-exchange 死信转到交换机“amp.topic”

  • 死信消息的 Routing-key 为“delay.yoho_test_delay”(即工作队列接收消息的 Routing-key)

  • 消息在延迟队列中存活时间 ttl

  • 该队列绑定到“amp.topic”交换机,接收 Routing-key 为“yoho_test_delay”的消息(即生产者发送消息指定的 topic)。如此一来延迟队列接收消息后,等待 ttl 时长后将消息转发到工作队列中,即可实现延迟队列机制

RabbitMQ双活实践(转)第7张

同样还有别的方法,大家可以灵活实现。

实现跨数据中心的消息共享

场景 4:有时跨中心业务需要共享消息,如缓存清理等,在业务代码中分别向多个中心的 RabbitMQ 发布消费消息显然不是一种比较好的解决方案,那还有什么好的方法呢?

RabbitMQ 为此提供了 Federation 插件来很好地解决此类问题,有货跨中心部署 Federation 架构图:

RabbitMQ双活实践(转)第8张

Federation 插件是一个不需要构建 Cluster,而在 Brokers 之间传输消息的高性能插件,Federation 插件可以在 Brokers 或者 Cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用版本不同的 RabbitMQ 和 Erlang。Federation 插件使用 AMQP 协议通讯,可以接受不连续的传输。

Federation Exchanges,可以看成 Downstream 从 Upstream 主动拉取消息,但并不是拉取所有消息,必须是在 Downstream 上已经明确定义 Bindings 关系的 Exchange,也就是有实际的物理 Queue 来接收消息,才会从 Upstream 拉取消息到 Downstream。使用 AMQP 协议实施代理间通信,Downstream 会将绑定关系组合在一起,绑定 / 解除绑定命令将发送到 Upstream 交换机。因此,Federation Exchange 只接收具有订阅的消息,本处贴出官方图来说明;

RabbitMQ双活实践(转)第9张

但是注意,由于绑定是异步发送的 Upstream 的,所以添加或删除绑定的效果并不立即生效,消息被缓冲在 Upstream 交换机的所在 Broker 创建的队列中,这被称为 Upstream 队列。任何 Upstream Exchange 接收到的消息都可能被 Downstream 中 Federation Exchange 接收到,但直接发送给 Federation Exchange 的消息是不能被 Upstream 中所绑定的 Exchange 接收到的。

下面动手创建名为“fed_test”的 Federation Exchange,配置 Federation 策略“fed_ex”,Federation-upstream-set 可以简单的配置为“all”,表示与所有的 Upstream 都建立点对点的 Federation 连接。

RabbitMQ双活实践(转)第10张

RabbitMQ双活实践(转)第11张

此时在 Downstream 上可以看到建立了一个 Running-Links 连接到 Upstream,该 Exchange 就可以收到 Upstream 中同名 Exchange 收到的所有消息(前提是 Downstream 中有物理队列接收)。

RabbitMQ双活实践(转)第12张

大家应该都知道 RabbitMQ 中单 Queue 能够对外提供的服务能力有局限性,如何通过 Federation 来满足跨中心同时高并发的场景呢,此时就需要自己编写插件了,结合后面会介绍的 Sharding 分片机制,创建多个 Federation 缓冲队列分摊压力,本人的想法仅供参考。

实现消息队列的高可用(HA 容灾)

场景 5:需要保证消息队列高可用的场景有很多,比如核心业务的订单服务、erp 服务。

默认情况下,RabbitMQ 群集中的队列位于单个节点上(首次被声明的节点上),而 Exchanges 和 Bindings 可以认为在所有节点上存在,但也可以将 Queue 在 Cluster 节点之间配置为镜像队列。每个镜像队列由一个 Master 和一个或多个 Slave 组成,如果 Master 因为某些原因失效,则将从 Slave 中选择一个提升为 Master。

发布到队列的消息将复制到所有镜像,消费者连接到主机,无论它们连接到哪个节点,镜像会丢弃已在主设备上确认的消息,队列镜像因此增强了可用性,但不跨节点分配负载。

RabbitMQ双活实践(转)第13张

如上图创建名为“ha_test_queue”的队列,同时为该队列配置了策略 Policy,Ha-mode 简单配置为 all,当然可以使用 Ha-node 参数选择节点制作镜像。

RabbitMQ双活实践(转)第14张

此时队列已被配置为镜像,master 节点位于 server5,slave 节点位于 server6,此时,随意关闭任意一台 RabbitMQ 节点,该队列都可以正常对外提供服务。

RabbitMQ双活实践(转)第15张

当然在高可用的场景下,队列的性能会受到一定的影响,此时可以借助后面提到的 Sharding 机制(根据场景选择 x-modulus-hash 还是 consistent-hash ),解决单队列的性能瓶颈,在高可用、高并发下寻求一个动态的平衡。

RabbitMQ 分片机制

在解决 RabbitMQ 单 Queue 性能问题时可以用到 RabbitMQ Sharding 插件,该插件可以提供消息的自动分片能力:自动创建分片队列,同时交换机将在队列中分区或分片消息。

RabbitMQ双活实践(转)第16张

在某些情况下,你可能希望发送到交换机的消息一致并且均匀地分布在多个不同的队列。在上面的插件中如果队列数量发生变化,则不能确保新的拓扑结构仍然在不同队列之间均匀分配消息,此时就可以借助 Consistent-sharding 类型 Exchange,与 Sharding 插件的主要区别是,该类 Exchange 不能自动创建分片队列,需要手动创建并配置 Binding 关系,且支持一致性 hash。

RabbitMQ 高并发实践

衡量消息服务的性能最重要的指标之一就是吞吐量,那 RabbitMQ 的高并发到底可以到多少呢?首先使用了 32 台 8 核 30G 内存的虚拟机,构建了相对来说比较庞大的 RabbitMQ 集群,各虚拟机的作用分配如下:

  • 30 RabbitMQ RAM 节点(正常 RAM 节点,RabbitMQ 元数据和定义仅保存在 RAM 中);

  • 1 RabbitMQ Disc 节点(元数据持久化节点,其中 RabbitMQ 代理元数据和定义也保留在光盘上);

  • 1 RabbitMQ Stats 节点(统计信息节点,运行 RabbitMQ 管理插件,不带任何队列)。

测试环境架构结构图,大致如下:

RabbitMQ双活实践(转)第17张

在 RabbitMQ 群集节点的前面,挂载负载均衡器,负载均衡器配置中包含了除统计信息节点以外的所有节点。来自连接的 AMQP 客户端的请求在目标池中的节点之间进行了平衡。从目标池排除统计信息节点有助于确保消息队列和传送工作不会最终与管理节点发生资源竞争。在较低吞吐量的情况下,用户可以选择将统计信息节点包含在负载平衡器后台服务池中。实验结果如下:

RabbitMQ双活实践(转)第18张

在这种高负载的生产(1345531 msgs/pers)消费(1413840 msgs/pers)压力下,RabbitMQ 仅有 2343 条消息暂时在其等待发送的队列中累积,在这样的负载下,RabbitMQ 节点也没有显示内存压力,或者需要基于资源限制启动流控机制。我们在 AWS 上搭建了同等规模与配置的环境,验证了上述 Google 提供的测试方案及结果后又做了一些别的尝试,如使用 RabbitMQ Sharding 插件、Consistent-hash Sharding Exchange 来更加灵活地动态均衡队列压力,可以更从容地达到百万并发的性能。

高可靠与高可用从来都是性能杀手,那 RabbitMQ 的表现如何,实际生产应用中应该中如何做权衡?最后通过一组数据来说明,RabbitMQ 环境配置:

  • 单虚拟机:8C8G ;

  • 单位消息大小:1KB;

  • 压测工具:rabbitmq-perf-tes(官方提供)。

RabbitMQ双活实践(转)第19张

上图中,我们在单节点 RabbitMQ 上对消息持久化、Consume-Ack、Publish-Confirm 三个特性做了压测,消息持久化对性能影响最大,Consum-Ack 其次,Publish-Confirm 最小。

RabbitMQ双活实践(转)第20张

Prefetch,可以理解为 Consumer 一次最多获取多少消息进行消费,可以看到,当 Prefetch 为 10 时,性能最差,当 Prefetch 放大到一定阈值如 10000,其对性能的影响也就微乎其微了。

RabbitMQ双活实践(转)第21张

再考虑下,多消费者多生产者对性能的影响,如上图适当增加消费者,保持队列的空闲,可以增加吞吐量,但当到达某一瓶颈时,效果不太明显了。

RabbitMQ双活实践(转)第22张

在集群场景下,Ack 对性能影响明显。

RabbitMQ双活实践(转)第23张

上图为镜像场景的压测结果,对比普通集群,镜像对性能的影响很明显,消息持久化也拉低了集群的性能,适当增加 Prefetch 可以提高集群性能。

性能与高可靠、高可用,鱼和熊掌不可兼得,所以想提升 RabbitMQ 集群或单节点服务的性能,可以牺牲可靠性(根据场景来),在消费能力范围内,尽量提高 Prefetch 的数量;其次就是简单粗暴型(加机器加配置,队列实际存储节点性能未榨干,建议队列均衡分配到各节点)。

免责声明:文章转载自《RabbitMQ双活实践(转)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇C语言实现字符串IP与整数型IP的相互转换mxgraph 画布下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RabbitMQ---6、客户端 API 的简介

1、主要的命名空间,接口和类  定义核心的API的接口和类被定义在RabbitMQ.Client这个命名空间下面:  所以要想使用RabbitMQ的功能,需要以下代码     using RabbitMQ.Client;   【1】、核心API的接口和类如下:    IModel:表示一个符合AMQP 0-9-1 协议的通道,并且提供了很多的操作方法   ...

Android Studio 插件简单介绍

现在Android的开发者基本上都使用Android Studio进行开发(如果你还在使用eclipse那也行,毕竟你乐意怎么样都行)。使用好Android Studio插件能大量的减少我们的工作量。 1.GsonFormat快速将json字符串转换成一个Java Bean,免去我们根据json字符串手写对应Java Bean的过程。 使用方法:快捷键A...

xxxx(四):接受消息hook地址分析

   今天来分析一下xxxx接受消息的call;测试的账号在虚拟机,发消息的账号在物理机;    1、老规矩:逆向分析的起点都从CE开始;给测试账号发送辨识度高的消息,这时暂时不要在测试账号打开消息。因为此时的消息刚经过网络传输,还是ASCII格式,所以千万不要勾选UTF-16,只能找ASCII码找(下面详细解释原因)!  重复发送0000000000、...

简单聊聊,如何设计站内信?

一、什么是站内信? 站内信,是为方便会员商务信件往来而设的服务功能,类似于邮箱。 “站内信”有两个基本功能: 点到点的消息传送。用户给用户发送站内信,管理员给用户发送站内信。 点到面的消息传送。管理员给用户(指定满足某一条件的用户群)群发消息。 二、为什么要做站内信? 从站内信接触对象(用户和企业)和传达形式来看(点对点,或点对面),有以下几种价值:...

Gojs学习史(一):基本定义

1. gojs定义 初始化时,先简化gojs本身的方法: var Go = go.GraphObject.make; //简化方法 1.1 画布定义 在声明了Go方法之后,接下来就是定义画布: myDiagram = Go(go.Diagram,"myDiagramDiv",{ initialContentAlignment:go.Spot.Cen...

docker-compose 搭建 kafka 集群

环境准备 kafka依赖zookeeper,所以搭建kafka需要先配置zookeeper。网格信息如下: zookeeper 192.168.56.101:2181 kafka1 192.168.56.101:9092 kafka2 192.168.56.101:9093 kafka3 192.168.56.101:9094 开始搭建...