之前控制频率用的是线程池的线程数量,很难控制准确。因为做同一件事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底开多少线程并发才刚好不超过指定的频率。
现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务。
里面的rabbitpy是后来加的,原来使用的是pika,所以框架本身的实现写法违反了开闭原则,没设计太好。好在不影响调用方式。
与celery相比
在推送任务方面比celery的delay要快,推送的任务小。
使用更简单,不需要像celery那样花哨地给函数加装饰器来注册函数路由。
可以满足生产了。
比之前使用redis原生list结构作为消息队列来取代celery框架的方案更好,主要是rabbitmq有消费确认的概念,而redis没有,随意关停正在运行的程序会造成任务丢失。
# -*- coding: utf-8 -*-
"""RabbitMQ publisher and consumer wrappers usable with either rabbitpy or pika.

The consumer dispatches every message to a user supplied function through a
bounded thread pool and sleeps a configurable interval before each dispatch,
which caps the consumption rate independently of the pool size.
"""
# ``collections.Callable`` was removed in Python 3.10; the ABC lives in
# ``collections.abc`` on every interpreter that supports the f-strings used below.
from collections.abc import Callable
import time
from threading import Lock
import unittest

import rabbitpy
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from rabbitpy.message import Properties
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pymongo.errors import PyMongoError

from app.utils_ydf import LogManager
from app.utils_ydf.mixins import LoggerMixin
from app.utils_ydf import decorators
from app.utils_ydf import BoundedThreadPoolExecutor
from app import config as app_config

# Attach handlers to the client libraries' loggers.
# NOTE(review): the integer argument is presumably a log level selector of
# LogManager -- confirm against app.utils_ydf.
LogManager('pika.heartbeat').get_logger_and_add_handlers(1)
LogManager('rabbitpy').get_logger_and_add_handlers(2)
LogManager('rabbitpy.base').get_logger_and_add_handlers(2)


class ExceptionForRetry(Exception):
    """Raised to trigger a retry. Merely a named subclass; using it is optional."""


class ExceptionForRabbitmqRequeue(Exception):
    """Raised by a consuming function to have the message put back on the queue."""


class RabbitmqClientRabbitPy:
    """Connection holder built on the rabbitpy package."""

    # noinspection PyUnusedLocal
    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        """Open a rabbitpy connection.

        The arguments are interpolated verbatim into the AMQP URL, so callers
        must pass values that are already URL-safe. ``heartbeat`` is accepted
        only for signature parity with ``RabbitmqClientPika`` and is not used.
        """
        rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}'
        self.connection = rabbitpy.Connection(rabbit_url)

    def creat_a_channel(self) -> rabbitpy.AMQP:
        """Return a new channel wrapped in rabbitpy's ``AMQP`` adapter.

        The adapter is used so that the channel's public methods are almost
        the same as those of a pika channel.
        """
        return rabbitpy.AMQP(self.connection.channel())


class RabbitmqClientPika:
    """Connection holder built on the pika package, which is not thread safe."""

    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        """Open a blocking pika connection.

        Alternative kept for reference by the original author::

            parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
            connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open)

        :param username: broker user name
        :param password: broker password
        :param host: broker host
        :param port: broker port
        :param virtual_host: virtual host to connect to
        :param heartbeat: heartbeat value passed to ``pika.ConnectionParameters``
        """
        credentials = pika.PlainCredentials(username, password)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host, port, virtual_host, credentials, heartbeat=heartbeat))

    def creat_a_channel(self) -> BlockingChannel:
        """Return a new channel on the blocking connection."""
        return self.connection.channel()


class RabbitMqFactory:
    """Build the rabbitpy or the pika client from the application config."""

    def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS,
                 host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT,
                 virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60, is_use_rabbitpy=1):
        """
        :param username: broker user name
        :param password: broker password
        :param host: broker host
        :param port: broker port
        :param virtual_host: virtual host to connect to
        :param heartbeat: forwarded to the selected client
        :param is_use_rabbitpy: falsy selects pika (not thread safe); truthy
            selects rabbitpy (thread safe according to the original author).
        """
        if is_use_rabbitpy:
            self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
        else:
            self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat)

    def get_rabbit_cleint(self):
        """Return the client created in ``__init__`` (misspelled name kept for callers)."""
        return self.rabbit_client


