消息队列(三)Apache ActiveMQ

摘要:
理解JMSJava消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。Connection:客户端到JMS提供者之间的活动连接。JMSProducer:由JMSContext创建的对象,用于发送消息到Queue或TopicJMSConsumer:由JMSContext创建的对象,用于接收Queue或Topic中的消息在简化API中,一个JMSContext对象封装了传统API中Connection和Session两个对象的行为。
在Ubuntu上安装ActiveMQ

系统初始化

$ sudo apt update
$ sudo apt dist-upgrade
$ sudo apt autoremove
$ sudo apt clean

搭建activemq服务

$ mkdir /home/active-mq
$ cd /home/active-mq
$ wget http://www.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
# 具体版本请查看http://www.apache.org/dist/activemq
$ tar -zxvf apache-activemq-5.15.9-bin.tar.gz
# 如果未安装jdk,执行 sudo apt-get install openjdk-8-jdk
$ ./activemq start
INFO: Loading '/home/active-mq/apache-activemq-5.15.9//bin/env'
INFO: Using java '/usr/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/active-mq/apache-activemq-5.15.9//data/activemq.pid' (pid '6356')

监控

浏览器打开http://localhost:8161/admin/,输入admin,admin

消息队列(三)Apache ActiveMQ第1张

至此,ActiveMQ搭建完成。

理解JMS( Java Message Service)

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。

JMS模型

  • 点对点(P2P)或队列模型

    • 只有一个消费者将获得消息
    • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
    • 每一个成功处理的消息都由接收者签收
  • 发布/订阅模型

    • 多个消费者可以获得消息
    • 在发布者和订阅者之间存在时间依赖性。发布者需要创建一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者创建了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

传统API

传统API提供的主要接口如下:

  • ConnectionFactory:客户端用来创建连接的受管对象。简化API也会使用此接口。

  • Connection:客户端到JMS提供者之间的活动连接。

  • Session:发送和接收消息的一个单线程上下文。

  • MessageProducer:由Session创建的对象,用于发送消息到Queue或Topic

  • MessageConsumer:由Session创建的对象,用于接收Queue或Topic中的消息

消息队列(三)Apache ActiveMQ第2张

简化API

简化API与传统API提供的消息功能是一样的,但是它需要的接口更少、使用更方便。 简化API提供的主要接口如下:

  • ConnectionFactory:客户端用来创建连接的受管对象。传统API也会使用此接口。
  • JMSContext:客户端到JMS提供者之间的活动连接,以及发送和接收消息的一个单线程上下文。
  • JMSProducer:由JMSContext创建的对象,用于发送消息到Queue或Topic
  • JMSConsumer:由JMSContext创建的对象,用于接收Queue或Topic中的消息

消息队列(三)Apache ActiveMQ第3张

在简化API中,一个JMSContext对象封装了传统API中Connection和Session两个对象的行为。

开发一个JMS客户端

一个使用传统API的JMS客户端典型的使用步骤如下:

  • 使用JNDI查找一个ConnectionFactory对象
  • 使用JNDI查找一个或多个Destination对象
  • 使用ConnectionFactory创建一个JMS Connection对象
  • 使用Connection创建一个或多个JMS Session对象
  • 使用Session和Destination对象创建需要的MessageProducer和MessageConsumer对象
  • 通知Connection对象开始投递消息

Active MQ是完全实现JMS规范的JMS客户端

Hello World

创建Hello World项目

创建gradle项目,并编辑build.gradle

    compile group: 'org.apache.activemq', name: 'activemq-all', version: '5.15.9'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'

创建生产者

