RabbitMQ消息队列

摘要:
MQ全称MessageQueue,它是一种应用程序对应用程序的通信方式。此时我们就可以以管理员身份打开cmd,输入:netstartrabbitmq启动服务;输入:netstoprabbitmq关闭服务  三、RabbitMQ简单模式在使用过程中,始终贯穿着三个部分,一是生产者,二是消费者,三是RabbitMQServer,生产者是往消息队列中放数据的,而消费者是从消息队列中取数据的。')#这是关闭连接connection.close()消费者,consumer.pyimportpika#连接rabbitMQconnection=pika.BlockingConnectionchannel=connection.channel()#创建队列,这里也是创建队列的意思,消费者和生产者说不定哪一个先启动,所以谁先启动就谁创建,当另一个进来后。

  一、简介

RabbitMQ是一个在AMQP基础上完整的、可复用的企业消息系统,遵循Mozilla Public License开源协议。MQ全称Message Queue(消息队列),它是一种应用程序对应用程序的通信方式。应用程序通过读写入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接他们。消息传递指的是程序之间通过在消息中发送数据通信,而不是直接调用彼此来通信,直接调用通常用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

应用场景:

1,系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即‘通过消息传递的架构’

2,当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式拿到日志消息。

3,系统的高可用性,比如电商的秒杀场景,当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去拿到这些消息,将会使得请求平稳,提高系统的可用性。

  二、下载及安装

  1,安装erlang,在官网下载,然后一直点下一步进行安装

  2,安装RabbitMQ,也是官网下载,直接安装

  3,配置

用cmd进入到RabbitMQ Server abbitmg_server-3.6.5sbin目录下,输入:rabbitmg-plugins enable rabbitmg-management,这样就配置好了。此时我们就可以以管理员身份打开cmd,输入:net start rabbitmq 启动服务;输入:net stop rabbitmq 关闭服务

  三、RabbitMQ简单模式

在使用过程中,始终贯穿着三个部分,一是生产者,二是消费者,三是RabbitMQ Server(是运行在某个服务器上的),生产者是往消息队列中放数据的,而消费者是从消息队列中取数据的。我们是在python中实现的,所以得安装一个pika的模块,帮我们连接队列。

  1,基本代码

生产者,producer.py

importpika
#连接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
channel =connection.channel()
#创建队列,队列名为‘hello’,这个名字随意
channel.queue_declare(queue='hello')
#往队列里添加值,routing_key是表示我们要往‘hello’队列放数据,body表示我们这次放入的数据为‘hello world’
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
#这是关闭连接 connection.close()

消费者,consumer.py

importpika
#连接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
channel =connection.channel()
#创建队列,这里也是创建队列的意思,消费者和生产者说不定哪一个先启动,所以谁先启动就谁创建,当另一个进来后。如果队列存在了,就不创建了
channel.queue_declare(queue='hello')
#回调函数
defcallback(ch, method, properties, body):
    print(body)
#确定监听队列事件,当队列里有值,就会取值,然后返回给回调函数
channel.basic_consume( callback,
                       queue='hello',
                       no_ack=True)
#这才是真正的开始监听
channel.start_consuming()

  2,no_ack参数

2.1 no_ack=True时,为无应答模式,这里的应答指的是消费者不给队列回应。这种情况下,消费者从队列中拿走一条数据,队列会立即把这条数据删掉,当消费者在处理这条数据时出现错误导致消费者断开而没有完成任务时,消费者是不可能再次从队列里拿到刚才的那条数据,也就意味着这条数据没有处理但是消失了,从而这条数据永远也得不到处理了。

2.2 no-ack=false,为应答模式,消费者每取一条数据,当处理成功后会给队列一个应答,此时,队列收到应答才会把数据删除;当消费者处理数据失败而没有给队列应答,队列是不会删除这条数据,等着下一个消费者再次来取这个数据,当收到应答后才会删除这条数据

2.3 代码,这过程只是消费者与队列的关系变化,所以只用改变消费者的代码既可

消费者,consumer_ack.py

importpika
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='10.211.55.4'))
channel =connection.channel()
channel.queue_declare(queue='hello')
defcallback(ch, method, properties, body):
    print(body)
#在这加上一句应答 ch.basic_ack(delivery_tag
=method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) #把no-ack设置为False channel.start_consuming()

消费者在处理过程中由于某种原因(比如bug等)断开连接后,消息是不会丢失的,这个数据会给下一个来拿去数据的消费者

3,durable参数,也就是数据持久化存储

生产者把数据放在队列中,当消费者还没拿取数据,队列所在的服务器崩了,此时,队列里面的数据就会消失了。我们要想吹这种情况,那只有让队列里的数据持久化存储了,这需要我们在定义队列是就应该声明。

生产者,producer_durable.py

importpika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel =connection.channel()
#给durable赋为True既可,也就是让其持久化存储 channel.queue_declare(queue
='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!',
#这里要把模式设置为2 properties
=pika.BasicProperties( delivery_mode=2, )) connection.close()

消费者,consumer_durable.py

importpika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel =connection.channel()
#durable设置为True channel.queue_declare(queue
='hello', durable=True) defcallback(ch, method, properties, body): print(body) ch.basic_ack(delivery_tag =method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) channel.start_consuming()

rabbitMQ服务器宕机,数据不丢失

4,消息获取顺序

队列的数据默认是按照先后顺序取值,也就是有三个消费者,假如第一波取值顺序为a-b-c,那以后的顺序都是a-b-c,不管a处理数据的快慢,比如说a还在处理数据,然而b已经处理完了,但b还是不能拿值,必须a先拿值,然后b才能拿值。这种形式效率太低。

