RabbitMQ(二)核心组件介绍

摘要:
前言本文主要介绍AMQP的核心组件:RabbitAdminSpringAMQP语句RabbitTemplateSimpleMessageListenerContainerMessageListenerAdapteMessageConverterRabbitAdminRabbitAdmin类封装了RabbitMQ管理端的操作,如Exchange操作、队列操作、绑定绑定操作等,
前言

本文主要介绍AMQP核心组件:

  • RabbitAdmin
  • SpringAMQP 声明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapte
  • MessageConverter
RabbitAdmin

RabbitAdmin类是针对RabbitMQ管理端进行封装操作,比如:Exchange操作、Queue操作,Binding绑定操作等,操作起来简单便捷!

  • 用于声明RabbitMQ相关配置、操作RabbitMQ
  • autoStartup设为true:表示Spring容器启动时自动初始化RabbitAdmin
  • 底层实现:从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
  • rabbitTemplate的execute方法执行对应的声明等操作
@Configuration
@ComponentScan({"com.orcas.spring"})
public class RabbitMQConfig {

   /**
     * 创建 RabbitMQ 连接工厂
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.58.129:5672");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

   /**
     * 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作! 比如:Exchange 操作,Queue 操作,Binding 绑定 等
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
} 
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * 交换机操作
     */
    @Test
    public void testAdminExchange() {
        // 创建交换机, 类型为 direct,durable 参数表示是否持久化
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        // 创建交换机,类型为 topic,durable 参数表示是否持久化
        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        // 创建交换机,类型为 fanout,durable 参数表示是否持久化
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
    }

    /**
     * 队列操作
     */
    @Test
    public void testAdminQueue() {
        // 创建队列
        // durable 参数表示是否持久化
        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
        // 创建队列
        // durable 参数表示是否持久化
        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
        // 创建队列
        // durable 参数表示是否持久化
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    }

    /**
     * 绑定操作
     */
    @Test
    public void testAdminBinding() {
        /**
         * 两种写法都可以,都选择绑定 队列 或者 交换机
         */

        /**
         * destination 需要绑定队列的名字
         * DestinationType 绑定类型,
         *          Binding.DestinationType.QUEUE 表示是队列绑定
         *          Binding.DestinationType.EXCHANGE 表示交换机绑定
         *
         * exchange 交换机名称
         * routingKey 路由key
         * arguments 额外参数(比如绑定队列,可以设置 死信队列的参数)
         */
        rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "routing_direct", new HashMap<>()));

        /**
         * 链式写法
         */
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接创建队列
                        .to(new TopicExchange("test.topic", false, false)) // 直接创建交换机,并建立关联关系
                        .with("routing_topic") // 指定路由 key
        );

        /**
         * 链式写法
         *
         * FanoutExchange 交换机,和路由key没有绑定关系,因为他是给交换机内所有的 queue 都发送消息!
         */
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接创建队列
                        .to(new FanoutExchange("test.fanout", false, false)) // 直接创建交换机,并建立关联关系
        );
    }

    /**
     * 其他操作-清空队列
     */
    @Test
    public void testAdminOther() {// noWait 参数是否需要等待: true 表示需要,false 表示不需要
        //      也就是需要清空的时候,我需要等待一下,在清空(会自动等待几秒钟)
        rabbitAdmin.purgeQueue("test.fanout.queue", false);
    }
} 
Spring AMQP 声明
  • exchange
    • TopicExchange
    • FanoutExchange
    • DirectExchange
  • queue
  • binding
    • BindingBuilder

spring使用@bean声明exchange queue binding 例子如下:

 @Bean  
 public TopicExchange exchange001() {  
     return new TopicExchange("topic001", true, false);  
 }  

 @Bean  
 public Queue queue001() {  
     return new Queue("queue001", true); //队列持久  
 }  
    
 @Bean  
 public Binding binding001() {  
     return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
 }  
RabbitTemplate

消息模板

  • 与SpringAMQP整合的时候进行发送消息的关键类
  • 该类提供了丰富的发送消息方法,包括可靠性投递方法,回调监听消息接口ConfirmCallback,返回值确认接口ReturnCallback等等。同样我们需要进行注入到spring容器中。然后进行使用。
  • 与Spring整合时需要实例化,但是与Springboot整合时不需要,在配置文件添加配置即可
  • rabbitTemplate.convertAndSend方法是主要的发送消息的方法
  • MessageProperties 用于构造消息体
  • MessagePostProcessor:消息发送之后对消息进行的设置

