Springboot+Redis(发布订阅模式)跨多服务器实战

摘要:
]]:取消订阅渠道2:在实践中,reids中的发布/订阅模式可用于解决部署在阿里的阿里服务与本地后台服务之间的接口不工作的问题。当然,也可以使用更强大的消息中间件。1: redis使用的maven依赖关系是<dependency><groupId>org。弹簧框架。bootspring-bootstarter数据编辑redis。客户端jedis˂!

一:redis中发布订阅功能(http://www.redis.cn/commands.html#pubsub

  • PSUBSCRIBE pattern [pattern …]:订阅一个或者多个符合pattern格式的频道

  • PUBLISH channel message:发布消息到chanel中

  • PUBSUB subcommand [argument [argument …]]:查看订阅与发布系统状态

  • PUNSUBSCRIBE [pattern [pattern …]]:退订所有符合格式的频道

  • SUBSCRIBE channel [channel …]:订阅一个或者多个频道

  • UNSUBSCRIBE [channel [channel …]]:取消订阅频道

二:实战使用reids中的发布订阅模式解决部署在阿里服务与本地后台服务的接口调用不通问题(跨多服务器)
  当然也可使用更为强大的消息中间件(RabbitMQ、ActiveMQ、RocketMQ、
Kafka、ZeroMQ)

1:redis使用到的maven依赖

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <!-- springboot1.5版本使用jedis,2.0以上版本使用lettuce,本项目使用jedis,所以需要排除lettuce -->
            <exclusions>
                <exclusion>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                </exclusion>
                <!--
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
                -->
            </exclusions>
        </dependency>

2:application.yml配置

  redis:
    host: localhost
    port: 8379
    password: 123456
    database: 2
    timeout: 50s
    # 如果使用的jedis 则将lettuce改成jedis即可
    jedis:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0

3:publisher发布者发布消息

  /**
     * 建立发布者,通过频道发布消息
     * @param key 发布者
     * @param value 消息
     */
    public void publish(String key,Object value){
        this.redisTemplate.convertAndSend(key,value);
    }
redisUtils.publish(RedisTopicEnums.TOPIC_DISCOVER.getTopic(),message);

4:第一种实现方法

    /**
     * redis消息监听器容器
     * @param connectionFactory
     * @param healthyListenerAdapter 健康扫描消息订阅处理器
     * @param settingsListenerAdapter 配置健康扫描消息订阅处理器
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter healthyListenerAdapter,
                                            MessageListenerAdapter settingsListenerAdapter
                                       ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //设备健康扫描绑定消息订阅处理器
        container.addMessageListener(healthyListenerAdapter, new PatternTopic("healthy_topic"));
        //设备配置扫描并绑定消息订阅处理器
        container.addMessageListener(settingsListenerAdapter, new PatternTopic("settings_topic"));
        return container;
    }

    /**
     * 设备健康消息订阅处理器,并指定处理方法(利用反射的机制调用消息处理器的业务方法)
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter healthyListenerAdapter(ReceiverRedisMessage receiver) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "healthy");
        return messageListenerAdapter;
    }

    /**
     * 设备健康消息订阅处理器,并指定处理方法
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter settingsListenerAdapter(ReceiverRedisMessage receiver) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "settings");
        return messageListenerAdapter;
    }

缺点:1:考虑实际运用中可能会订阅多个主题,每加一个主题(Topic)都需要使用container.addMessageListener(listenerAdapter,new PatternTopic("topic"));不合理
   2:考虑到后期尽量不该变原有代码进行扩展,推荐使用下面第二种方式实现(保证开闭原则)


5:第二种实现方法:定义订阅者接收消息器接口

/**
 * 订阅者接收消息的基类
 * @author : ywb
 * @createdDate : 2020/8/6
 * @updatedDate
 */
public interface Subscriber extends MessageListener {

    /**
     * 类型
     * @return
     */
    default String getType() {
        return this.getClass().getSimpleName();
    }

    /**
     * 通道名称
     * @return
     */
    String getTopic();

}

6:定义不同主题枚举类型,后期增加一个管道,增加一个枚举信息即可

/**
 * 定义不同主题类型
 * @author : ywb
 * @createdDate : 2020/8/7
 * @updatedDate
 */
public enum RedisTopicEnums {

    /**
     * redis主题名称定义 需要与发布者一致
     *
     */
    TOPIC_DISCOVERY("topic:discovery", "设备发现变更Topic"),

    TOPIC_HEALTHY("topic:healthy", "健康扫描的设备Topic"),

    TOPIC_SETTINGS("topic:settings",  "配置扫描变更的设备Topic"),

    TOPIC_DISCOVER("topic:discover", "发现设备Topic"),


    ;
    /**
     * 主题名称
     */
    private String topic;


    /**
     * 描述
     */
    private String description;

    RedisTopicEnums(String topic, String description) {
        this.topic = topic;
        this.description = description;
    }


    public String getTopic() {
        return topic;
    }

    public String getDescription() {
        return description;
    }

}

7:实现多个订阅者,后续增加一个订阅者,只需要多加上一个订阅者类,从而不用改动redis消息 监听容器配置

7.1:设备健康扫描订阅者

/**
 * 设备健康扫描的订阅者
 *
 * @author : ywb
 * @createdDate : 2020/8/7
 * @updatedDate
 */
@Component
@Slf4j
public class HealthySubscriber implements Subscriber {

    @Autowired
    private DeviceService deviceService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public String getTopic() {
        return RedisTopicEnums.TOPIC_HEALTHY.getTopic();
    }



    @Override
    public void onMessage(Message message, byte[] pattern) {

        String deviceIds = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());

        log.info(">> 订阅消息,设备健康异常编号:{}", deviceIds);

        // TODO 这里是收到通道的消息之后执行的方法
        String[] split = deviceIds.split(",");

        Map<String, Set<Integer>> idsMap = TokenSplit.getDeviceIdRegex(split);

        for (Map.Entry<String, Set<Integer>> stringSetEntry : idsMap.entrySet()) {
            DeviceHandle healthyHandle = new DeviceHealthyHandle();
            healthyHandle.respondHandle(stringSetEntry.getValue());
        }

    }
}

