Adding precise rate control to the rabbitmq consumer side

Summary:
A rate-control feature has been added to the framework: even with 200 threads, it can guarantee that tasks run only 10 times per second. Compared with the earlier approach of replacing celery with a plain Redis list as the message queue, this is better mainly because RabbitMQ has consumer acknowledgment and Redis does not, so arbitrarily stopping a running program no longer loses tasks.

Previously the rate was controlled through the number of threads in the pool, which is hard to get right. When the same kind of task is run ten thousand times, each run does not take the same amount of time, so it is difficult to guess how many concurrent threads will stay just under a given rate.

Now a rate-control feature is built into the framework: even with 200 threads, it can guarantee that tasks run only 10 times per second.
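The idea, visible in the consumer code below, is to sleep a fixed interval before handing each message to the thread pool, so the dispatch rate is 1/interval no matter how large the pool is. A minimal standalone sketch of that idea (the names here are illustrative, not the framework's API):

import time
from concurrent.futures import ThreadPoolExecutor

def run_at_fixed_rate(task, messages, rate_per_second=10, threads=200):
    # The pool size only bounds how many tasks run at once; the sleep between
    # submissions is what caps the start rate at `rate_per_second`.
    interval = 1.0 / rate_per_second
    pool = ThreadPoolExecutor(threads)
    for msg in messages:
        time.sleep(interval)  # throttle dispatch, independent of thread count
        pool.submit(task, msg)

In the framework this interval is the msg_schedule_time_intercal parameter of RabbitmqConsumer.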

The rabbitpy support was added later; the framework originally used pika. The way this is implemented inside the framework violates the open/closed principle and is not especially well designed, but fortunately it does not affect how the framework is called.

Compared with celery:

Pushing tasks is faster than celery's delay, and the pushed task payload is smaller.

It is simpler to use; there is none of the fancy decorator-based registration of function routes.
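For comparison, this is all the wiring a consumer needs (a usage sketch that mirrors the test case at the bottom of the file; f is just an ordinary callable, with no decorator registration):

def f(body):
    print('....  ', body)

rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=200, msg_schedule_time_intercal=0.5)
rabbitmq_consumer.start_consuming_message()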

It is good enough for production use.

It is also better than the earlier approach of using a plain Redis list as the message queue to replace celery, mainly because RabbitMQ has the concept of consumer acknowledgment and Redis does not, so arbitrarily stopping a running program would otherwise cause tasks to be lost.
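The acknowledgment concept is what prevents that loss: the broker only removes a message after the consumer explicitly acks it, so a message being processed when the process is killed is simply redelivered. A hedged sketch of the pattern with pika (handle is a hypothetical business function; the callback signature follows the older pika style used in the code below):

def callback(ch, method, properties, body):
    try:
        handle(body)  # hypothetical business logic
        ch.basic_ack(delivery_tag=method.delivery_tag)  # only now is the message removed from the queue
    except Exception:
        # Not acked: the broker keeps the message and will redeliver it,
        # so killing the process mid-task does not lose work.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

The framework's full implementation follows.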

# -*- coding: utf-8 -*-
from collections import Callable
import time
from threading import Lock
import unittest
import rabbitpy
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from rabbitpy.message import Properties
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pymongo.errors import PyMongoError
from app.utils_ydf import LogManager
from app.utils_ydf.mixins import LoggerMixin
from app.utils_ydf import decorators
from app.utils_ydf import BoundedThreadPoolExecutor
from app import config as app_config
LogManager('pika.heartbeat').get_logger_and_add_handlers(1)
LogManager('rabbitpy').get_logger_and_add_handlers(2)
LogManager('rabbitpy.base').get_logger_and_add_handlers(2)
class ExceptionForRetry(Exception):
    """Raise this to trigger a retry. It is only a subclass definition; using it is optional."""
class ExceptionForRabbitmqRequeue(Exception):
    """When this error is raised, the message is put back into the queue."""
class RabbitmqClientRabbitPy:
    """
    Uses the rabbitpy package.
    """
    # noinspection PyUnusedLocal
    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}'
        self.connection = rabbitpy.Connection(rabbit_url)
    def creat_a_channel(self) -> rabbitpy.AMQP:
        return rabbitpy.AMQP(self.connection.channel())  # Adapter so rabbitpy's public methods look almost the same as pika's channel methods.
class RabbitmqClientPika:
    """
    Uses the pika package, which is not thread-safe.
    """
    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        """
        parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
        connection = pika.SelectConnection(parameters=parameters,
                                  on_open_callback=on_open)
        :param username:
        :param password:
        :param host:
        :param port:
        :param virtual_host:
        :param heartbeat:
        """
        credentials = pika.PlainCredentials(username, password)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host, port, virtual_host, credentials, heartbeat=heartbeat))
    def creat_a_channel(self) -> BlockingChannel:
        return self.connection.channel()
class RabbitMqFactory:
    def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60, is_use_rabbitpy=1):
        """
        :param username:
        :param password:
        :param port:
        :param virtual_host:
        :param heartbeat:
        :param is_use_rabbitpy: 0 to use pika (not thread-safe), 1 to use rabbitpy (thread-safe).
        """
        if is_use_rabbitpy:
            self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
        else:
            self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat)
    def get_rabbit_cleint(self):
        return self.rabbit_client
class RabbitmqPublisher(LoggerMixin):
    def __init__(self, queue_name, is_use_rabbitpy=1, log_level_int=10):
        """
        :param queue_name:
        :param is_use_rabbitpy: whether to use the rabbitpy package. pika is not recommended.
        :param log_level_int:
        """
        self._queue_name = queue_name
        self._is_use_rabbitpy = is_use_rabbitpy
        self.logger.setLevel(log_level_int)
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
        self._lock_for_pika = Lock()
        self._lock_for_count = Lock()
        self._current_time = None
        self.count_per_minute = None
        self._init_count()
        self.logger.info(f'{self.__class__} was instantiated')
    def _init_count(self):
        with self._lock_for_count:
            self._current_time = time.time()
            self.count_per_minute = 0
    def publish(self, msg: str):
        if self._is_use_rabbitpy:
            self._publish_rabbitpy(msg)
        else:
            self._publish_pika(msg)
        self.logger.debug(f'Pushed message {msg} to queue {self._queue_name}')
        """
        # 屏蔽统计减少加锁,能加快速度。
        with self._lock_for_count:
            self.count_per_minute += 1
        if time.time() - self._current_time > 60:
            self._init_count()
            self.logger.info(f'一分钟内推送了 {self.count_per_minute} 条消息到 {self.rabbit_client.connection} 中')
        """
    @decorators.tomorrow_threads(100)
    def _publish_rabbitpy(self, msg: str):
        # noinspection PyTypeChecker
        self.channel.basic_publish(
            exchange='',
            routing_key=self._queue_name,
            body=msg,
            properties={'delivery_mode': 2},
        )
    def _publish_pika(self, msg: str):
        with self._lock_for_pika:  # Tested: pika errors out when publishing from multiple threads, hence the lock.
            self.channel.basic_publish(exchange='',
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # make message persistent
                                       )
                                       )
    def clear(self):
        self.channel.queue_purge(self._queue_name)
    def get_message_count(self):
        if self._is_use_rabbitpy:
            return self._get_message_count_rabbitpy()
        else:
            return self._get_message_count_pika()
    def _get_message_count_pika(self):
        queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
        return queue.method.message_count
    def _get_message_count_rabbitpy(self):
        ch = self.rabbit_client.connection.channel()
        q = rabbitpy.amqp_queue.Queue(ch, self._queue_name)
        q.durable = True
        msg_count = q.declare(passive=True)[0]
        ch.close()
        return msg_count
class RabbitmqConsumer(LoggerMixin):
    def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, is_use_rabbitpy=1):
        """
        :param queue_name:
        :param consuming_function: the function that processes a message; it must take exactly one argument, the message. This keeps things simple instead of enforcing the parameter through a strategy or template pattern.
        :param threads_num:
        :param max_retry_times:
        :param log_level:
        :param is_print_detail_exception:
        :param msg_schedule_time_intercal: interval between scheduling messages, used for rate control.
        :param is_use_rabbitpy: whether to use the rabbitpy package. pika is not recommended.
        """
        self._queue_name = queue_name
        self.consuming_function = consuming_function
        self._threads_num = threads_num
        self.threadpool = BoundedThreadPoolExecutor(threads_num)
        self._max_retry_times = max_retry_times
        self.logger.setLevel(log_level)
        self.logger.info(f'{self.__class__} was instantiated')
        self._is_print_detail_exception = is_print_detail_exception
        self._msg_schedule_time_intercal = msg_schedule_time_intercal
        self._is_use_rabbitpy = is_use_rabbitpy
    def start_consuming_message(self):
        if self._is_use_rabbitpy:
            self._start_consuming_message_rabbitpy()
        else:
            self._start_consuming_message_pika()
    @decorators.tomorrow_threads(100)
    @decorators.keep_circulating(1)  # Ensures that no matter how long rabbitmq is down, the program recovers after the broker comes back, without needing a restart.
    def _start_consuming_message_rabbitpy(self):
        # noinspection PyArgumentEqualDefault
        channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel()  # type: rabbitpy.AMQP
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)
        for message in channel.basic_consume(self._queue_name):
            body = message.body.decode()
            self.logger.debug(f'Message fetched from rabbitmq: {body}')
            time.sleep(self._msg_schedule_time_intercal)
            self.threadpool.submit(self._consuming_function_rabbitpy, message)
    def _consuming_function_rabbitpy(self, message: rabbitpy.message.Message, current_retry_times=0):
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(message.body.decode())
                message.ack()
            except Exception as e:
                if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                    return message.nack(requeue=True)
                self.logger.error(f'Function {self.consuming_function} failed on attempt {current_retry_times + 1}, reason: {type(e)} {e}', exc_info=self._is_print_detail_exception)
                self._consuming_function_rabbitpy(message, current_retry_times + 1)
        else:
            self.logger.critical(f'Still failing after reaching the maximum retry count of {self._max_retry_times}')  # Once the retry limit is exceeded, the message is acknowledged anyway.
            message.ack()
    @decorators.tomorrow_threads(100)
    @decorators.keep_circulating(1)  # Ensures that no matter how long rabbitmq is down, the program recovers after the broker comes back, without needing a restart.
    def _start_consuming_message_pika(self):
        channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel()  # pika is hard-coded here.
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)
        def callback(ch, method, properties, body):
            body = body.decode()
            self.logger.debug(f'Message fetched from rabbitmq: {body}')
            time.sleep(self._msg_schedule_time_intercal)
            self.threadpool.submit(self._consuming_function_pika, ch, method, properties, body)
        channel.basic_consume(callback,
                              queue=self._queue_name,
                              # no_ack=True
                              )
        channel.start_consuming()
    @staticmethod
    def __ack_message_pika(channelx, delivery_tagx):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        if channelx.is_open:
            channelx.basic_ack(delivery_tagx)
        else:
            # Channel is already closed, so we can't ACK this message;
            # log and/or do something that makes sense for your app in this case.
            pass
    def _consuming_function_pika(self, ch, method, properties, body, current_retry_times=0):
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(body)
                ch.basic_ack(delivery_tag=method.delivery_tag)
                # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
            except Exception as e:
                if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                    return ch.basic_nack(delivery_tag=method.delivery_tag)
                self.logger.error(f'Function {self.consuming_function} failed on attempt {current_retry_times + 1}, reason: {type(e)} {e}', exc_info=self._is_print_detail_exception)
                self._consuming_function_pika(ch, method, properties, body, current_retry_times + 1)
        else:
            self.logger.critical(f'Still failing after reaching the maximum retry count of {self._max_retry_times}')  # Once the retry limit is exceeded, the message is acknowledged anyway.
            ch.basic_ack(delivery_tag=method.delivery_tag)
            # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
# noinspection PyMethodMayBeStatic
class _Test(unittest.TestCase):
    def test_publish(self):
        rabbitmq_publisher = RabbitmqPublisher('queue_test', is_use_rabbitpy=1, log_level_int=10)
        [rabbitmq_publisher.publish(str(msg)) for msg in range(2000)]
    def test_consume(self):
        def f(body):
            print('....  ', body)
            time.sleep(10)  # Simulate work that blocks for 10 seconds, so concurrency is required.

        rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=200, is_use_rabbitpy=1, msg_schedule_time_intercal=0.5)
        rabbitmq_consumer.start_consuming_message()
if __name__ == '__main__':
    unittest.main()