class RabbitmqPublisher(LoggerMixin):
    """Publish persistent messages to one durable queue through the default exchange."""

    def __init__(self, queue_name, is_use_rabbitpy=1, log_level_int=10):
        """
        :param queue_name: queue to declare (durable) and publish to
        :param is_use_rabbitpy: whether to use rabbitpy; pika is not recommended
        :param log_level_int: level set on this instance's logger
        """
        self._queue_name = queue_name
        self._is_use_rabbitpy = is_use_rabbitpy
        self.logger.setLevel(log_level_int)
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
        self._lock_for_pika = Lock()
        self._lock_for_count = Lock()
        self._current_time = None
        self.count_per_minute = None
        self._init_count()
        self.logger.info(f'{self.__class__} 被实例化了')

    def _init_count(self):
        """Reset the statistics window start time and counter under the count lock."""
        with self._lock_for_count:
            self._current_time = time.time()
            self.count_per_minute = 0

    def publish(self, msg: str):
        """Publish ``msg`` with the configured client and log it at debug level."""
        if self._is_use_rabbitpy:
            self._publish_rabbitpy(msg)
        else:
            self._publish_pika(msg)
        self.logger.debug(f'向{self._queue_name} 队列,推送消息 {msg}')
        # Per-minute statistics are disabled: skipping the lock on every
        # publish makes publishing faster. The disabled code was:
        #
        #     with self._lock_for_count:
        #         self.count_per_minute += 1
        #         if time.time() - self._current_time > 60:
        #             self._init_count()
        #             self.logger.info(f'一分钟内推送了 {self.count_per_minute} 条消息到 {self.rabbit_client.connection} 中')
        #
        # NOTE(review): _init_count() acquires the same non-reentrant Lock, so
        # calling it while the lock is held would deadlock if this is re-enabled.

    # NOTE(review): tomorrow_threads presumably runs the call on a background
    # thread pool of the given size -- confirm in app.utils_ydf.decorators.
    @decorators.tomorrow_threads(100)
    def _publish_rabbitpy(self, msg: str):
        """Publish ``msg`` through the rabbitpy channel with ``delivery_mode`` 2."""
        # noinspection PyTypeChecker
        self.channel.basic_publish(
            exchange='',
            routing_key=self._queue_name,
            body=msg,
            properties={'delivery_mode': 2},
        )

    def _publish_pika(self, msg: str):
        """Publish ``msg`` through the pika channel, serialised by a lock."""
        with self._lock_for_pika:  # the author observed errors when pika publishes from several threads
            self.channel.basic_publish(exchange='',
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # make message persistent
                                       )
                                       )

    def clear(self):
        """Purge the queue."""
        self.channel.queue_purge(self._queue_name)

    def get_message_count(self):
        """Return the number of messages in the queue as reported by the client in use."""
        if self._is_use_rabbitpy:
            return self._get_message_count_rabbitpy()
        else:
            return self._get_message_count_pika()

    def _get_message_count_pika(self):
        """Re-declare the queue and read ``message_count`` from the declare reply."""
        queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
        return queue.method.message_count

    def _get_message_count_rabbitpy(self):
        """Passively declare the queue on a temporary channel and return the first reply element."""
        ch = self.rabbit_client.connection.channel()
        try:
            q = rabbitpy.amqp_queue.Queue(ch, self._queue_name)
            q.durable = True
            msg_count = q.declare(passive=True)[0]
        finally:
            # Close the temporary channel even when the declare fails.
            ch.close()
        return msg_count


