rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。) 一个通用rabbitmq消费确认,快速并发运行的框架。

摘要:
rabbitmq作为消息队列可以有消息消费确认机制,之前写个基于redis的通用生产者消费者并发框架,redis的list结构可以简单充当消息队列,但不具备消费确认机制,随意关停程序,会丢失一部分正在程序中处理但还没执行完的消息。基于redis的与基于rabbitmq相比对消息消费速度和消息数量没有天然的支持。#coding=utf-8"""一个通用的rabbitmq生产者和消费者。使用多个线程消费同一个消息队列。self.channel.basic_publishself.logger.debugself.count_per_minute+=1iftime.time()-self._current_time˃60:self._init_count()self.logger.infoclassRabbitmqConsumer:def__init__:""":paramqueue_name::paramconsuming_function:处理消息的函数,函数有且只能有一个参数,参数表示消息。

rabbitmq作为消息队列可以有消息消费确认机制,之前写个基于redis的通用生产者 消费者 并发框架,redis的list结构可以简单充当消息队列,但不具备消费确认机制,随意关停程序,会丢失一部分正在程序中处理但还没执行完的消息。基于redis的与基于rabbitmq相比对消息消费速度和消息数量没有天然的支持。

使用rabbitmq的最常用库pika

不管是写代码还是运行起来都比celery使用更简单,基本能够满足绝大多数场景使用,用来取代celery worker模式(celery有三个模式,worker模式最常用,其余是定时和间隔时间两种模式)的后台异步的作用。

#coding=utf-8
"""
一个通用的rabbitmq生产者和消费者。使用多个线程消费同一个消息队列。
"""
from collections importCallable
importfunctools
importtime
from threading importLock
from pika importBasicProperties
#noinspection PyUnresolvedReferences
from app.utils_ydf import(LoggerMixin, LogManager, decorators, RabbitMqHelper, BoundedThreadPoolExecutor)
classRabbitmqPublisher(LoggerMixin):
    def __init__(self, queue_name, log_level_int=1):
        self._queue_name =queue_name
        self.logger.setLevel(log_level_int * 10)
        channel =RabbitMqHelper().creat_a_channel()
        channel.queue_declare(queue=queue_name, durable=True)
        self.channel =channel
        self.lock =Lock()
        self._current_time =None
        self.count_per_minute =None
        self._init_count()
        self.logger.info(f'{self.__class__} 被实例化了')
    def_init_count(self):
        self._current_time =time.time()
        self.count_per_minute =0
    defpublish(self, msg):
        with self.lock:  #亲测pika多线程publish会出错。
            self.channel.basic_publish(exchange='',
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  #make message persistent
)
                                       )
            self.logger.debug(f'放入 {msg} 到 {self._queue_name} 队列中')
            self.count_per_minute += 1
            if time.time() - self._current_time > 60:
                self._init_count()
                self.logger.info(f'一分钟内推送了 {self.count_per_minute} 条消息到 {self.channel.connection} 中')
classRabbitmqConsumer(LoggerMixin):
    def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=1, is_print_detail_exception=True):
        """
        :param queue_name:
        :param consuming_function: 处理消息的函数,函数有且只能有一个参数,参数表示消息。是为了简单,放弃策略和模板来强制参数。
        :param threads_num:
        :param max_retry_times:
        :param log_level:
        :param is_print_detail_exception:
        """
        self._queue_name =queue_name
        self.consuming_function =consuming_function
        self.threadpool =BoundedThreadPoolExecutor(threads_num)
        self._max_retry_times =max_retry_times
        self.logger.setLevel(log_level * 10)
        self.logger.info(f'{self.__class__} 被实例化')
        self._is_print_detail_exception =is_print_detail_exception
        self.rabbitmq_helper = RabbitMqHelper(heartbeat_interval=30)
        channel =self.rabbitmq_helper.creat_a_channel()
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=threads_num)
        self.channel =channel
        LogManager('pika.heartbeat').get_logger_and_add_handlers(1)
    @decorators.keep_circulating(1)  #是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
    defstart_consuming_message(self):
        defcallback(ch, method, properties, body):
            msg =body.decode()
            self.logger.debug(f'从rabbitmq取出的消息是:  {msg}')
            #ch.basic_ack(delivery_tag=method.delivery_tag)
            self.threadpool.submit(self.__consuming_function, ch, method, properties, msg)
        self.channel.basic_consume(callback,
                                   queue=self._queue_name,
                                   #no_ack=True
)
        self.channel.start_consuming()
    @staticmethod
    defack_message(channelx, delivery_tagx):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        ifchannelx.is_open:
            channelx.basic_ack(delivery_tagx)
        else:
            #Channel is already closed, so we can't ACK this message;
            #log and/or do something that makes sense for your app in this case.
            pass
    def __consuming_function(self, ch, method, properties, msg, current_retry_times=0):
        if current_retry_times <self._max_retry_times:
            #noinspection PyBroadException
            try:
                self.consuming_function(msg)
                #ch.basic_ack(delivery_tag=method.delivery_tag)
