使用redis作为消息队列的用法

摘要:
最后只好退而求其次,想到了使用redis的队列来做规则的更新消息队列首先做简单的引入。进行插入操作的端称为队尾,进行删除操作的端称为队头。消息队列一般是采用一个独立的集群专门用于消息存储,可以存储在内存里也可以直接存储在磁盘中。redis实现消息队列redis有一个数据类型叫list(列表),它的每个子元素都是string类型的双向链表。Redis中的消息可以提供两种不同的功能。

背景

最近项目有个需求需要动态更新规则,当时脑中想到的第一个方案是利用zk的监听机制,管理人员更新完规则将状态写入zk,集群中的机器监听zk的状态,当有状态变更后,集群中的机器开始拉取最新的配置。但由于公司技术选型,没有专门搭建zk集群,因此也不可能为这一个小需求去搭建zk集群。图为使用zk监听状态变化的流程。

使用redis作为消息队列的用法第1张

最后只好退而求其次,想到了使用redis的队列来做规则的更新

消息队列

首先做简单的引入。

  1. 队列(来自百度百科):是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
  2. 消息队列(来自百度百科):是在消息的传输过程中保存消息的容器。

从队列和消息队列的定义看来,看不出什么相似之处。但我理解它们的作用是相似的,只是使用环境不同。队列和消息队列 本质上都可以用于解决“生产者”和“消费者”问题,在二者这间建立桥梁,it中专业术语是对“生产者”和“消费者”进行解耦。可以动态的通过调整“生产者”和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理。

队列 一般指的是单个服务实例内部使用,比如,在java中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(无界)、PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue(延迟队列 无界)、PriorityBlockingQueue(优先 无界)、SynchronousQueue(没有容量的队列)。可以看到java的api已经很强大了,可以根据自己的业务需求选择使用。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义)。

MQ主要是用来:

  1. 解耦应用、
  2. 异步化消息
  3. 流量削峰填谷

目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

使用redis作为消息队列的用法第2张

另外上面提到的“有界”和“无界”,指的是队列的容量大小。有界 指的是创建队列时必须指定队列的容量;无界 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用“无界”队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了“消息队列”。

消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中。比如常见的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发。但他们本质作用跟上面讲的单实例环境中java“队列”没什么两样:在消息的传输过程中保存消息的容器。只是这里转换到“分布式”环境中而已。

使用redis作为消息队列的用法第3张

可以看到这里这里提到的“传统”消息队列,都是一个很重型的集群。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架。比如:本次分享的主题 如何使用redis实现“消息队列”。

redis 实现消息队列

redis有一个数据类型叫list(列表),它的每个子元素都是 string 类型的双向链表。我们可以通过 push,pop 操作从链表的头部或者尾部添加删除元素。这使得 list 既可以用作栈,也可以用作队列。

假如,我们有一个队列系统,把一个个任务放到队列中,另一个进程就把队列中的任务取出来执行。

放到队列我们使用LPUSH,也就是往双向链表的尾部填充一个元素,这一端也叫生产者,是产生内容的一端。

另一个进程使用RPOP往头部取出元素来执行,这一端也叫消费者。

如果仅仅是这种方式来实现队列,它就是需要进程不断地循环队列,判断队列是不是有新元素,有的话就取出来执行,没有的话,就继续循环,但是这个总有一个时间间隔,你总得规定每隔一段时间去循环,虽然这个时间很小,但总有延迟,这种方式叫作轮循。有没有一种方式就是让不断执行一个redis命令,而redis中的列队有值就会通过命令通知程序呢?有的,那就是阻塞操作的RPOP,它叫作BRPOP。

我们来演示一下它是如何实现的。

$ redis-cli

127.0.0.1:6379> BRPOP list1 0

先执行BRPOP,假如队列list1没有值,它会返回nil,并且阻塞在那,在等另一个程序或进程往list1中填值。

我们开启另一个redis端终。

$ redis-cli

127.0.0.1:6379> LPUSH list1 a

(integer) 1

我们再来看之前的结果。

127.0.0.1:6379> BRPOP list1 0

1) "list1"

2) "a"

(16.99s)

这样就能把列表的值给取到了。

优点

  1. 能够实现持久化
  2. 采用 Master-Slave 数据复制模式。队列操作都是写操作,Master任务繁重,能让Slave分担的持久化工作,就不要Master做。RDB和AOF两种方法都用上,多重保险。
  3. 支持集群
  4. 接口使用简单

不足

  1. Redis上消息只会被一个消费者消费,不会有多个订阅者消费同一个消息,简单一对一
  2. 生产者或者消费者崩溃后的处理机制,需要自己实现
  3. 生产者写入太快,消费者消费太慢,导致Redis的内存问题,处理机制需要自己实现

通过pub/sub来实现

实现机制

订阅,取消订阅和发布实现了发布/订阅消息范式,发布者不是计划发送消息给特定的订阅者。而是发布的消息分到不同的频道,不需要知道什么样的订阅者订阅。订阅者对一个或多个频道感兴趣,只需接收感兴趣的消息,不需要知道什么样的发布者发布的。

这是一种基于非持久化的消息机制,消息发布者和订阅者必须同时在线,否则一旦消息订阅者由于各种异常情况而被迫断开连接,在其重新连接后,其离线期间的消息是无法被重新通知的(即发即弃)。