class RabbitmqConsumer(LoggerMixin):
    """Consume a queue and run a user function per message on a bounded thread pool."""

    def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3,
                 log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, is_use_rabbitpy=1):
        """
        :param queue_name: queue to consume from
        :param consuming_function: function that handles a message. It must take
            exactly one argument, the message body. This was chosen for
            simplicity instead of enforcing the signature through a strategy or
            template pattern.
        :param threads_num: size of the thread pool; also used as the prefetch count
        :param max_retry_times: number of attempts before the message is acked anyway
        :param log_level: level set on this instance's logger
        :param is_print_detail_exception: passed as ``exc_info`` when a failure is logged
        :param msg_schedule_time_intercal: seconds slept before each message is
            submitted to the pool; used for rate control
        :param is_use_rabbitpy: whether to use rabbitpy; pika is not recommended
        """
        self._queue_name = queue_name
        self.consuming_function = consuming_function
        self._threads_num = threads_num
        self.threadpool = BoundedThreadPoolExecutor(threads_num)
        self._max_retry_times = max_retry_times
        self.logger.setLevel(log_level)
        self.logger.info(f'{self.__class__} 被实例化')
        self._is_print_detail_exception = is_print_detail_exception
        self._msg_schedule_time_intercal = msg_schedule_time_intercal
        self._is_use_rabbitpy = is_use_rabbitpy

    def start_consuming_message(self):
        """Start the consume loop of the configured client."""
        if self._is_use_rabbitpy:
            self._start_consuming_message_rabbitpy()
        else:
            self._start_consuming_message_pika()

    @decorators.tomorrow_threads(100)
    @decorators.keep_circulating(1)  # so that, however long rabbitmq is down, consumption resumes after recovery without restarting the program
    def _start_consuming_message_rabbitpy(self):
        """Consume with rabbitpy, throttling before each submission to the pool."""
        # noinspection PyArgumentEqualDefault
        channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel()  # type: rabbitpy.AMQP
        # Declared the same way as the publisher and the pika consumer do, so
        # the durable queue exists before consuming starts.
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)
        for message in channel.basic_consume(self._queue_name):
            body = message.body.decode()
            self.logger.debug(f'从rabbitmq取出的消息是: {body}')
            # Rate control: one message is scheduled per interval.
            time.sleep(self._msg_schedule_time_intercal)
            self.threadpool.submit(self._consuming_function_rabbitpy, message)

    def _consuming_function_rabbitpy(self, message: rabbitpy.message.Message, current_retry_times=0):
        """Run the consuming function on ``message`` and ack, nack or retry.

        ``PyMongoError`` and ``ExceptionForRabbitmqRequeue`` requeue the message;
        any other exception is logged and retried until ``max_retry_times``
        attempts have been made, after which the message is acked.
        """
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(message.body.decode())
                message.ack()
            except Exception as e:
                if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                    return message.nack(requeue=True)
                self.logger.error(f'函数 {self.consuming_function} 第{current_retry_times+1}次发生错误, 原因是 {type(e)} {e}',
                                  exc_info=self._is_print_detail_exception)
                self._consuming_function_rabbitpy(message, current_retry_times + 1)
        else:
            # Failed more than the allowed number of times: confirm consumption.
            self.logger.critical(f'达到最大重试次数 {self._max_retry_times} 后,仍然失败')
            message.ack()

    @decorators.tomorrow_threads(100)
    @decorators.keep_circulating(1)  # so that, however long rabbitmq is down, consumption resumes after recovery without restarting the program
    def _start_consuming_message_pika(self):
        """Consume with pika, throttling before each submission to the pool."""
        channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel()  # pika is fixed here for now
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)

        def callback(ch, method, properties, body):
            body = body.decode()
            self.logger.debug(f'从rabbitmq取出的消息是: {body}')
            # Rate control: one message is scheduled per interval.
            time.sleep(self._msg_schedule_time_intercal)
            self.threadpool.submit(self._consuming_function_pika, ch, method, properties, body)

        channel.basic_consume(callback,
                              queue=self._queue_name,
                              # no_ack=True
                              )
        channel.start_consuming()

    @staticmethod
    def __ack_message_pika(channelx, delivery_tagx):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        if channelx.is_open:
            channelx.basic_ack(delivery_tagx)
        else:
            # Channel is already closed, so we can't ACK this message;
            # log and/or do something that makes sense for your app in this case.
            pass

    def _consuming_function_pika(self, ch, method, properties, body, current_retry_times=0):
        """Run the consuming function on ``body`` and ack, nack or retry.

        Same policy as ``_consuming_function_rabbitpy``. This method is
        submitted to the thread pool, so the ack/nack calls are issued from a
        worker thread.
        NOTE(review): the class docs above call pika thread-unsafe; the
        alternative the author left disabled is
        ``connection.add_callback_threadsafe(functools.partial(ack, ch, method.delivery_tag))``.
        """
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(body)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                    return ch.basic_nack(delivery_tag=method.delivery_tag)
                self.logger.error(f'函数 {self.consuming_function} 第{current_retry_times+1}次发生错误, 原因是 {type(e)} {e}',
                                  exc_info=self._is_print_detail_exception)
                self._consuming_function_pika(ch, method, properties, body, current_retry_times + 1)
        else:
            # Failed more than the allowed number of times: confirm consumption.
            self.logger.critical(f'达到最大重试次数 {self._max_retry_times} 后,仍然失败')
            ch.basic_ack(delivery_tag=method.delivery_tag)


# noinspection PyMethodMayBeStatic
class _Test(unittest.TestCase):
    """Manual smoke tests; both need a reachable RabbitMQ broker."""

    def test_publish(self):
        rabbitmq_publisher = RabbitmqPublisher('queue_test', is_use_rabbitpy=1, log_level_int=10)
        # Plain loop: publish() is called for its side effect only.
        for msg in range(2000):
            rabbitmq_publisher.publish(str(msg))

    def test_consume(self):
        def f(body):
            print('.... ', body)
            time.sleep(10)  # simulate work that blocks for 10 seconds, so concurrency is required

        rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=200,
                                             is_use_rabbitpy=1, msg_schedule_time_intercal=0.5)
        rabbitmq_consumer.start_consuming_message()


if __name__ == '__main__':
    unittest.main()