一种基于Python Pika库的RabbitMQ Client简单封装

摘要:
代码参考注意代码Github地址:https://github.com/HanseyLee/RabbitMQClient#!/usr/bin/python#-*-coding:utf-8-*-importpikaimporthashlibimportjsondefgetMd5(input_str):""":paramstrinput_str:Unicode-objectsmustbeencoded
代码

Github地址:https://github.com/HanseyLee/RabbitMQClient

#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import hashlib
import json

def getMd5(input_str):
    """
    :param str input_str: Unicode-objects must be encoded before hashing
    :rtype: str
    """
    hash_obj = hashlib.md5(input_str.encode("utf-8"))
    return hash_obj.hexdigest()

class RabbitMQClient:
    """RabbitMQClient using pika library

    default: exchange type is 'topic', routing key is '#', dead letter exchange is 'DLX' and dead letter queue is 'DLQ'.
    """
    __default_exchange_type = "topic"
    # (hash) can substitute for zero or more words, * (star) can substitute for exactly one word.
    __default_routing_key = "#"
    __default_DeadLetterExchange = "DLX"
    __default_DeadLetterQueue = "DLQ"

    def __init__(self, username, password, host, port=5672):
        self.host = str(host)
        self.port = int(port)
        # set heartbeat=0, deactivate heartbeat default
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host,
			port=self.port, credentials=pika.PlainCredentials(username,password), heartbeat=0))  
        self.channel = self.connection.channel()

    #
    # basic operations
    #

    def close_connection(self):
        self.connection.close()

    def declare_exchange(self, exchange, exchange_type=__default_exchange_type):
        self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)

    def delete_exchange(self, exchange):
        self.channel.exchange_delete(exchange=exchange)

    def declare_queue(self, queue):
        self.channel.queue_declare(queue=queue, durable=True)

    def declare_queue_dlx(self, queue, dlx=__default_DeadLetterQueue):
        self.channel.queue_declare(queue=queue, durable=True, arguments={'x-dead-letter-exchange': dlx})

    def declare_queue_ttl(self, queue, ttl_seconds):
        self.channel.queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_seconds})

    def delete_queue(self, queue):
        self.channel.queue_delete(queue=queue)

    def bind_exchange_queue(self, queue, exchange, binding_key=__default_routing_key):
        self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key)

    #
    # combined operations
    #

    def declare_dlx_dlq(self, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue):
        """
        :param str dlx: dead letter exchange
        :param str dlq: dead letter queue
        """

        self.declare_exchange(exchange=dlx, exchange_type='fanout')
        self.declare_queue(queue=dlq)
        self.bind_exchange_queue(exchange=dlx, queue=dlq)

    def publish(self, message, exchange, queue, routing_key, message_id=None,         
        close_connection=True):
        """
        publish messages with message_id, disk persistency property
        """

        if message_id is None:
            message_id = getMd5(input_str=message)
        self.declare_queue(queue=queue)
        self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
            properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))
        if close_connection:
            self.close_connection()

    def consume(self, callback, queue, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue, 
        exclusive=False, consumer_tag=None,**kwargs):
        self.declare_dlx_dlq(dlx=dlx, dlq=dlq)
        self.channel.basic_consume(queue=queue, on_message_callback=callback, exclusive=exclusive,
            consumer_tag=consumer_tag,**kwargs)
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()
            self.close_connection()

        
    @staticmethod
    def ack_message(channel, method):
        channel.basic_ack(delivery_tag=method.delivery_tag)

    @staticmethod
    def reject_to_dlx(channel, method):
        """
        need the queue from which message is consuming has dead letter exchage property
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

    @staticmethod
    def transmit(channel, method, properties, message, exchange=__default_DeadLetterExchange, 
        routing_key=__default_routing_key, queue=__default_DeadLetterQueue,handler=None):
        if handler is not None:
            message = handler(message)
        message_id = properties.message_id
        if message_id is None:
            message_id = getMd5(input_str=message)
        channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
            properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))
        channel.basic_ack(delivery_tag=method.delivery_tag)
#
### Testing
#
def callback(ch, method, properties, body):
    print("consumer_tag %r, consume_func %r, %r" % (method.consumer_tag, method.routing_key, properties.message_id))
    # RabbitMQClient.transmit(channel=ch, method=method, properties=properties, message=str(body, 'utf-8'), handler=handler)
    RabbitMQClient.ack_message(channel=ch, method=method)

def handler(input_str):
    return "hadled"+input_str

if __name__ == "__main__":
    mqc = RabbitMQClient(username='xxx',password='xxx',host='xxx',port=5672)
    msg = json.dumps({'a':'aaa'})
    queue = "DLQ"
    # mqc.publish(message=msg, exchange='', routing_key=queue, queue=queue)
    # mqc.consume(callback=callback, queue=queue, consumer_tag='consumer-1')    
    print("==done==")
参考

https://stackoverflow.com/questions/18418936/rabbitmq-and-relationship-between-channel-and-connection
https://www.rabbitmq.com/tutorials/tutorial-five-python.html

注意
  • Connection

    • a real TCP connection to the message broker,It is designed to be long-lived。
    • 设置connection属性heartbeat=0, deactivate heartbeat,这样连接就不会超时,一直保持。
  • Channel

    • a virtual connection (AMPQ connection) inside a Connection,designed to be transient。
    • 一个Channel下设置一个consumer
      • Channel instances must not be shared between threads. Channels are not generally thread-safe as it would make no sense to share them among threads. If you have another thread that needs to use the broker, a new channel is needed.
  • Exchange, Routing_key, Queue

    • Exchange --- Routing_key ---> Queue
    • topic形式的Exchange几乎可以模拟其他的所有模式。
        • (star) can substitute for exactly one word. # (hash) can substitute for zero or more words.
      • When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one => direct模式
      • When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key => like in fanout exchange
    • Queue 可以设置TTL(Time To Live)属性,设置一定时间后,消息自动移除队列,单位为妙。
      • queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_seconds})
  • Publisher

    • 发布、消费消息之前都需要声明所需的Exchange Queue。
    • 推荐每个消息都设置其 message_id 属性,方便后续业务追踪、打点等。
  • Consumer

    • 多消费者 Round-Robin(RR)公平调度消费

      • Each Consumer runs in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue(In different channnels), the broker uses round-robin to distribute the messages between them equally.
    • 对应消费者,最好在消费消息前指定死亡信件交换器和死信队列(DLX:dead letter exchange, DLQ:dead letter queue)。在消费过程中,不能够处理或处理出现异常的消息可以转发至DLX和DLQ。在转发前也可以对消息进行特定的处理和包装。(如果声明队列的时候指定了DLX属性,如arguments={'x-dead-letter-exchange': dlx}, 消费者在消费时可以直接reject消息,被拒绝的消息会直接到DLX, 这样的好处是不用自己写转发逻辑,缺点是不够灵活,不能够对消息进行处理和包装。)

<全文完>

注:内容同步自同名CSDN博客:https://blog.csdn.net/fzlulee/article/details/98480724

免责声明:文章转载自《一种基于Python Pika库的RabbitMQ Client简单封装》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇List&amp;lt;T&amp;gt;的Sort()方法,传入Comparison&amp;lt;T&amp;gt;比较器aiofiles拆分大文件下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Python——PYQT:控件基本使用

QtGui.QComboBox控件常用函数: .addItem(string) #添加字符串项到Item .addItems(list) #添加列表或元组元素到Item .clear() #清除所有Item .clearEditText() #清除编辑框内容 .count() #返回Item数目 .currentIndex...

RabbitMQ(二):Java 操作队列

1. 简单模式 模型: P:消息的生产者 队列:rabbitmq C:消息的消费者 获取 MQ 连接 public static Connection getConnection() throws IOException, TimeoutException { // 定义一个连接工厂 ConnectionFactory...

用python实现批量替换.doc文件文件内容

整个功能实现最重要的模块是docx这个模块 安装语句 pip install python-docx docx模块只能操作.docx文件,所以在这之前我们要将.doc文件转换成.docx。.doc文件是不能直接转换成.docx文件的,如果直接改后缀名会引起文件打不开或者乱码问题。 所以我们需要将文件另存为.docx格式。 import sys import...

python 最短路径

贾格尔(Jagger)找到一张地图,该地图指示大量宝藏的位置,并希望找到它们。 该地图将几个位置标记为节点和几个边缘,这表示两个位置直接相连。 总共有n个节点和m个边。 贾格尔(Jagger)位于节点1,宝物位于节点n。 当他运行最短路径算法以找出通往宝藏的最短路径时,他突然发现除了他的起始节点和宝藏的位置以外,每个节点都有一个怪物。 节点u上的怪物具有力...

python如何获取公众号下面粉丝的openid

如何获取公众号下面粉丝的openid呢,首先要获取一个access_token,这个token可不是令牌(Token),如何获取这个access_token呢?有两种方法,方法如下: #-*- coding: cp936 -*- #python 27 #xiaodeng #原文在 https://www.cnblogs.com/dengyg200891/p...

Ubuntu18.04安装RabbitMQ

一.安装erlang 由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang sudo apt-get install erlang-nox 二.安装Rabbitmq 更新源 sudo apt-get update 安装 sudo apt-get install rabbitmq-server 启动、停止、重启...