public class HelloWorldProducer implements Runnable {
    @Override
    public void run() {
        try {
            // 1. 创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
            // 2. 创建连接
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // 3. 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4. 创建目的地(主题或队列)
            Destination destination = session.createQueue("TEST.FOO");
            // 5. 从会话创建到目的地的消息发布者
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 6. 创建并发布消息
            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);

            // 7. 销毁资源
            session.close();
            connection.close();
        } catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

创建消费者

public class HelloWorldConsumer implements Runnable, ExceptionListener {
    @Override
    public void run() {
        try {
            // 1. 创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
            // 2. 创建连接
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // 3. 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4. 创建目的地(主题或队列)
            Destination destination = session.createQueue("TEST.FOO");
            // 5. 从会话创建到目的地的消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            // 6. 等待接收消息
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }

            // 7. 销毁资源
            consumer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    @Override
    public synchronized void onException(JMSException exception) {
        System.out.println("JMS Exception occured.  Shutting down client.");
    }
}

测试类

public class App {
    public static void main(String[] args) throws InterruptedException {
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
    }

    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }
}

运行我们的测试程序,控制台将会打印:

Sent message: 507732978 : Thread-6
Sent message: 2056557229 : Thread-0
Sent message: 39234146 : Thread-8
Sent message: 1100925878 : Thread-13
Sent message: 1566392082 : Thread-17
Sent message: 1329793151 : Thread-1
Sent message: 988436874 : Thread-16
Received: Hello world! From: Thread-6 : 1442537083
Received: Hello world! From: Thread-1 : 1531760310
Received: Hello world! From: Thread-0 : 1817576164
Received: Hello world! From: Thread-8 : 262381200
Received: Hello world! From: Thread-17 : 1647178742
Received: Hello world! From: Thread-13 : 1610404140

免责声明:文章转载自《消息队列(三)Apache ActiveMQ》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇异常:Unknown lifecycle phase "mvn". You must specify a valid lifecyclelayer第二个按钮点击后关闭的解决方法下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

一次library cache pin故障的解决过程

在dbsnake 上看到的这篇文章,转过来。 主要还是学习解决问题的一个思路。这个往往比问题的解决更重要。 原文链接如下: http://dbsnake.com/2010/06/solve-library-cache-pin.html 内容如下: 今天接到同事的电话,说他的一个存储过程已经run了一个多小时了,还在继续run,他觉得极不正常,按道理说不应该...

接口测试小结

环境准备 1.JDK版本和Jar包确认,无特殊要求JDK安装后即可 2.数据库确认(通常使用dev),环境配置文件 ats-config.properties,数据库信息文件devdb.conf 3.在trunk流测试时需要查看基类是否有本地测试限制,有限制放开即可   1.session初始化 1.RPC接口写测试脚本时,往往需要初始化sessio...

springboot 整合websocket实现消息推送(nginx、vue)

最近需要一个动态图表的功能,如下图。 这种实现需要实时推送数据上来,那一般有两种方法 方法一:前端写个定时器,不断轮询后台即可。这当然是很low的,请求太多很不友好,果断抛弃 方法二:使用websocket,废话不多说直接上代码 springboot 整合websocket有两种方法,这里先记录原始方法: 添加webSocket插件      <!...

[转]SecureCRT的详细使用教程

原文链接: http://www.heibai.net/book/html/wangluogongju/yuanchengkongzhi/2009/0911/1081.html# VanDyke CRT 和 VanDyke SecureCRT是最常用的终端仿真程序,简单的说就是Windows下登录UNIX或Liunx服务器主机的软件。二者不同的是Sec...

Akka系列(九):Akka分布式之Akka Remote

前言.... Akka作为一个天生用于构建分布式应用的工具,当然提供了用于分布式组件即Akka Remote,那么我们就来看看如何用Akka Remote以及Akka Serialization来构建分布式应用。 背景 很多同学在程序的开发中都会遇到一个问题,当业务需求变得越来越复杂,单机服务器已经不足以承载相应的请求的时候,我们都会考虑将服务部署到不同的...

PHP漏洞之session会话劫持

本文主要介绍针对PHP网站Session劫持。session劫持是一种比较复杂的攻击方法。大部分互联网上的电脑多存在被攻击的危险。这是一种劫持tcp协议的方法,所以几乎所有的局域网,都存在被劫持可能。 服务端和客户端之间是通过session(会话)来连接沟通。当客户端的浏览器连接到服务器后,服务器就会建立一个该用户的session。每个用 户的sessio...