nodejs操作消息队列RabbitMQ

摘要:
1、 什么是消息队列消息队列?从字面上看,它本质上是一个队列。FIFO是先进先出,但存储在队列中的内容是消息。为什么生成消息队列?

一. 什么是消息队列

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。

为什么会产生消息队列?有几个原因:

不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

二. 常用的消息队列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq

三. 使用场景

异步处理

应用解耦

流量削峰

四 使用amqplib操作RabbitMQ

安装 amqplib

npm install amqplib
生产者:

let amqp = require('amqplib');

class RabbitMQ {
constructor() {
this.hosts = [];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);
}
sendQueueMsg(queueName, msg, errCallBack) {
let self = this;

self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName).then(function (ok) {
return channel.sendToQueue(queueName, new Buffer(msg), {
persistent: true
});
})
.then(function (data) {
if (data) {
errCallBack && errCallBack("success");
channel.close();
}
})
.catch(function () {
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
.catch(function () {
let num = self.index++;

if (num <= self.length - 1) {
self.open = amqp.connect(self.hosts[num]);
} else {
self.index == 0;
}
});
}
}

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', '123', (error) => {
console.log(error)
})
消费者

let amqp = require('amqplib');

class RabbitMQ {
constructor() {
this.hosts = [];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);
}

receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
let self = this;

self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName)
.then(function (ok) {
return channel.consume(queueName, function (msg) {
if (msg !== null) {
let data = msg.content.toString();
channel.ack(msg);
receiveCallBack && receiveCallBack(data);
}
})
.finally(function () {
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
})
.catch(function () {
let num = self.index++;
if (num <= self.length - 1) {
self.open = amqp.connect(self.hosts[num]);
} else {
self.index = 0;
self.open = amqp.connect(self.hosts[0]);
}
});
}
}

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) =>
{
console.log(msg)//123
})
打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息

当运行消费者代码时输入 123,消息队列消息为0


---------------------

免责声明:文章转载自《nodejs操作消息队列RabbitMQ》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇MIPI DSI协议介绍nginx真实ip配置下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Angular引入第三方库

 原文已经写的很好了。原文链接: https://blog.csdn.net/yuzhiqiang_1993/article/details/71215232        加上2点给自己用,引入bootstrap样式,需要在angular-cli.json的styles中引入。        安装的类型描述文件@types/jquery在node_mod...

Nuxt项目启动或打包时,显示内存不足溢出问题解决方案

大型的Nuxt项目,后期会积攒太多模块和太多静态资源,这样会导致项目启动缓慢,启动和打包时候也会出现内存不足,无法成功的情况,导致这样的原因如下: 在Node中通过JavaScript使用内存时就会发现只能使用部分内存(64位系统下约为1.4 GB,32位系统下约为0.7 GB)。在这样的限制下,将会导致Node无法直接操作大内存对象,比如无法将一个2 G...

搭建高可用的rabbitmq集群 + Mirror Queue + 使用C#驱动连接

  我们知道rabbitmq是一个专业的MQ产品,而且它也是一个严格遵守AMQP协议的玩意,但是要想骚,一定需要拿出高可用的东西出来,这不本篇就跟大家说 一下cluster的概念,rabbitmq是erlang写的一个成品,所以知道如何构建erlang的node集群就ok了,他需要一个统一的cookie机制。。。本篇的测试环境如下: centos1:192...

Ansible原理与安装部署

今天,我们开始学习运维自动化工具Ansible。 一、Ansible原理 1.1 什么是Ansible Ansible一种集成IT系统的配置管理、应用部署、执行特定任务的开源平台/框架。基于Python语言实现,核心模块包括:jinja2、PyYAML和paramiko。Ansible允许重复执行而不出错,客户端无agent,服务端无deamon进程。An...

前端加密MD5

今天接触了MD5加密方式,记录一下使用方法,又去搜了搜关于MD5的详细内容   MD5在vue中使用方法 1、下载MD5模块 cnpm install md5 -S 2、引入模块 const md5 = require("md5") 3、加密 const str = "12345"; console.log(md5(str)...

一、从Windows消息机制说起

    一,消息       消息(Message)指的就是Windows 操作系统发给应用程序的一个通知,它告诉应用程序某个特定的事件发生了。比如,用户单击鼠标或按键都会引发Windows 系统发送相应的消息。最终处理消息的是应用程序的窗口函数,如果程序没处理的话操作系统有默认函数将会作出处理。     从数据结构的角度来说,消息是一个结构体,它包含了...