7.2:配置扫描订阅者

/**
 * 设备配置变更订阅者
 *
 * @author : ywb
 * @createdDate : 2020/8/7
 * @updatedDate
 */
@Component
@Slf4j
public class SettingsSubscriber implements Subscriber {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public String getTopic() {
        return RedisTopicEnums.TOPIC_SETTINGS.getTopic();
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {

        //使用redis convertAndSend发布消息,订阅者获取字符串字节必须要反序列
        String deviceIds = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());

        log.info(">>订阅消息,设备配置变更编号:{}", deviceIds);

        // TODO 这里是收到通道的消息之后执行的方法
        String[] split = deviceIds.split(",");

        Map<String, Set<Integer>> idsMap = TokenSplit.getDeviceIdRegex(split);

        for (Map.Entry<String, Set<Integer>> stringSetEntry : idsMap.entrySet()) {
            DeviceScannerHandle scannerHandle = new DeviceScannerHandle();
            scannerHandle.respondHandle(stringSetEntry.getValue());
        }
    }
}

8:redisConfig配置,消息监听器容器配置

@Configuration
public class RedisConfig {

    /**
     * 自定义 redisTemplate<String, Object>
     *
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类
        // om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        // om.activateDefaultTyping(BasicPolymorphicTypeValidator.builder().build(), ObjectMapper.DefaultTyping.EVERYTHING);
        om.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
        om.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));
        om.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        // 不转换值为 null 的对象
        // om.setSerializationInclusion(JsonInclude.Include.NON_NULL);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // key 采用 string 的序列化方式
        template.setKeySerializer(new StringRedisSerializer());
        // value 采用 jackson 的序列化方式
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash 的 key 采用 string 的序列化方式
        template.setHashKeySerializer(new StringRedisSerializer());
        // hash 的 value 采用 jackson 的序列化方式
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }

    /**
     * DependencyDescriptor
     * 重点
     * 首先判断注入的类型,如果是数组、Collection、Map,则注入的是元素数据,即查找与元素类型相同的Bean,注入到集合中。
     * 强调下Map类型,Map的 key 为Bean的 name,value 为 与定义的元素类型相同的Bean。
     *将所有相同类型(实现了同一个接口)的Bean,一次性注入到集合类型中,具体实现查看spring源码
     *
     * 获取Subscriptor接口所有的实现类
     * 注入所有实现了接口的Bean
     * 将所有的配置消息接收处理类注入进来,那么消息接收处理类里面的注解对象也会注入进来
     */
    @Autowired
    private transient List<Subscriber> subscriptorList;


    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        //创建一个消息监听对象
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        //将监听对象放入到容器中
        container.setConnectionFactory(connectionFactory);

        if (this.subscriptorList != null && this.subscriptorList.size() > 0) {
            for (Subscriber subscriber : this.subscriptorList) {

                if (subscriber == null || StringUtils.isBlank(subscriber.getTopic())) {
                    continue;
                }
                //一个订阅者对应一个主题通道信息
                container.addMessageListener(subscriber, new PatternTopic(subscriber.getTopic()));
            }
        }

        return container;
    }

注:编写代码需遵循五大设计原则(SOLID),要善于运用设计模式





免责声明:文章转载自《Springboot+Redis(发布订阅模式)跨多服务器实战》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇软件开发,一定意味着加班吗?虹软Arcface人脸识别demo[Windows]分享下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

ElasticSearch的基本原理与用法

一、简介 ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。 这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.co...

Java 代码实现rar解压最全攻略操作

最全Java 代码实现rar解压操作首先,非常感谢下面几位链接上的支持,之所以写这篇博文,主要在于总结,同时给第一次实现该功能的同学提供完整的参考。因为第一次遇到需要在代码中实现rar和zip的解压操作。而zip其实很简单,jdk自带的ZipUtil就可以实现,这里不做赘述。但是rar的解压,特别是5.0及其以上版本的解压,折腾了我很久。根据这几位博主的思...

ssm+mybatis无法给带有下划线属性赋值问题,无法获取数据库带下划线的字段值

1、配置问题 <!-- 是否开启自动驼峰命名规则(camel case)映射, --> <setting name="mapUnderscoreToCamelCase" value="true"/> 或者 //开启驼峰映射 bean.getObject().getConfiguration().setMapUnderscore...

spring基于注解的IOC配置 知识点

明确:注解配置和xml配置要实现的功能都是一样的,都是要降低程序间的耦合。只是配置的形式不一样。 配置注解扫描的包:声明到指定的包下去进行扫描,如果发现类上有对应的注解,将其装配到容器中 <context:component-scan base-package="cn.XXX"></context:component-scan>...

Java开发中的23种设计模式详解(转)

设计模式(Design Patterns) ——可复用面向对象软件的基础 设计模式(Design pattern)是一套被反复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设计模式是为了可重用代码、让代码更容易被他人理解、保证代码可靠性。 毫无疑问,设计模式于己于他人于系统都是多赢的,设计模式使代码编制真正工程化,设计模式是软件工程的基石,如...

javase:习题

1、下来说法正确的是? A、JAVA程序的main方法必须写在类里面 B、JAVA程序中可以有多个main方法 C、JAVA程序中类名必须与文件名一样 public class Mytest { public class Animal{ } public class Plants{ } }...