Redis中的消息可以提供两种不同的功能。一类是基于Channel的消息,这一类消息和Redis中存储的Keys没有太多关联,也就是说即使不在Redis中存储任何Keys信息,这类消息也可以独立使用。另一类消息可以对(也可以不对)Redis中存储的Keys信息的变化事件进行通知,可以用来向订阅者通知Redis中符合订阅条件的Keys的各种事件。

通过springboot 构建redis消息队列

首先springboot配置文件配置如下:

spring.redis.host=localhost

spring.redis.port=6379

消息生产者,注入redisTemplate,用convertAndSend发送消息

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.stereotype.Service;

@Service

public class PublishService {

@Autowired

private StringRedisTemplate stringRedisTemplate;

public void sendMsg(String channel, String msg) {

stringRedisTemplate.convertAndSend(channel, msg);

}

}

消费者:创建一个接收消息的类,继承MessageListener,也可以不继承

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;

@Slf4j

@Service

public class RedisReceiver {

public void receiveMessage(String message) {

log.info("receive message is {}",message);

}

}

消息订阅者配置类:

import com.wuzy.queue.RedisReceiver;

import org.springframework.context.annotation.Bean;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.data.redis.listener.PatternTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import org.springframework.stereotype.Component;

/**

* redis 监听配置

*/

@Configuration

public class RedisSubListenerConfig {

/**

* 初始化监听器

*

* @param connectionFactory

* @param listenerAdapter

* @return

*/

@Bean

RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,

MessageListenerAdapter listenerAdapter) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.addMessageListener(listenerAdapter, new PatternTopic("channel_1")); // new PatternTopic("这里是监听的通道的名字") 通道要和发布者发布消息的通道一致

return container;

}

/**

* 绑定消息监听者和接收监听的方法

*

* @param redisReceiver

* @return

*/

@Bean

MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {

// redisReceiver 消息接收者

// receiveMessage 消息接收后的方法

MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();

messageListenerAdapter.setDefaultListenerMethod("receiveMessage");

messageListenerAdapter.setDelegate(redisReceiver);

return messageListenerAdapter;

}

@Bean

StringRedisTemplate template(RedisConnectionFactory connectionFactory) {

return new StringRedisTemplate(connectionFactory);

}

}

优点

  1. 一个生产者能够对应多个消费者
  2. 支持集群
  3. 接口使用简单

不足

  1. Redis提供的订阅/发布功能并不完美,更不能和ActiveMQ/RabbitMQ提供的订阅/发布功能相提并论。
  2. 首先这些消息并没有持久化机制,属于即发即弃模式。也就是说它们不能像ActiveMQ中的消息那样保证持久化消息订阅者不会错过任何消息,无论这些消息订阅者是否随时在线。
  3. 由于本来就是即发即弃的消息模式,所以Redis也不需要专门制定消息的备份和恢复机制。
  4. 也是由于即发即弃的消息模式,所以Redis也没有必要专门对使用订阅/发布功能的客户端连接进行识别,用来明确该客户端连接的ID是否在之前已经连接过Redis服务了。ActiveMQ中保持持续通知的功能的前提,就是能够识别客户端连接ID的历史连接情况,以便确定哪些订阅消息这个客户端还没有处理。
  5. Redis也没有为发布者和订阅者准备保证消息性能的任何方案,例如在大量消息同时到达Redis服务是,如果消息订阅者来不及完成消费,就可能导致消息堆积。而ActiveMQ中有专门针对这种情况的慢消息机制。

免责声明:文章转载自《使用redis作为消息队列的用法》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇线程的查看以及利用gdb调试多线程C# winform 多线程中创建等待窗体下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Redis服务启动失败,提示:redis-server:command not found

今天我开始做主从复制的集群模式的测试,所以需要再装一个Linux操作系统,我在虚拟机里已经安装了一个Linux操作系统,Redis也已经配置好了。今天打算再安装一个Linux操作系统,Linux系统的安装过程很简单,就不多说了,如果大家想看,请查看我的另一篇文章《Redis进阶实践之一VMWare Pro虚拟机安装和Linux系统的安装》。 Linux操作...

RabbitMQ 集群模式

1、主备模式: 实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模型非常的好用且简单。主备模式也称之为Warren模式 主备模式架构图 HaProxy配置 备注: rabbitmq集群节点配置 inter 每隔5秒对mq集群做健康检查, 2次正确证明服务器可用, 2次失败证明服务器不可用,并且配置主备机制 2、远程模式(早期使用较...

使用redis时出现java.util.ArrayList cannot be cast to java.lang.Long

java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.Long at redis.clients.jedis.Connection.getIntegerReply(Connection.java:222) at redis.client...

springboot 2.x版本Redis设置JedisConnectionFactory

一、 springboot2.x 集成redis时,配置连接信息和构造方法发生了改变。 2.X版本可以使用RedisStandaloneConfiguration、RedisSentinelConfiguration、RedisClusterConfiguration三种方式配置连接信息。 这里我们以RedisStandaloneConfiguration...

redis一主两从搭建

一主两从搭建: 主配: daemonize yes port 6379 logfile ./redis6379.log dir ./ bind 10.131.156.170 从1配: daemonize yes port 6380 logfile ./redis6380.log dir ./ bind 10.131.156.170 slaveof 10....

redis实现加锁的几种方法示例详解

1. redis加锁分类 redis能用的的加锁命令分表是INCR、SETNX、SET 2. 第一种锁命令INCR 这种加锁的思路是, key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作进行加一。然后其它用户在执行 INCR 操作进行加一时,如果返回的数大于 1 ,说明这个锁正在被使用当中。     1、 客户端A请求服务器...