self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
            exceptException as e:
                self.logger.error(f'函数 {self.consuming_function}  第{current_retry_times+1}次发生错误,
 原因是{e}', exc_info=self._is_print_detail_exception)
                self.__consuming_function(ch, method, properties, msg, current_retry_times + 1)
        else:
            self.logger.critical(f'达到最大重试次数 {self._max_retry_times} 后,仍然失败')
            #ch.basic_ack(delivery_tag=method.delivery_tag)
self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
if __name__ == '__main__':
    rabbitmq_publisher = RabbitmqPublisher('queue_test')
    [rabbitmq_publisher.publish(str(i)) for i in range(1000)]
    deff(msg):
        print('....  ', msg)
        time.sleep(10)  #模拟做某事需要10秒种。

    rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20)
    rabbitmq_consumer.start_consuming_message()

1、放入任务 (图片鼠标右键点击新标签打开查看原图)

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。) 一个通用rabbitmq消费确认,快速并发运行的框架。第1张/2、

2、开启消费者,写一个函数传给消费者类。

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。) 一个通用rabbitmq消费确认,快速并发运行的框架。第2张

3、并发运行效果。

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。) 一个通用rabbitmq消费确认,快速并发运行的框架。第3张

rabbitmq这个专业的消息中间件就是比redis作为消息中间件专业了很多。

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。) 一个通用rabbitmq消费确认,快速并发运行的框架。第4张

免责声明:文章转载自《rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。) 一个通用rabbitmq消费确认,快速并发运行的框架。》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇percona-toolkit介绍及安装java 日期validate下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

MQTT-Client-FrameWork使用整理

作者: wbl MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议 MQTT特点 MQTT协议是为大量计算...

rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制。因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率。 现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务。 里面的rabbitpy后来加的,然来是使用pika的,就框架本身得实现写法上违反了开闭原则,没设计太好。好在不影响调用方式...

RocketMQ双主双从集群搭建

1 各角色介绍 Producer:消息的发送者;举例:发信者 Consumer:消息接收者;举例:收信者 Broker:暂存和传输消息;举例:邮局 NameServer:管理Broker;举例:各个邮局的管理机构 Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息 M...

rabbitMq实战使用

只做下工作记录,比较重要的几个属性: concurrency:一个生产者可以同时由多少个消费者消费,这个一般根据你的机器性能来进行配置 prefetch:允许为每个consumer指定最大的unacked messages数目。要是对实时性要求很高的话,prefetch应该设置成1,concurrency的值调高点 队列中Ready状态和Unacknowl...

SOAP、SOCKET协议

一、SOAP( SOAP:Simple Object Access Protocol) 简单对象访问协议,简单对象访问协议(SOAP)是一种轻量的、简单的、基于 XML 的协议,它被设计成在 WEB 上交换结构化的和固化的信息。 SOAP 可以和现存的许多因特网协议和格式结合使用,包括超文本传输协议( HTTP),简单邮件传输协议(SMTP),多用途网际邮件...

Redis之Stream

【Stream简介】Redis5.0增加了一种新的数据结构:Stream,它是一个支持多播的可持久化消息队列。Stream的结构是一个链表,将所有的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。和其它的结构一样,结构上的不同,都是value不同,key都是字符串形式的。key就是Stream这个结构的名称...