Node.js连接RabbitMQ,断线重连,动态绑定routing key

摘要:
以下将给出基于Promise的写作方法。CH){return“”}msg=JSON。stringify//指定交换机ex、routingkey和消息内容CH.publish}当链接rabbitMQ断开时,主动重新连接函数reconnectRabbitMq(){log.inconconnectRabbitmq()}连接rabbitMQ函数的主函数connectRabbit Mq((amqp.connect.then.then.catch)}动态绑定或解除绑定队列路由键函数toggleBindQueue{returnnewPromise((resolve,reject)=>{if(!否则,如果{log.inforturnCH.bindidQueue}否则,解除绑定{return CH.unbindQueue}})。然后catch})}模块导出={connectRabbitMq,toggleBindQueue,publishMessage}用于加入服务器的方法是Express,所以在应用程序中。js,您可以…const{connectRabbitMq}=requireconnectRabbit mq()…完成代码//连接mq。jsconstamqp=require//rabbitMQ地址const{amqpAddrHost}=require//switch-nameconstex='amqTopic'constmqpAddr=`amqp://${amqpAdderHost}`//读取主机名。例如,在K8S中运行多个实例时,HOSTNAME可以获得当前pod的名称。

RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回调的。

下面将给出基于Promise式的写法。并且实现动态的队列绑定

初始化配置

const amqp = require('amqplib')
// rabbitMQ地址
const {amqpAddrHost} = require('../config/index.js')

// 交换机名称
const ex = 'amq.topic'

const amqpAddr = `amqp://${amqpAddrHost}`

// 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称
// 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。
const hostName = process.env.HOSTNAME

// 队列的属性设置
// 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列
// durable是用来的持久化的,最好也可以设置成不持久化

const queueAttr = {autoDelete: true, durable: false}

// 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法
let CH = null
向队列发送消息的函数

// 向队列发送消息的函数
function publishMessage (msg) {
  if (!CH) {
    return ''
  }

  msg = JSON.stringify(msg)
  // 指定交换机ex, routing key, 以及消息的内容
  CH.publish(ex, eventBusTopic, Buffer.from(msg))
}
当链接rabbitMQ断开时,要主动去重连

function reconnectRabbitMq () {
  log.info('reconnect_rabbit_mq')
  connectRabbitMq()
}
连接rabbitMQ的主要函数

function connectRabbitMq () {
  amqp.connect(amqpAddr, {
    // 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到连接是来自哪个实例
    clientProperties: {
      connection_name: hostName
    }
  })
  .then((conn) => {
    log.info('rabbitmq_connect_successd')
    // 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃
    // error是个特别的事件,务必要处理的
    // 报错就直接去重连
    conn.on('error', (err) => {
      log.error('connect_error ' + err.message, err)
      reconnectRabbitMq()
    })
    // 创建channel
    return conn.createChannel()
  })
  .then((ch) => {
    CH = ch
    // 初始化交换机
    ch.assertExchange(ex, 'topic', {durable: true})
    // 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列
    return ch.assertQueue(hostName, queueAttr)
  })
  .then((q) => {
    // 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定
    // CH.bindQueue(q.queue, ex, 'some.topic.aaa')
    // 消费者,获取消息
    CH.consume(q.queue, (msg) => {
      var _msg = msg.content.toString()
      var MSG = JSON.parse(_msg)
      log.info(_msg, MSG)
    }, {noAck: true})
  })
  .catch((err) => {
    console.log(err)
  })
}
动态给队列绑定或者解绑routing key

function toggleBindQueue (routingKey, bind) {
  return new Promise((resolve, reject) => {
    if (!CH) {
      log.error('channel not established')
      reject(new Error('channel not established'))
      return ''
    }
    // 初始化队列,如果队列已经存在,就会直接使用
    CH.assertQueue(`${hostName}`, queueAttr)
    .then((q) => {
      // 如果bind是true,就绑定。否则就解绑
      if (bind) {
        log.info(`bindQueue ${hostName} ${topic}`)
        return CH.bindQueue(q.queue, ex, topic)
      } else {
        return CH.unbindQueue(q.queue, ex, topic)
      }
    })
    .then((res) => {
      resolve()
    })
    .catch((err) => {
      reject(err)
      log.error(err)
    })
  })
}

module.exports = {
  connectRabbitMq,
  toggleBindQueue,
  publishMessage
}
使用方法

加入你的服务端用的是Express, 那么在app.js中可以


...
const {connectRabbitMq} = require('./connect-mq.js')
connectRabbitMq()
...
完整代码

// onnect-mq.js
const amqp = require('amqplib')
// rabbitMQ地址
const {amqpAddrHost} = require('../config/index.js')

// 交换机名称
const ex = 'amq.topic'

