RabbitMQ(二):Java 操作队列

摘要:
简单的队列是一一对应的。在我们的实际开发中,生产者很容易发送消息,而消费者通常需要与业务集成。在收到消息后,消费者需要处理它们,这可能需要时间。此时,队列将积压许多消息。

1. 简单模式

模型:

RabbitMQ(二):Java 操作队列第1张

  • P:消息的生产者
  • 队列:rabbitmq
  • C:消息的消费者

获取 MQ 连接

public static Connection getConnection() throws IOException, TimeoutException {
        // 定义一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置服务地址
        factory.setHost("127.0.0.1");
        // AMQP 5672
        factory.setPort(5672);
        // vhost
        factory.setVirtualHost("/vhost_ljf");
        // 用户名
        factory.setUsername("ljf");
        // 密码
        factory.setPassword("123456");
        return factory.newConnection();
    }

生产者生产消息

public class Send {
    private static final String QUEUE_NAME = "test_simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取一个连接
        Connection connection = ConnectionUtils.getConnection();
        // 从连接中获取一个通道
        Channel channel = connection.createChannel();
        // 创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello simple!";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("--send msg: " + msg);
        channel.close();
        connection.close();
    }
}

消费者接收消息

public class Recv {
    private static final String QUEUE_NAME = "test_simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取到达的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("recv: " + msg);
            }
        };
        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

简单队列的不足

耦合性高,生产者一一对应消费者(如果我想有多个消费者消费队列中的消息,这时候就不行了);

队列名变更,这时候得同时变更。

2. 工作队列模式(Work Queue)

模型

RabbitMQ(二):Java 操作队列第2张

为什么会出现工作队列?

simple 队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,可能需要花费时间,这时候队列就会积压了很多消息。

生产者

/**
 *                 |----C1
 *   P----Queue----|
 *                 |----C2
 */
public class Send {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();

        // 获取 channel
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            String msg = "hello" + i;
            System.out.println("[WQ] send: " + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i*20);
        }
        channel.close();
        connection.close();
    }
}

消费者

  • 消费者1
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 获取 channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达 触发方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] Recv msg: " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done.");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  • 消费者2
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 获取 channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达 触发方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg: " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done.");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

现象

先运行消费者1和消费者2,再运行生产者

消费者1 和 消费者2 处理的消息数量是一样多的。

消费者1:偶数

消费者2:奇数

这种方式叫做轮询分发(round-robin),结果就是不管谁忙谁清闲,都不会多给一个消息。

3. 公平分发(fair dipatch)

生产者

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 获取 channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**
         * 每个消费者:发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
         */
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        for (int i = 0; i < 50; i++) {
            String msg = "hello" + i;
            System.out.println("[WQ] send: " + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i*5);
        }
        channel.close();
        connection.close();
    }
}

消费者

  • 消费者1
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 获取 channel
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);  // 保证一次只发送一个
        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达 触发方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] Recv msg: " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done.");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false; // 自动应答 false
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  • 消费者2
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 获取 channel
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);  // 保证一次只发送一个
        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达 触发方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg: " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done.");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false; // 自动应答 false
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

现象

消费者2 处理的消息比 消费者1 多,能者多劳。

4. 消息应答与消息持久化

消息应答

boolean autoAck = false; // 自动应答 false
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  • boolean autoAck = true;(自动确认模式)

一旦 rabbitmq 将消息分发给消费者,就会从内存中删除;

这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息。

  • boolean autoAck = false;(手动模式)

如果一个消费者挂掉,就会交付给其他消费者;

rabbitmq 支持消息应答,消费者发送一个消息应答,告诉 rabbitmq 这个消息我已经处理完成,可以删掉,然后 rabbitmq 就删除内存中的消息。

消息应答默认是打开的,即为 false;

如果 rabbitmq 挂了,消息任然会丢失。

消息持久化

// 声明队列
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

注意:rabbitmq 不允许重新定义(不同参数)一个已存在的队列

免责声明:文章转载自《RabbitMQ(二):Java 操作队列》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Python字符编码期权波动率模型及交易策略分析下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

HiveSQL 数据定义语言(DDL)

第一章、数据定义语言(DDL)概述 1.1 DDL语法的作用 数据定义语言 (Data Definition Language, DDL),是SQL语言集中对数据库内部的对象结构进行创建,删除,修改等的操作语言,这些数据库对象包括database(schema)、table、view、index等。核心语法由CREATE、ALTER与DROP三个所组成。D...

Java实现 “ 将数字金额转为大写中文金额 ”

前言:输入数字金额参数,运行程序得到其对应的大写中文金额;例如:输入 12.56,输出 12.56 : 壹拾贰元伍角陆分;重点来了:本人亲测有效。 奉上代码:/*** @Title: ConvertUpMoney* @Description: 将数字金额转换为大写中文金额* @date: 2019年6月18日 下午10:52:27*/public clas...

SHA256加密(登陆注册的-密码加盐)

主要代码: 1 // 盐值 2 String salt = null; 3 String password = user.getPassword(); 4 //加密密码 5 String encryptPassword = null; 6 7 salt...

【转】IOS中各种常用控件的默认高度,很全

1.状态栏 状态栏一般高度为20像素,在打手机或者显示消息时会放大到40像素高,注意,两倍高度的状态栏在好像只能在纵向的模式下使用。如下图   用户可以隐藏状态栏,也可以将状态栏设置为灰色,黑色或者半透明的黑色。   如果需要隐藏状态栏可以使用调用: [[UIApplication sharedApplication] setStatusBarHi...

【RabbitMQ】一文带你搞定springboot整合RabbitMQ涉及消息的发送确认,消息的消费确认机制,延时队列的实现

说明 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 本文大纲 什么是延迟队列 延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。 其次,延时队列,最...

[转载]ASP.NET Core文件上传与下载(多种上传方式)

ASP.NET Core文件上传与下载(多种上传方式)  前言 前段时间项目上线,实在太忙,最近终于开始可以研究研究ASP.NET Core了. 打算写个系列,但是还没想好目录,今天先来一篇,后面在整理吧. ASP.NET Core 2.0 发展到现在,已经很成熟了.下个项目争取使用吧. 正文 1.使用模型绑定上传文件(官方例子) 官方机器翻译的地址:...