RabbitMQ 消息应答机制

摘要:
一、概述消费者处理一个任务是需要一段时间的,如果有一个消费者正在处理一个比较耗时的任务并且只处理了一部分,突然这个时候消费者宕机了,那么会出现什么情况呢?

一、概述

消费者处理一个任务是需要一段时间的,如果有一个消费者正在处理一个比较耗时的任务并且只处理了一部分,突然这个时候消费者宕机了,那么会出现什么情况呢?

要回答这个问题,我们先了解一下 RabbitMQ 的消息应答机制

为了保证消息从队列可靠地达到消费者并且被消费者消费处理,RabbitMQ 提供了消息应答机制,RabbitMQ 有两种应答机制,自动应答和手动应答

1、自动应答、RabbitMQ 只要将消息分发给消费者就被认为消息传递成功,就会将内存中的消息删除,而不管消费者有没有处理完消息

2、手动应答、RabbitMQ 将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后才会被认为消息传递成功了,然后才会将内存中的消息删除

可以看出,如果是自动应答模式,消费者在处理任务的过程中宕机了,那么消息将会丢失,手动应答则能够保证消息不会被丢失,所以在实际的应用当中绝大多数都采用手动应答

二、手动应答常用 API

// 该消息已经处理完成了,RabbitMQ 内存可以删除该消息了
void basicAck(long deliveryTag, boolean multiple)
// 不处理该消息,直接拒绝,然后将该消息丢弃
void basicReject(long deliveryTag, boolean requeue)
void basicNack(long deliveryTag, boolean multiple, boolean requeue)

三、原理图

Producer 生产消息发送给消息队列,Consumer01 消费消息1、Consumer02 消费消息2、Consumer01 接收到了消息之后,在处理完部分逻辑的时候突然宕机了,Consumer01 未发送 ACK,此时消息1 不会丢失,而是重新进入队列,由状态正常的 Consumer02 消费掉

RabbitMQ 消息应答机制第1张

四、编码

1、RabbitmqUtils(工具类)

public class RabbitmqUtils {
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

2、Producer

public class Producer {
    private static final String QUEUE_NAME = "ackDemo";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        String message = "有意思的消息--->";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Producer send message successfully");
    }
}

3、Consumer01

public class Consumer01 {
    private static final String QUEUE_NAME = "ackDemo";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            try {
                // 休眠 10 s
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 参数一、deliveryTag:消息应答标记
            // 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
            // 处理完逻辑之后应答 ack
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            System.out.println(message);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag);
        };

        // 设置手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

4、Consumer02

public class Consumer02 {
    private static final String QUEUE_NAME = "ackDemo";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            // 参数一、deliveryTag:消息应答标记
            // 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
            // 处理完逻辑之后应答 ack
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            System.out.println(message);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag);
        };

        // 设置手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

  

五、测试过程及结果

1、先启动 Cousumer01、Consumer02

2、生产者发送 10 条消息,根据默认的轮询规则,一个消费者(假设此时为 Consumer01)消费第 1、3、5、7、9 条消息,另外一个消费者(假设此时为 Consumer02)消费第 2、4、6、8、10 条消息

3、当 Consumer01 消费第 1、3 条消息的时候手动强制关闭 Consumer01,那么原先本应该由 Consumer01 消费的第 5、7、9 条消息不会丢失,它们将重新进入队列由 Consumer02 消费掉

4、Consumer01、Consumer02 消费的消息如下:

RabbitMQ 消息应答机制第2张

免责声明:文章转载自《RabbitMQ 消息应答机制》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇OpenStack 中的neutron-server启动过程Python爬取中文页面的时候出现的乱码问题下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

dubbo

目录 java spi dubbo扩展机制spi 注册中心 RegistryService Registry RegistryFactory NotifyListener AbstractRegistry FailbackRegistry zookeeper注册中心ZookeeperRegistry 远程通信 Transporter Ex...

c# 实现网页上用户自动登陆|asp.net 模拟网站登录

using System;using System.Collections.Generic;using System.Text;using System.Net;using System.IO; namespace Czt.Web{ /// <summary> /// 实现网站登录类 /// </summary> public cl...

c#根据标题判断进程是否存在,是否已退出

 int handle = FindWindow(null, "XXX");  if (handle == 0)//0说明不存在 [DllImport("User32.dll", EntryPoint = "FindWindow")] private static extern int FindWindow(string lpClassName, stri...

Jakarta Java Mail属性参数配置

前言 Jakarta Mail网址:https://eclipse-ee4j.github.io/mail SMTP协议可匹配的属性:https://eclipse-ee4j.github.io/mail/docs/api/com/sun/mail/smtp/package-summary.html 翻译(Package com.sun.mail.smtp...

提取网页中的超链接(C#)

using System;using System.Xml;using System.Text;using System.Net;using System.IO;using System.Collections;using System.Text.RegularExpressions; public class App{public static voi...

JSP中文乱码问题的由来以及解决方法

首先明确一点,在计算机中,只有二进制的数据! 一、java_web乱码问题的由来 1.字符集 1.1 ASCII字符集 在早期的计算机系统中,使用的字符非常少,这些字符包括26个英文字母、数字符号和一些常用符号(包括控制符号),对这些字符进行编码,用1个字节就足够了(1个字节可以表示28=256种字符)。然而实际上,表示这些字符,只使用了1个字节的7位,这...