const amqpAddr = `amqp://${amqpAddrHost}`

// 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称
// 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。
const hostName = process.env.HOSTNAME

// 队列的属性设置
// 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列
// durable是用来的持久化的,最好也可以设置成不持久化

const queueAttr = {autoDelete: true, durable: false}

// 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法
let CH = null


// 向队列发送消息的函数
function publishMessage (msg) {
  if (!CH) {
    return ''
  }

  msg = JSON.stringify(msg)
  // 指定交换机ex, routing key, 以及消息的内容
  CH.publish(ex, eventBusTopic, Buffer.from(msg))
}

// 当链接rabbitMQ断开时,要主动去重连
function reconnectRabbitMq () {
  log.info('reconnect_rabbit_mq')
  connectRabbitMq()
}

// 链接rabbitMQ的主要函数
function connectRabbitMq () {
  amqp.connect(amqpAddr, {
    // 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到链接是来自哪个实例
    clientProperties: {
      connection_name: hostName
    }
  })
  .then((conn) => {
    log.info('rabbitmq_connect_successd')
    // 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃
    // error是个特别的事件,务必要处理的
    // 报错就直接去重连
    conn.on('error', (err) => {
      log.error('connect_error ' + err.message, err)
      reconnectRabbitMq()
    })
    // 创建channel
    return conn.createChannel()
  })
  .then((ch) => {
    CH = ch
    // 初始化交换机
    ch.assertExchange(ex, 'topic', {durable: true})
    // 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列
    return ch.assertQueue(hostName, queueAttr)
  })
  .then((q) => {
    // 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定
    // CH.bindQueue(q.queue, ex, 'some.topic.aaa')
    // 消费者,获取消息
    CH.consume(q.queue, (msg) => {
      var _msg = msg.content.toString()
      var MSG = JSON.parse(_msg)
      log.info(_msg, MSG)
    }, {noAck: true})
  })
  .catch((err) => {
    console.log(err)
  })
}


// 动态给队列绑定或者解绑routing key
function toggleBindQueue (routingKey, bind) {
  return new Promise((resolve, reject) => {
    if (!CH) {
      log.error('channel not established')
      reject(new Error('channel not established'))
      return ''
    }
    // 初始化队列,如果队列已经存在,就会直接使用
    CH.assertQueue(`${hostName}`, queueAttr)
    .then((q) => {
      // 如果bind是true,就绑定。否则就解绑
      if (bind) {
        log.info(`bindQueue ${hostName} ${topic}`)
        return CH.bindQueue(q.queue, ex, topic)
      } else {
        return CH.unbindQueue(q.queue, ex, topic)
      }
    })
    .then((res) => {
      resolve()
    })
    .catch((err) => {
      reject(err)
      log.error(err)
    })
  })
}

module.exports = {
  connectRabbitMq,
  toggleBindQueue,
  publishMessage
}

来源:https://segmentfault.com/a/1190000016807727

免责声明:文章转载自《Node.js连接RabbitMQ,断线重连,动态绑定routing key》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇虚拟机安装中标麒麟桌面版7.0系统 + 升级Firefox浏览器Jq 操作json下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

C语言两个libxml2库使用的问题

最近使用libxml2想做点东西,翻看一些example后还是有些疑问,去segmentfault问了下,感谢@pingjiang的热心解答,问题解决,记录如下 (一)如下是一个XML文件,p为根结点 <p> <one>1</one> <two>2</two> <th...

控制Windows音量

public partial class AdjustVolume { [DllImport("user32.dll", CharSet = CharSet.Auto, SetLastError = true)] static extern IntPtr SendMessage(IntPtr hWnd, uint M...

C++17结构化绑定

动机 std::map<K, V>的insert方法返回std::pair<iterator, bool>,两个元素分别是指向所插入键值对的迭代器与指示是否新插入元素的布尔值,而std::map<K, V>::iterator解引用又得到键值对std::pair<const K, V>。在一个涉及std::m...

如来神掌第一式第十四招----HAPROXY详解

################################################################################ Name : Mahavairocana # Author : Mahavairocana # QQ : 10353512 # WeChat : shenlan-qianlan # Blog :...

echarts实时数据图表

import React, { PureComponent } from 'react'; import ReactEcharts from 'echarts-for-react'; import moment from 'moment'; export defaultclass Charts extends PureComponent { getO...

Android深入浅出之 AudioTrack分析

Android深入浅出之Audio 第一部分 AudioTrack分析 一 目的 本文的目的是通过从Audio系统来分析Android的代码,包括Android自定义的那套机制和一些常见类的使用,比如Thread,MemoryBase等。 分析的流程是: l         先从API层对应的某个类开始,用户层先要有一个简单的使用流程。 l        ...