以下是RabbitTemplate实例化的例子:

 @Bean
 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
     rabbitTemplate.setConfirmCallback(//);
     rabbitTemplate.setReturnCallback(//);
     return rabbitTemplate;
 }

以下是发送消息例子:

    @Test
    public void testSendMessage() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("------添加额外的设置---------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }
SimpleMessageListenerContainer

简单消息监听容器 功能:

  • 监听多个队列、自动启动、自动声明
  • 设置事务特性、事务管理器、事务属性、事务容器、是否开启事务、回滚消息
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕获函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器 message convert
  • SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者的大小、接收消息的模式等,可以基于此开发rabbitmq自定义后台管控平台
属性:
  • queues: 消费队列
  • concurrentConsumers:当前消费者数
  • maxConcurrentConsumers:最大消费者并发
  • defaultRequeueRejected: 是否重回队列,默认: false
  • acknowledgeMode:消息确认模型,默认:AUTO
  • exposeListenerChannel:
  • messageListener: 消息监听
  • consumerTagStrategy:consumerTag生成策略
    /**
     * 简单消息监听容器
     * 配置完成后。可以在管控台看到消息者信息。 以及消费者标签信息
     *
     * @param connectionFactory 链接工厂
     * @return SimpleMessageListenerContainer
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        //设置要监听的队列
        simpleMessageListenerContainer.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //初始化消费者数量
        simpleMessageListenerContainer.setConcurrentConsumers(1);
        //最大消费者数量
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
        //设置是否重回队列[一般为false]
        simpleMessageListenerContainer.setDefaultRequeueRejected(false);
        //设置自动ack
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置channel 是否外露
         simpleMessageListenerContainer.setExposeListenerChannel(true);
        //设置消费端标签的策略
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queueName) {
                return queueName + "_" + UUID.randomUUID().toString();
            }
        });
        //设置消息监听 ChannelAwareMessageListener
        simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("----------消费者: " + msg);
            }
        });

        return simpleMessageListenerContainer;
    }
MessageListenerAdaper

消息监听适配器

  • extends AbstractAdaptableMessageListener 消息listener
  • queueOrTagToMethodName 队列标识与方法名称组成的集合
  • defaultListenerMethod 默认监听方法名称
  • Delegate 委托对象:实际真实的委托对象
  • 可以一一进行队列和方法名称的匹配
  • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理
/**
 * 通过`simpleMessageListenerContainer` 配置消息监听适配器。 指向这个类
 */
public class MessageDelegate {
    /**
     * MessageListenerAdapter 默认指定接收消息的方法的名字就是 handleMessage .当然也可以手动设置
     *
     * @param messageBody message信息
     */
    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法,消息内容: " + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }
}



/**
 * spring amqp 消息转换器
 *
 * @author yangHX
 * createTime  2019/4/6 12:28
 */
public class TextMessageConverter implements MessageConverter {

    /**
     * 将数据转化为 message 类
     *
     * @param o                 要发送的数据
     * @param messageProperties 消息头
     * @return Message
     * @throws MessageConversionException ex
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(o.toString().getBytes(), messageProperties);
    }

    /**
     * 将message转换为想要的数据类型
     *
     * @param message message
     * @return Object
     * @throws MessageConversionException ex
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {

        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}

//消息监听适配器 只截取了一小段 /* * 适配器方式。 默认是有自己的方法名字。 handleMessage * 可以自己指定一个方法的名称。 consumerMessage * 也可以添加一个转换器: 从字节数组转换为String */ MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate); //设定queue使用哪个adapter方法处理 Map<String,String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001","method1"); queueOrTagToMethodName.put("queue002","method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); //设置默认处理方法,默认处理方法是handleMessage adapter.setDefaultListenerMethod("handleMessage"); //设置消息转换方式 adapter.setMessageConverter(new TextMessageConvert()); //消息监听 container.setMessageListener(adapter); /** * 发送消息。测试转换器和适配器 * <p> * 转换器判断contentType 将字节数组转化为字符串 * 适配器将数据交给 MessageDelegate 的 consumeMessage 方法进行处理 */ @Test public void testMessage4Text() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plan"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); }

RabbitMQ(二)核心组件介绍第1张

MessageConverter
  • 我们在消息传输的时候,正常情况下消息体为二进制的数据方式进行传输。如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter

  • 自定义常用转换器 MessageConverter一般来讲都需要实现这个接口

  • 重写下面两个方法

    • toMessage: java 对象转换为Message
    • fromMessage: Message对象转换为java对象
  • 转换器类型
    • json转换器: jackson2JsonMessageConverter: 可以进行java对象的转换功能
    • DefaultJackson2JavaTypeMapper映射器: 可以进行java对象的映射关系
    • 自定义二进制转换器: 比如图片类型、PDF,PPT, 流媒体

1、Json转换器:Jackson2JsonMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
container.setMessageListener(jackson2JsonMessageConverter);

2、支持Java对象的转换:DefaultJackson2JavaTypeMapper&Jackson2JsonMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
        
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);

3、支持java对象多映射转换:DefaultJackson2JavaTypeMapper& Jackson2JsonMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
// (标签, 类的全路径)将标签和类进行绑定
idClassMapping.put("order", com.orcas.spring.entity.Order.class);
idClassMapping.put("packaged", com.orcas.spring.entity.Packaged.class);
        
javaTypeMapper.setIdClassMapping(idClassMapping);
        
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);

4、全局的转换器:ContentTypeDelegatingMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
        
//全局的转换器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
        
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
        
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
        
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
        
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
                
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);

5、图片转换器:

public class ImageMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------Image MessageConverter----------");
        // 获取消息扩展属性中的"extName"
        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        // 若为空默认为png, 否则就获取该扩展名
        String extName = _extName == null ? "png" : _extName.toString();
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        // 该图的路径+图片名
        String path = "d:/010_test/" + fileName + "." + extName;
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }

}

六、PDF转换器:

public class PDFMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------PDF MessageConverter----------");
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "d:/010_test/" + fileName + ".pdf";
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }

}

七、测试代码:

    @Test
    public void testSendExtConverterMessage() throws Exception {
//            byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
//            MessageProperties messageProperties = new MessageProperties();
//            messageProperties.setContentType("image/png");
//            messageProperties.getHeaders().put("extName", "png");
//            Message message = new Message(body, messageProperties);
//            rabbitTemplate.send("", "image_queue", message);
        
            byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/pdf");
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "pdf_queue", message);
    }

引用:

https://juejin.im/post/5d0309596fb9a07ebf4b6ad2

https://www.javatt.com/p/11158

免责声明:文章转载自《RabbitMQ(二)核心组件介绍》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇RocketMQ学习笔记(5)----RocketMQ监控平台rocketmq-console-ng的搭建C# 打开模态对话框 和打开文件夹下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

跟着我一步一步的搭建一个基于springcloud的微服务实例

Table of Contentsgenerated with DocToc microservicecloud 插件推荐 建立父工程Microservicecloud 搭建Employ员工服务创建数据库 创建消费者服务microservicecloud-employconsummer Eureka注册中心搭建 单机模式创建microserv...

wsdl详解

<wsdl:definitions xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" xmlns:ns1="http://org.apache.axis2/xsd" xmlns:ns="http://impl.service.utcs.ctc" xmlns:wsaw="http://www.w3.org/20...

django实现支付宝支付

目录 django支付宝支付 新建支付宝应用 创建应用(使用沙箱环境测试) 按照官方要求生成私钥(可以上支付宝开发平台下载支付宝开发助手) 把生成的app公钥粘贴到沙箱的app中 查看沙箱账号和密码 支付宝开发地址 说明 在utils中封装请求支付宝扫码地址url的函数和生成订单id的函数 在model.py中定义表 在views.py中...

c# 扩展方法 奇思妙用 高级篇 九:OrderBy(string propertyName, bool desc)

下面是 Queryable 类 中最常用的两个排序的扩展方法: 1 2 public static IOrderedQueryable<TSource> OrderBy<TSource, TKey>(this IQueryable<TSource> source, Expression<Func<T...

cjson库的使用以及源码阅读

cjson是一个用c语言开发的json解析库,免费开源只有一个c文件和一个h文件。json和xml功能相似,可以用来传输数据,存储数据以及表达程序当前的状态。 1、下载cjson的源码         https://github.com/DaveGamble/cJSON 2、阅读readme文件可以大概的了解一下cjson的介绍以及使用方法,我尝试着把...

shiro源码解析

一、web.xml 文件中配置的 DelegatingFilterProxy 的 <filter-name>为啥与Spring文件中配置的ShiroFilterFactoryBean的Bean id 保持一致? <filter> <filter-name>shiroFilter</filter-n...