channel.basic_qos(prefetch_count=1)设置这个参数后,就不是按顺序取值,而是谁先来谁取值。这只是消费者有关的设置。

消费者,consumer_prefetch.py

importpika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel =connection.channel()
channel.queue_declare(queue='hello')
defcallback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag =method.delivery_tag)
#加上这句就行
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
channel.start_consuming()

  四、RabbitMQ的exchange模式

  1,发布订阅模式

简单模式下,一条数据只会给一个消费者;发布订阅模式下,一条消息给所有订阅的消费者。

生产者把消息放在一个指定的exchange里面,然后每个消费者创建一个队列跟这个exchange绑定,从而消费者就可以拿到订阅的数据了。

RabbitMQ消息队列第1张

发布者,

importpika

connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel =connection.channel()
#模式要更改
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = "Hello World!"
#这里发布者发送消息到exchange channel.basic_publish(exchange='logs', routing_key='', body=message) connection.close()

订阅者

importpikaconnection =pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel =connection.channel()
#模式也要改
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
#随机创建队列
result = channel.queue_declare(exclusive=True)
#拿到队列名字 queue_name
=result.method.queue #把队列绑定到exchange channel.queue_bind(exchange='logs', queue=queue_name) defcallback(ch, method, properties, body): print(body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

  2,关键字模式

在发布者发布消息时,会含有关键字;而订阅者这次不单单只是把队列跟exchange绑定,还要绑定关键字,当发布者的关键字和绑定的关键字相同时,订阅者才能拿到消息,然而一个队列可以跟一个exchange绑定多个关键字。

RabbitMQ消息队列第2张

发布者

importpika
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel =connection.channel()

#声明一个交换机
channel.exchange_declare(exchange='direct_logs',exchange_type="direct")

message ="warning: Hello World!"channel.basic_publish(exchange='direct_logs',
                      routing_key='warning',    #这是发布者发送消息的带的关键字
                      body=message)connection.close()

订阅者

importpika
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel =connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name =result.method.queue

channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key="error")#这是队列跟exchange绑定的关键字defcallback(ch, method, properties, body):
    print(body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

  3,模糊匹配

这是基于关键字的,但这次不是要相同了,而是用模糊匹配,‘#’代表匹配0或多个字符,‘*’表示匹配一个任意字符

发布者

importpika
connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel =connection.channel()

#声明一个交换机
channel.exchange_declare(exchange='topic_logs',exchange_type="topic")

message ="Hello World!"channel.basic_publish(exchange='topic_logs',
                      routing_key='banana.apple.xigua.juzi',     #这是发布时带着的关键字
                      body=message)connection.close()

订阅者

importpika
importsys

connection =pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel =connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name =result.method.queue

channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key="*.apple.#")    #这是队列跟exchange绑定的关键字,但这里是模糊匹配,能匹配上,就可以拿到值
defcallback(ch, method, properties, body): print(body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

免责声明:文章转载自《RabbitMQ消息队列》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Android Studio项目导入aar包报错go errors转string下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

wcf通道Channel

正文       客户端与服务进行交互的过程是通过通道进行交互的。客户端通过调用代理类执行相应的方法,通过通道编码,调用上下文,传输客户端的事务,管理可靠会话,对消息正文的加密,最后要执行的通道是传输通道就像我们七层的最后一层是物理传输层与服务端的那一头的传输通道交接。服务端拿到以后会逐个拆包,然后交给分发器,分发器交给对应的服务处理。         ...

高并发处理思路与手段(三):消息队列

一、消息队列在实际场景中的使用     流程A在处理时没有在当前线程同步的处理完而是直接发送了一条消息A1到队列里,然后消息队列过了一段时间(可能是几毫秒 几秒 几分钟)这个消息开始被处理,消息处理的过程就相当于流程A被处理;当然这只是一个简单的模型下面我们套用实际的场景来看一下,比如下单成功后发送短信提醒;如果没有消息队列我们会选择同步调用...

软件需求分析—消息管理

软件简介:该软件主要为大学生提供一个寻找丢失物品的平台,帮助丢失物品者或者捡到物品者找到相应的物品或者失主。 N(need)需求:对于这样一个寻找丢失物品的平台,对于后台数据库消息的管理是非常重要的,要将丢失物品者和捡到物品者的消息有条理的保存,并且要对垃圾消息及时的处理。 A(approach)做法:消息管理主要体现在后台的数据库的管理上,实现上应该在用...

[转]ZABBIX API简介及使用

API简介 Zabbix API开始扮演着越来越重要的角色,尤其是在集成第三方软件和自动化日常任务时。很难想象管理数千台服务器而没有自动化是多么的困难。Zabbix API为批量操作、第三方软件集成以及其他作用提供可编程接口。 Zabbix API是在1.8版本中开始引进并且已经被广泛应用。所有的Zabbix移动客户端都是基于API,甚至原生的WEB前端部...

WCF学习笔记1(体系架构和行为扩展)

引用《WCF服务编程》里的一段话:“以WCF为基础框架搭建面向服务的企业级应用程序,以WF工作流引擎支撑企业应用中业务流程的传递与控制,以Cardspace和WCF固有的安全测罗保证企业信息的安全,最后以ASP.NET AJAX,WPF和SILVERLIGHT技术丰富客户端界面的绚丽表现,从而改善企业客户对应用程序的体验,这就是微软实现企业级应用的霸业宏图...

wpf mvvm模式下 在ViewModel关闭view

本文只是博主用来记录笔记,误喷 使用到到了MVVM中消息通知功能 第一步:在需要关闭窗体中注册消息   1 public UserView() 2 { 3 this.DataContext = new UserViewModel(); 4 InitializeComponent();...