基于Redis实现延时队列服务

摘要:
背景在业务发展过程中,会有一些场景需要延迟,例如:a.用户在下单后超过30分钟未付款,需要取消订单b.对订单的一些评论。如果用户在48小时内没有对商家发表评论,系统将自动生成默认评论c。下单后,订单在一定时间段后不会发送,超时后需要取消订单……处理此类需求的直接简单方法是定期任务循环扫描表。因此,延迟队列用于处理此类需求。几个延迟队列延迟队列是具有延迟功能的消息队列。
背景

在业务发展过程中,会出现一些需要延时处理的场景,比如:

a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。


几种延时队列

延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:
1.Java中java.util.concurrent.DelayQueue
优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化
2.Rocketmq延时队列
优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息
3.Rabbitmq延时队列(TTL+DLX实现)
优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列

根据自身业务和公司情况,如果实现一个自己的延时队列服务需要考虑一下几点:

* 消息存储
* 过期延时消息实时获取
* 高可用性

 基于Redis实现

1.0版本

功能特性

* 消息可靠性,消息持久化,消息至少被消费一次
* 实时性:存在一定的时间误差(定时任务间隔)
* 支持指定消息remove
* 高可用性

整体结构

基于Redis实现延时队列服务第1张

- Messages Pool所有的延时消息存放,结构为KV结构,key为消息ID,value为一个具体的message(这里选择Redis Hash结构主要是因为hash结构能存储较大的数据量,数据较多时候会进行渐进式rehash扩容,并且对于HSET和HGET命令来说时间复杂度都是O(1))
- Delayed Queue是16个有序队列(队列支持水平扩展),结构为ZSET,value为messages pool中消息ID,score为过期时间(分为多个队列是为了提高扫描的速度)
- Timed Task定时任务,负责扫描处理每个队列过期消息

 消息结构

每个延时消息必须包括以下参数:

* tags:消息过期之后发送mq的tags
* keys:消息过期之后发送mq的keys
* body:消息过期之后发送mq的body,提供给消费这做具体的消息处理
* delayTime:延时发送时间(默认,delayTime、expectDate有一个即可)
* expectDate:期望发送时间

流程

基于Redis实现延时队列服务第2张
注:上图1、2、3或者2、3是一个事务操作
取出过期消息过程是通过一个外部定时任务每隔1min分钟去查询队列中过期的消息,然后发送mq && remove

2.0版本

1.0上有一个可改进的地方就是队列中过期的消息是通过定时任务触发查询。所有有了2.0
2.0版本在1.0上做了一个优化,废弃掉了1min定时任务触发过期消息发送,采用了java Lock await/singlal方式实现过期消息的实时发送低延时

基于Redis实现延时队列服务第3张

多节点部署结构:

基于Redis实现延时队列服务第4张

- pull job:这里分别为每一个队列创建了一个pull job thread,功能很简单,就是负责去队列中拉取过期的消息数据(这里保证一个队列有且只有一个pull job)
- worker:pull job拉取到的过期消息会交给一个worker thread去处理,这样的好处是处理过期的消息实时性更高(pull job不必等去除过期消息全部处理完成在继续去拉取新的过期数据)
- zookeeper coordinate:通过zk的操作来完成对队列的重新分配工作,daemon thread监听zk节点的创建和删除

主要流程:


服务启动会注册zk,获取分配处理的queues,启动后台线程监听zk 
为每个分配queue创建一个pull job 
pull job首先会去queue中查询是否有过期消息: 
Y:将取出消息交给worker处理
N:查询queue中最后一个成员(zset结构默认按score递增排序),如果为空,则await;不为空则await(成员score-System.currentTimeMillis())

由于过期消息发送成功才会从队列中remove,所以pull job会记录上一次查询队列的一个offset,每次获取到过期消息会将offset向前偏移,过期消息交给worker处理,当worker由于某些异常原因处理失败会重置pull job中offset,这样可以避免消息发送一次失败之后没办法在继续处理(除了新节点add || remove时候)
当部署服务有新增,延时队列服务会重新计算得到当前处理队列,并将之前创建pull job cancel,为新处理队列重新创建pull job。删除同理。
</ol>

 

 
 

免责声明:文章转载自《基于Redis实现延时队列服务》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【华为云技术分享】云小课 | WAF反爬虫“三板斧”:轻松应对网站恶意爬虫Module.modules()和Module.children()下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

2019 完美世界java面试笔试题 (含面试题解析)

本人3年开发经验、18年年底开始跑路找工作,在互联网寒冬下成功拿到阿里巴巴、今日头条、完美世界等公司offer,岗位是Java后端开发,最终选择去了完美世界。 面试了很多家公司,感觉大部分公司考察的点都差不多,所以将自己的心得记下来,希望能给正在找或者准备找工作的朋友提供一点帮助。另外,目前在完美世界也做面试官的工作,身份从求职者变为面试官,看问题的很多角...

Redis-缓存有效期与淘汰策略

Redis-缓存有效期与淘汰策略 有效期 节省空间 做到数据弱一致性,有效期失效后,可以保证数据的一致性 过期策略 Redis过期策略通常有三种: 1.定时过期: 每个设置过期时间的Key,系统还要生成一个定时器来监听时间并进行清除,但是有一个致命的问题,生成这么多定时器,并且监听非常消耗CPU资源,如果高并发时,同时过期的数据很大时,反而会爆...

看大数据时代下的IT架构(1)业界消息队列对比

一、MQ(Message Queue) 即 消息队列,一般用于应用系统解耦、消息异步分发,能够提高系统吞吐量。MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ、RabbitMQ、 ActiveMQ、Kafka/Jafka、Kestrel、Beanstalkd、HornetQ、Apache Qpid、Sparrow、Starling、Amazon SQ...

C#操作redis

Redis 是一个非关系型高性能的key-value数据库。在部分场合可以对关系数据库起到很好的补充作用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客户端,使用很方便。 redis提供五种数据类型:string,hash,list,set及zset(sorted se...

redis ---有用

浅谈redis (1)什么是redis? Redis是一个基于内存的高性能key-value数据库。(有空再补充,有理解错误或不足欢迎指正) (2)Reids的特点 redis本质上是一个Key-Value类型的内存数据库,很像memcached,整个数据库统统加载在内存当中进行操作,定期通过异步操作把数据库数据flush到硬盘上进行保存。因为是纯内存操作...

使用redis zset实现抽奖,奖池商品按时间随机分布

话不多说,直接上需求描述: 最近需要上一期活动,这个活动是以转盘抽奖为形式的抽奖活动,要求每个用户用积分进行抽奖,且中奖率为100%即不可出现不中任何奖品的情况,之后,又加了一个要求,即不能实行纯随机的抽取,如果如此会产生一个极端情况,如果开始的时候活动极其火爆由于随机的不可控性头一天用户便将所有优质奖品全部抽走,那么后来的用户将只会抽到保底奖品。 那么奖...