Spring Cloud Stream学习笔记

摘要:
//@输入(Input)可订阅频道输入()//发送@Output(Output)消息频道输出();importorg.springframework.web.bind.annotation.RestController;

目录

1 环境

2 简介

Spring Cloud Stream是一个用于构建消息驱动的微服务应用的框架,其提供的一系列抽象屏蔽了不同类型消息中间件使用上的差异,同时也大大简化了Spring在整合消息中间件时的使用复杂度。

Spring Cloud Stream 提供了Binder(负责与消息中间件进行交互)

3 初见

1 创建项目 添加web rabbitmq stream依赖

在这里插入图片描述

2 rabbitmq配置

# 其他参数默认配置
spring.rabbitmq.host=你的host

3 消息接收器

// 该注解表示绑定Sink消息通道
@EnableBinding(Sink.class)
public class MsgReceiver {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);

    // 自带 消费者
    @StreamListener(Sink.INPUT)
    public void receive(Object payload){
        logger.info("received: " + payload);
    }

}

4 在rabbitmq中发送消息

在这里插入图片描述

5 查看结果

在这里插入图片描述

4 自定义消息通道

1 自定义接口


public interface MyChannel {
    String INPUT = "test-input";
    String OUTPUT = "test-output";

    // 收
    @Input(INPUT)
    SubscribableChannel input();

    // 发
    @Output(OUTPUT)
    MessageChannel output();
}

2 自定义接收器

// 绑定自定义消息通道
@EnableBinding(MyChannel.class)
public class MsgReceiver1 {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);

    // 收
    @StreamListener(MyChannel.INPUT)
    public void receive(Object payload){
        logger.info("received1: " + payload + ":" + new Date());
    }

}

3 controller进行测试

package com.sundown.stream.controller;

import com.sundown.stream.bean.ChatMessage;
import com.sundown.stream.msg.MyChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

@RestController
public class HelloController {

    @Autowired
    MyChannel myChannel;

    @GetMapping("/hello")
    public void hello(){
        String message = "welcome spring cloud stream";
 myChannel.output().send(MessageBuilder.withPayload(message).build());
    }
}

4 消息输入输出(通道对接)

spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic

5 启动、访问

在这里插入图片描述
在这里插入图片描述

5 消息分组

  • 消息分组(肥水不留外人田 你可能不知道流向哪家田 但是确实是自己人)

1 打包 访问(未使用消息分组)

在这里插入图片描述

  • 启动java -jar stream-0.0.1-SNAPSHOT.jar
    java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081运行访问http://localhost:8080/hello
  • 结果(多次消费)
    在这里插入图片描述在这里插入图片描述
  • 现在我不想一条消息被多次消费(假设消费者是一个集群 --> 多个人做同一件事 题外话:分布式 --> 一件事分给多个人做) 是否有什么办法呢
    消息分组帮我们解决(指定输入 输出 有么有负载均衡的味道)

2 消息分组配置

spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic

spring.cloud.stream.bindings.test-input.group=gg
spring.cloud.stream.bindings.test-output.group=gg
  • 为了验证是否能成功 重新打包运行 和上面一样 访问接口
    在这里插入图片描述
    在这里插入图片描述
  • 清空2个控制台的信息 再次访问接口
    在这里插入图片描述
    在这里插入图片描述

6 消息分区

  • 为一些具有相同特征的消息设置每次都被同一个消费实例进行消费。

1 消息分区配置

  • properties配置
spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic

spring.cloud.stream.bindings.test-input.group=gg
spring.cloud.stream.bindings.test-output.group=gg

# 开启消费分区(消费者上配置)
spring.cloud.stream.bindings.test-input.consumer.partitioned=true
# 消费者实例个数(消费者上配置)
spring.cloud.stream.instance-count=2
# 当前实例下标(消费者上配置)
spring.cloud.stream.instance-index=0

2 controller配置

@RestController
public class HelloController {

    @Autowired
    MyChannel myChannel;

    @GetMapping("/hello")
    public void hello(){
        String message = "welcome spring cloud stream";
        // 先写死
        int whichPart = 1;
        System.out.println("发送消息:" + message + ",发往分区:" + whichPart);
        myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
    }
}

3 访问

  • 打包运行java -jar stream-0.0.1-SNAPSHOT.jar --spring.cloud.stream.instance-index=0
    java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --spring.cloud.stream.instance-index=0(别忘了先关闭启动类 不然打包会报错)
    -访问http://localhost:8080/hello
    在这里插入图片描述

4 若是随机访问呢

@GetMapping("/hello")
    public void hello(){
        String message = "welcome spring cloud stream";
        int whichPart = new Random().nextInt(2);
        System.out.println("发送消息:" + message + ",发往分区:" + whichPart);
        myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
    }
  • 和上面一样打包访问
    在这里插入图片描述

7 定时器

虽然定时任务可以用cron表达式 但是对于一些特殊的定时任务 可以使用stream+rabbitmq更合适 比如几分钟后执行
rabbitmq插件安装

1 配置

  • properties
