ActiveMQ之JMS及保证消息的可靠性<持久化、事务、签收>(三)

摘要:
但对于非持久性消息,服务器关闭后消息将永远丢失(消息){TextMessagetextMessage=Message;System.out.println;Message=topicSubscriber.receive;//1秒后,消息为空,跳过循环,控制台关闭}。。。对于消息使用者来说,打开事务可以防止消息被多次使用,以及后台和服务器数据的不一致。Sessionsession=connection.createSession;//Sessionsession=connection.createSession;//它也是自动登录。。。一场commit();然而,在不提交的情况下启动事务将重复消耗很少的知识:brokerbroker将以代码形式启动ActiveMQ,并将MQ嵌入Java代码中,Java代码可以随时启动,从而节省资源并提高可靠性。

1.JAVAEE 是一套使用Java 进行企业级开发的13 个核心规范工业标准 , 包括:

 JDBC  数据库连接

 JNDI  Java的命名和目录接口

 EJB   Enterprise java bean

 RMI   远程方法调用    一般使用TCP/IP 协议

 Java IDL    接口定义语言

 JSP    

 Servlet 

 XML

 JMS    Java 消息服务

 JTA        

 JTS

 JavaMail

 JAF

JMS部件:

ActiveMQ之JMS及保证消息的可靠性<持久化、事务、签收>(三)第1张

 5 个主要的消息头:

ActiveMQ之JMS及保证消息的可靠性<持久化、事务、签收>(三)第2张

发送和接收的消息类型必须一致

消息属性:识别、去重、重点标注

2. 如何保证消息的可靠性 

    JMS 可靠性:Persistent   持久性  、 事务 、 Acknowledge  签收

   2.1 持久化

// 在队列为目的地的时候持久化消息
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 队列为目的地的非持久化消息
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

 持久化的消息,服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息任就会被消费。

但是非持久化的消息,服务器宕机后消息永远丢失。 而当你没有注明是否是持久化还是非持久化时,默认是持久化的消息。

对于目的地为主题(topic)来说,默认就是非持久化的,让主题的订阅支持化的意义在于:对于订阅了公众号的人来说,当用户手机关机,在开机后任就可以接受到关注公众号之前发送的消息。

代码实现:持久化topic 的消费者

      ……    // 前面代码相同,不复制了      
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");

         // 5 发布订阅
        connection.start();

        Message message = topicSubscriber.receive();// 一直等
         while (null != message){
             TextMessage textMessage = (TextMessage)message;
             System.out.println(" 收到的持久化 topic :"+textMessage.getText());
             message = topicSubscriber.receive(3000L);    // 等1秒后meesage 为空,跳出循环,控制台关闭
         }
   ……

持久化生产者

          ……  
   
        MessageProducer messageProducer = session.createProducer(topic);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中

        // 设置持久化topic 在启动
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 
        connection.start();
        for (int i = 1; i < 4 ; i++) {
            // 7  创建字消息
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            // 8  通过messageProducer发布消息
            messageProducer.send(textMessage);

            MapMessage mapMessage = session.createMapMessage();
            //    mapMessage.setString("k1","v1");
            //     messageProducer.send(mapMessage);
        }
        // 9 关闭资源
      …… 

ActiveMQ之JMS及保证消息的可靠性&lt;持久化、事务、签收&gt;(三)第3张

 当生产者启动后:

ActiveMQ之JMS及保证消息的可靠性&lt;持久化、事务、签收&gt;(三)第4张

 消息被消费,并且:  (因为我在receive方法中设置了如果接收到消息后3秒还没有消息就离线,也也可以设置永久存活)

ActiveMQ之JMS及保证消息的可靠性&lt;持久化、事务、签收&gt;(三)第5张

 2.2 事务

     createSession的第一个参数为true 为开启事务,开启事务之后必须在将消息提交,才可以在队列中看到消息

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

 提交:

session.commit(); 

事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。

  对于消息消费者来说,开启事务的话,可以避免消息被多次消费,以及后台和服务器数据的不一致性。举个栗子:

如果消息消费的  createSession  设置为 ture  ,但是没有 commit ,此时就会造成非常严重的后果,那就是在后台看来消息已经被消费,但是对于服务器来说并没有接收到消息被消费,此时就有可能被多次消费。

 2.3 Acknowledge  签收  (俗称ack)

    非事务    :

Session.AUTO_ACKNOWLEDGE      自动签收,默认

Session.CLIENT_ACKNOWLEDGE     手动签收
手动签收需要acknowledge   
textMessage.acknowledge();

 而对于开启事务时,设置手动签收和自动签收没有多大的意义,都默认自动签收,也就是说事务的优先级更高一些。

Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//Session session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);   //  也是自动签收   

        ……

session.commit(); 

 但是开启事务没有commit 任就会重复消费

小知识:  broker 

broker 就是实现了用代码形式启动 ActiveMQ 将 MQ 内嵌到 Java 代码中,可以随时启动,节省资源,提高了可靠性。

就是将 MQ 服务器作为了 Java 对象 

使用多个配置文件启动 activemq 

cp activemq.xml  activemq02.xml 

// 以active02 启动mq 服务器
./activemq start xbean:file:/myactivemq/apache-activemq-5.15.9/conf/activemq02.xml 

 把小型 activemq 服务器嵌入到 java 代码: 不在使用linux 的服务器

  需要的包:

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.9.5</version>
</dependency>

代码实现:

/**
*Embebroker Demo
*2019-12-23 17:31:34
*/
public
class Embebroker { public static void main(String[] args) throws Exception { // broker 服务 BrokerService brokerService = new BrokerService(); // 把小型 activemq 服务器嵌入到 java 代码 brokerService.setUseJmx(true); // 原本的是 192.…… 是linux 上的服务器,而这里是本地windows 的小型mq 服务器 brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }

免责声明:文章转载自《ActiveMQ之JMS及保证消息的可靠性&amp;lt;持久化、事务、签收&amp;gt;(三)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇[WCF安全系列]实例演示:TLS/SSL在WCF中的应用[SSL over TCP]git clone远程仓库的分支下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

android的几种“通知”方式简单实现(Notification&amp;amp;NotificationManager)

  关于通知Notification相信大家都不陌生了,平时上QQ的时候有消息来了或者有收到了短信,手机顶部就会显示有新消息什么的,就类似这种。今天就稍微记录下几种Notification的用法。3.0以前的通知和3.0以后的通知是有些区别的。话不多说,直接上代码。   1、新建一个android项目     我新建项目的 minSdkVersion="1...

iOS数据持久化的方式

概论 所谓的持久化,就是将数据保存到硬盘中,使得在应用程序或机器重启后可以继续访问之前保存的数据。在iOS开发中,有很多数据持久化的方案,接下来我将尝试着介绍一下5种方案: plist文件(属性列表) preference(偏好设置) NSKeyedArchiver(归档) SQLite 3 CoreData 沙盒 在介绍各种存储方法之前,有必要说明以...

ABP VNext框架基础知识介绍(2)微服务的网关

ABP VNext框架如果不考虑在微服务上的应用,也就是开发单体应用解决方案,虽然也是模块化开发,但其集成使用的难度会降低一个层级,不过ABP VNext和ABP框架一样,基础内容都会设计很多内容,如数据库都支持Oracle、SQLServer、MySql、PostgreSQL、SQLite,都有利用Redis作为分布式缓存,使用RabbitMQ作为事件总...

Mysql面对高并发修改的问题处理【2】

一、线上修改表结构有哪些风险? 如果有一天业务系统需要增大一个字段长度,能否在线上直接修改呢?在回答这个问题前,我们先来看一个案例: 以上语句尝试修改user表的name字段长度,语句被阻塞。按照惯例,我们检查一下当前进程: 从进程可以看出alter语句在等待一个元数据锁,而这个元数据锁很可能是上面这条select语句引起的,事实正是如此。在执行DML...

在qt中用tcp传输xml消息

在qt中用tcp传输xml消息 本文博客链接:http://blog.csdn.net/jdh99,作者:jdh,转载请注明. 环境: 主机:WIN7 开发环境:Qt5 3.1.2 说明: 在tcp上传输xml消息. 协议格式例如以下: 2字节标识(0xc55c,网络序)+2字节预留 +4字节报文内容长度(网络序) + 4字节命令字(网络序)+报...

# AMQP协议 0-9-1 简介

目录 AMQP是什么AMQP 0-9-1 模型简介 交换机和交换机类型 默认交换机 直连交换机 扇型交换机 主题交换机 头交换机 队列 队列名称 队列持久化 绑定 消费者 消息确认 拒绝消息 Negative Acknowledgements 预取消息 消息属性和有效载荷(消息主体) 消息确认 其他 AMQP 0-9-1 方法...