RabbitMQ(五)Springboot集成RabbitMQ

摘要:
基本概念@EnableRabbit@EnableRabbit与@Configuration一起使用,可以将其添加到类或方法中。此注释使容器能够检查注册bean的@RabbitListener@RabbitHandler@RabbitListener与@RabbitHandler一起使用时,不同类型的消息以不同的方式处理。依赖关系<Dependency><groupId>组织。弹簧框架。boot</groupId><artifactId>spring boot starter amqp</artifactId></dependent>配置应用程序。yml配置文件:spring:rabitmq:host:192.16858.129端口:5672用户名:orcaspassword:1224虚拟主机:/连接超时:15000publisher确认:true发布器返回:true模板:制造商:true监听器:简单:确认模式:manual#手动ackconcurrency:5#侦听消息数最大并发:10#用于声明的自定义mq配置参数交换机、队列和绑定路由顺序:queue:name:queue-2耐久性:truexchange:name:exchange-2持久性:truetype:topicognoreDeclarationExceptions:truekey:springroot*发布者确认,它实现侦听器以侦听代理返回的确认请求。Publisher返回,以确保代理可以访问消息。如果路由密钥不可访问,则侦听器将用于后续处理不可访问的消息,以确保消息成功路由。
  基本概念
  • @EnableRabbit

@EnableRabbit和@Configuration一起使用,可以加在类或者方法上,这个注解开启了容器对注册的bean的@RabbitListener检查。

  • @RabbitListener

@RabbitListener用于注册Listener时使用的信息:如queue,exchange,key、ListenerContainerFactory和RabbitAdmin的bean name。

扫描到bean带有该注解后,首先会将注解的内容封装到Endpoint对象中并和ListenerContainerFactory的实例一起添加到上面的RabbitListenerEndpointRegistry实例中。添加的时候会创建相应的ListenerContainer实例并添加Listener对象。
  • @RabbitHandler

@RabbitListener 和 @RabbitHandler结合使用,不同类型的消息使用不同的方法来处理。

  依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
  • application.yml配置文件:
spring:
  rabbitmq:
    host: 192.168.58.129
    port: 5672
    username: orcas
    password: 1224
    virtual-host: /
    connection-timeout: 15000
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual # 手动ack
        concurrency: 5 # 监听消息的个数
        max-concurrency: 10 
      # 自定义mq配置 用于声明交换机、队列、绑定路由的参数
      order:
        queue:
          name: queue-2
          durable: true
        exchange:
          name: exchange-2
          durable: true
          type: topic
          ignoreDeclarationExceptions: true
        key: springboot.*
  • publisher-confirms,实现一个监听器用于监听Broker端返回的确认请求。
  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达情况,则使用监听器对不可达消息进行后续处理,保证消息的路由成功。
  • template.mandatory,true则监听器会接收到路由不可达的消息,然后进行处理;false则Broker会自动删除该消息。默认是false。

注:也可以参考与Spring整合中的配置文件,就是以@Bean的方式声明交换机、队列与绑定关系。

生产者
@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;  
    
    //回调函数: confirm确认
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                System.err.println("异常处理....");
            }
        }
    };
    
    //回调函数: return返回
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
    
    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        // 设置消息的唯一id
        CorrelationData correlationData = new CorrelationData("1234567890"); //id + 时间戳 全局唯一 
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }
    
    //发送消息方法调用: 构建自定义对象消息
    public void sendOrder(Order order) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一 
        CorrelationData correlationData = new CorrelationData("0987654321");
        rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
    }
}
消费者

@Exchange@Queue@QueueBinding 组合注解用来声明交换机、队列和绑定路由。

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
            durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
            durable="${spring.rabbitmq.listener.order.exchange.durable}", 
            type= "${spring.rabbitmq.listener.order.exchange.type}", 
            ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.routingKey}"
            )
    )
@RabbitHandler
public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端order: " + order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); } }

引用:

 https://www.javatt.com/p/11158

免责声明:文章转载自《RabbitMQ(五)Springboot集成RabbitMQ》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇一、tomcat基础介绍及安装部署架构使用vuex结合vue-meta实现router动态设置meta标签下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

开源RabbitMQ操作组件

开源RabbitMQ操作组件对于目前大多的.NET项目,其实使用的技术栈都是差不多,估计现在很少用控件开发项目的了,毕竟一大堆问题。对.NET的项目,目前比较适合的架构ASP.NET MVC,ASP.NET WebAPI,ORM(较多Dapper.NET或者其扩展,稍大一些的项目用EF等等),为了提高速度也会采用缓存(.NET自带的Memcache,或者R...

从零搭建企业大数据分析和机器学习平台-技术栈介绍(三)

数据传输和采集 Sqoop数据传输工具实际项目开发中,往往很多业务数据是存放在关系型数据库中,如 MySQL数据库。我们需要将这些数据集中到数据仓库中进行管理,便于使用计算模型进行统计、挖掘这类操作。 Sqoop是Apache软件基金会的⼀一款顶级开源数据传输工具,用于在 Hadoop与关系型数据库(如MySQL、Oracle、PostgreSQL等)之间...

c++消息队列的实现

#ifndef NET_FRAME_CONCURRENT_QUEUE_H #define NET_FRAME_CONCURRENT_QUEUE_H #include <queue> #include <mutex> #include <condition_variable> template<cla...

golang-nsq高性能消息队列

前言 tips:如果本文对你有用,请爱心点个赞,提高排名,让这篇文章帮助更多的人。谢谢大家!比心❤~ 如果解决不了,可以在文末加我微信,进群交流。 NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。 NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。 N...

Exchange学习:EWS 通过流通知和拉取通知订阅Exchange新邮件提醒

原理 EWS 通知以订阅的形式进行。 通常,每个邮箱一个订阅,在邮箱订阅内你还可以订阅一些或所有文件夹。 你可以决定订阅什么种类的通知(流式、拉取、推送)以及接收什么种类的事件(NewMail、Created、Deleted、Modified等),然后可以创建订阅。 然后 EWS 事件会非同步地从邮箱服务器发送到客户端。 EWS流式通知和拉取通知区别流式通...

RabbitMQ消息队列

  一、简介 RabbitMQ是一个在AMQP基础上完整的、可复用的企业消息系统,遵循Mozilla Public License开源协议。MQ全称Message Queue(消息队列),它是一种应用程序对应用程序的通信方式。应用程序通过读写入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接他们。消息传递指的是程序之间通过在消息中发送数据通信,...