Java ActiveMQ 讲解(二)Spring ActiveMQ整合+注解消息监听

摘要:
对于ActiveMQ消息的发送,原始语音的API操作很麻烦,并且在没有二次封装的情况下打开和关闭会话以及创建各种操作就足够了。Spring为发送和接收消息提供了一个方便的框架Spring-jms。集成Spring之后,代码不仅变得非常优雅,而且还具有更好的可用性和可扩展性--activemq--˃org.apache.xbeanxbean-spring3.16org.springframeworkspringjms${springframeframework.version}org.apache.activemqactivemqall${activemq.version}2.命名空间介绍--查找最新的schemaLocation访问http://www.springframework.org/schema/--˃3.Xml配置˂!

对于ActiveMQ消息的发送,原声的api操作繁琐,而且如果不进行二次封装,打开关闭会话以及各种创建操作也是够够的了。那么,Spring提供了一个很方便的去收发消息的框架,spring jms。整合Spring后,代码不仅变得非常优雅,而且易用性和扩展性更好。

1. maven依赖

        <!-- activemq -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq.version}</version>
        </dependency>

2.命名空间引入

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

3. Xml配置

    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.password}" />

    <bean id="jmsConnectionFactoryExtend" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="jmsConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 消息处理器 -->
    <bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />

    <!-- ====Producer side start==== -->

    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="jmsConnectionFactoryExtend" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
        <property name="messageConverter" ref="jmsMessageConverter"></property>
    </bean>

    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="jmsConnectionFactoryExtend" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
        <property name="messageConverter" ref="jmsMessageConverter"></property>
    </bean>

    <jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsConnectionFactoryExtend" acknowledge="auto" concurrency="5-10">
        <jms:listener destination="testqueue" ref="queueReciver" />
    </jms:listener-container>

第一个是配置我们的mq连接,ip+端口号,帐号密码的信息。

第二个是引入spring的mq连接池。可以配置缓存的连接数。

第三个是消息处理器,Spring默认提供了基于Jdk Serializable的消息处理和MappingJackson2MessageConventer,其实这两个挺常用,在Spring Redis中,在Spring MVC中,都有着这几种conventer的身影。

下面是两个发送消息的模版类,类似于之前讲过的RedisTemplate。向其注入上面定义的消息处理器,代码中我们会用到。(其实类中已经判断如果不进行注入就设置一个默认的,但是自己注入的话,方便我们控制)

 listener-container是Spring提供的一个监听器容器,用于统一控制我们的监听类来接收处理消息。这里面有一些配置,schema有说明。可以配置响应模式,消费者数量等。开启多消费者,有助于加快队列处理速度。

4.注解方式的实现

如果要用注解的方式,就不需要在xml中自己定义消息监听容器了。只需要加入以下的代码:

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
        <property name="connectionFactory" ref="jmsConnectionFactoryExtend"/>
    </bean>
    
    <!-- 监听注解支持 -->
    <jms:annotation-driven/>

这样,配置我们消费处理类上的@listener注解,即可监听对应的queue或者topic消息。

5.生产者代码

队列消息:

@Resource
@Component("queueSender")
public class QueueSender {

    @Resource(name = "jmsQueueTemplate")
    private JmsTemplate jmsQueueTemplate;// 通过@Qualifier修饰符来注入对应的bean
    

    public void send(String destination, final Object message) {
        jmsQueueTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return jmsQueueTemplate.getMessageConverter().toMessage(message, session);
            }
        });
    }

}

订阅消息:

@Component
public class TopicSender {
    
    @Resource(name="jmsTopicTemplate")
    private JmsTemplate jmsTemplate;
    
    
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void publish(String destination,final Object message){
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return jmsTemplate.getMessageConverter().toMessage(message, session);
            }
        });
    }

}

6.消费者代码

package cn.test.activemq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.stereotype.Component;

import cn.test.MqBean;
import cn.test.activemq.message.types.QueueDefination;

/**
 * @author Han
 */
@Component("spqueueconsumertest")
public class SpringQueueReciverTest extends MessageListenerAdapter{
    private static final Logger log = LoggerFactory.getLogger(SpringQueueReciverTest.class);
    
    
    
    @JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")
    public void onMessagehehe(Message message, Session session) throws JMSException {
        try {
            MqBean bean = (MqBean) getMessageConverter().fromMessage(message);
            System.out.println(bean.getName());
            System.out.println(session);
            message.acknowledge();
            message.acknowledge();
        } catch (MessageConversionException | JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

上面的@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")是在用注解方式监听的时候加入。如果用xml配置容易,可以忽略。

附上MqBean

public class MqBean implements Serializable{
    private Integer age;
    private String name;
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    
}

 运行效果截图:

Java ActiveMQ 讲解(二)Spring ActiveMQ整合+注解消息监听第1张

免责声明:文章转载自《Java ActiveMQ 讲解(二)Spring ActiveMQ整合+注解消息监听》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇npm之基本使用转:WebSocket与Java下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

[SpringBoot] SpringApplication.run 执行流程

作者:王奕然链接:https://www.zhihu.com/question/21346206/answer/101789659来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。建议不要硬着头皮看spring代码,本身的代码800多m,就是不上班开始看也不知道什么时候看完。如果想学学ioc,控制反转这些建议看看jodd项目,...

整理分布式锁:业务场景&amp;amp;分布式锁家族&amp;amp;实现原理

1、引入业务场景 业务场景一出现: 因为小T刚接手项目,正在吭哧吭哧对熟悉着代码、部署架构。在看代码过程中发现,下单这块代码可能会出现问题,这可是分布式部署的,如果多个用户同时购买同一个商品,就可能导致商品出现库存超卖 (数据不一致)现象,对于这种情况代码中并没有做任何控制。 原来一问才知道,以前他们都是售卖的虚拟商品,没啥库存一说,所以当时没有考虑那么多...

数据库连接池SQLAlchemy中多线程安全问题

数据库连接池SQLAlchemy中多线程安全的问题 1、数据库模块model.py from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker session_factory = sessionmaker(bind=some_engine) Ses...

kafka(一)入门

一、消息引擎系统 这类系统引以为豪的消息传递属性,像引擎一样,具备某种能量转换传输的能力 消息引擎系统是一组规范,企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。通俗地讲就是系统A发送消息给消息引擎系统,系统B从消息引擎系统读取系统A的消息 既然消息引擎系统是用于不同系统之间传输消息的,如何设计待传输消息的格式,提供可重用性及...

rocketmq学习(一) rocketmq介绍与安装

1.消息队列介绍   消息队列本质上来说是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。但对于一个成熟可靠的消息队列来说,所需要解决的主要问题还包括:高效可靠的消息投递、存储;能承受高并发的流量冲击,可通过集群部署来解决单点故障等等。   由于消息队列具备了以上特点,因此在...

java常见异常

算术异常类:ArithmeticExecption 空指针异常类:NullPointerException 类型强制转换异常:ClassCastException 数组负下标异常:NegativeArrayException 数组下标越界异常:ArrayIndexOutOfBoundsException 违背安全原则异常:SecturityExceptio...