spring.rabbitmq.host=xxx

spring.cloud.stream.bindings.test-input.destination=topic
spring.cloud.stream.bindings.test-output.destination=topic

spring.cloud.stream.rabbit.bindings.test-input.consumer.delayed-exchange=true
spring.cloud.stream.rabbit.bindings.test-output.producer.delayed-exchange=true

#spring.cloud.stream.bindings.test-input.destination=test-topic
#spring.cloud.stream.bindings.test-output.destination=test-topic
#
#spring.cloud.stream.bindings.test-input.group=gg
#spring.cloud.stream.bindings.test-output.group=gg
#
## 开启消费分区(消费者上配置)
#spring.cloud.stream.bindings.test-input.consumer.partitioned=true
## 消费者实例个数(消费者上配置)
#spring.cloud.stream.instance-count=2
## 当前实例下标(消费者上配置)
#spring.cloud.stream.instance-index=0
#
## 生产者配置
#spring.cloud.stream.bindings.test-output.producer.partition-key-expression=headers['whichPart']
## 消费节点数量
#spring.cloud.stream.bindings.test-output.producer.partition-count=2

  • 自定义通道
// 绑定自定义消息通道
@EnableBinding(MyChannel.class)
public class MsgReceiver1 {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);

    // 收
    @StreamListener(MyChannel.INPUT)
    public void receive(Object payload){
        // 添加日期 一会好对比
        logger.info("received1: " + payload + ":" + new Date());
    }

}
  • controller
@RestController
public class HelloController {
    private static final Logger logger = LoggerFactory.getLogger(HelloController.class);

    @Autowired
    MyChannel myChannel;

    @GetMapping("/delay")
    public void delay(){
        String message = "welcome spring cloud stream";
        logger.info("send msg:" + new Date());
        // x-delay --> 延迟3s
        myChannel.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 3000).build());
    }
}

2 启动 访问

在这里插入图片描述

  • 打开rabbitmq查看
    在这里插入图片描述
  • 查看idea控制台
    在这里插入图片描述

8 小结

stream自带的与自定义(添加destination=xxx)之间的类似和区别
解决重复消费 分组(group)
消息分组单个实例访问(开启消费分区 实例个数 实例下标 生产者配置 消费节点数)
定时器 rabbitmq相关的插件安装运行 后端代码实现(配置delayed-exchange和destination以及controller 发送时添加setHeader("x-delay", 3000) 3s延时)

免责声明:文章转载自《Spring Cloud Stream学习笔记》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇在Linux上运行C#Linux系统中imp导入dmp文件下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RocketMQ4.X 集群

RocketMQ4.X 多种集群模式 单节点 : 优点:本地开发测试,配置简单,同步刷盘消息一条都不会丢 缺点:不可靠,如果宕机,会导致服务不可用 主从(异步、同步双写) : 优点:同步双写消息不丢失, 异步复制存在少量丢失 ,主节点宕机,从节点可以对外提供消息的消费,但是不支持写入 缺点:主备有短暂消息延迟,毫秒级,目前不支持自动切换,需要脚本或者...

【转】可在广域网部署运行的即时通讯系统 -- GGTalk总览(附源码下载)

  原文地址:http://www.cnblogs.com/justnow/p/3382160.html (最新版本:V6.0,2017.12.11 。即将推出Xamarin移动端版本,包括 Android 和 iOS)        GGTalk开源即时通讯系统(简称GG)是QQ的高仿版,同时支持局域网和广域网,包括客户端(PC客户端、android移动...

WCF的简单

WCF的简单 WCF的学习之旅 一、WCF的简单介绍      Windows Communication Foundation(WCF)是由微软发展的一组数据通信的应用程序开发接口,可以翻译为Windows通讯接口,它是MS为SOA (Service  Oriented Architecture)战略而设计的一套完整的技术框架。它是一种统一的编程模型,用...

RabbitMQ-C 客户端接口使用说明

  rabbitmq-c是一个用于C语言的,与AMQP server进行交互的client库。AMQP协议为版本0-9-1。rabbitmq-c与server进行交互前需要首先进行login操作,在操作后,可以根据AMQP协议规范,执行一系列操作。   这里,根据项目需求,只进行部分接口说明,文后附demo的github地址。 接口描述 接口说明:声明一...

WCF安全:通过 扩展实现用户名密码认证

  在webSservice时代,可以通过SOAPHEADER的方式很容易将用户名、密码附加到SOAP header消息头上,用户客户端对调用客户端身份的验证。在WCF 时代,也可以通过OperationContext.Current.IncomingMessageHeaders的方式将用户名、密码附加到SOAP消息中。但是这种方式实现起来有个缺点;那就是...

RabbitMQ面试题

1、为什么要引入MQ系统,直接读写数据库不行吗?其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么? 面试官问你这个问题,期望的一个回答是说,你们公司有个什么业务场景,这个业务场景有个什么技术挑战,如果不用 MQ 可能会很麻烦,但是你现在用了 MQ 之后带给了你很多的好处。 先说一下消息队列常见的使用场景...