golang-nsq高性能消息队列

摘要:
NSQ具有分布式和分散的拓扑结构,其特点是无单点故障、容错、高可用性和可靠的消息传输。NSQ非常容易配置和部署,具有最大的灵活性,并支持许多消息协议。Topic=test'表示Golang的客户端deb安装将helloworld消息发送到测试主题$curlhttps://raw.githubusercontent.com/golang/dep/master/install.sh|Sh下载依赖包:$gogetgithub。com/nsqio/go nsq Producer:packagemain//Producer importvartcpNsqdAddr=“127.0.0.1:4150”funcmain(){//初始配置配置:=nsq.NewConfig()fori:=0;i˂100;i++{/创建100个Producer tPro,err:=nsq.NewProducer!Consumer:packagemain//Consumer importvartcp NsqdAdd=“127.0.0.1:4150”typeNsqHandlerstruct{/消息数量qCount//ID idnsqHandlerIDstring}funcmin(){//初始化配置config:=nsq.NewConfig()//创建使用者。参数1是订阅的主题,参数2是使用的频道。错误:=nsq.NewConsumerifer!
前言

tips:如果本文对你有用,请爱心点个赞,提高排名,让这篇文章帮助更多的人。谢谢大家!比心❤~
如果解决不了,可以在文末加我微信,进群交流。

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者有兴趣构建自己的客户端的话,还可以参考官方提供的协议规范。

网上有人翻译了国外的一篇文章:我们是如何使用NSQ处理7500亿消息的

在这里插入图片描述

安装和部署

官网文档:https://nsq.io/overview/quick_start.html
中文文档:http://wiki.jikexueyuan.com/project/nsq-guide/

我是在ubuntu系统中按照官方操作进行部署测试。

  1. 安装nsq启动服务
    https://nsq.io/deployment/installing.html选择对应的版本,并解压。

    $ tar -zxvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
    $ cd nsq-1.2.0.linux-amd64.go1.12.9/bin	
    $ sudo cp ~/Downloads/nsq-1.2.0.linux-amd64.go1.12.9/bin/ -r /usr/local/nsq/bin
    $ sudo vim /etc/profile
    $ source 
    
  2. 后台启动三个服务

    $ ./nsqlookupd > /dev/null 2>&1 &
    [1] 20076
    $ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
    [2] 20420
    $ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
    [3] 20620
    

    > -lookupd-tcp-address 为上面nsqlookupd的IP和tcp的端口4160
    > -lookupd-http-address 是http的端口也就是4161因为admin通过http请求来查询相关信息

  3. 基本概念

    nsqd:基本的节点

    nsqlookupd:汇总节点信息,提供查询和管理topic等服务
    nsqadmin:管理端展示UI界面,能有一个web页面去查看和操作

  4. 简单使用

    • 执行:curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'会创建一个test主题,并发送一个hello world消息
    • 外部通过:http://127.0.0.1:4171/进行访问可以看到NSQ的管理界面,非常的简洁,其中127.0.0.1为服务器IP
      在这里插入图片描述
      在这里插入图片描述
    • 使用./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161消费test中刚才的消息,并输出到服务器/tmp目录中

特性

默认一开始消息不是持久化的
nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘

  1. 如果将 --mem-queue-size 设置为 0,所有的消息将会存储到磁盘。
  2. 是即使服务器重启也会将当时在内存中的消息持久化
  3. 消息是没有顺序的
    这一点很关键,由于nsq使用内存+磁盘的模式,而且还有requeue的操作,所以发送消息的顺序和接收的顺序可能不一样
  4. 官方不推荐使用客户端发消息
    官方提供相应的客户端发送消息,但是HTTP可能更方便一些
  5. 没有复制
    nsq节点相对独立,节点与节点之间没有复制或者集群的关系。
  6. 没有鉴权相关模块
    当前release版本的nsq没有鉴权模块,只有版本v0.2.29+高于这个的才有
  7. 几个小点
    topic名称有长度限制,命名建议用下划线连接
    消息体大小有限制

nsq优点&缺点

优点:

  1. 部署极其方便,没有任何环境依赖,直接启动就行
  2. 轻量没有过多的配置参数,只需要简单的配置就可以直接使用
  3. 性能高
  4. 消息不存在丢失的情况

缺点:

  1. 消息无顺序
  2. 节点之间没有消息复制
  3. 没有鉴权

客户端

官方提供了很多语言接入的客户端 https://nsq.io/clients/client_libraries.html
针对消息生产者的客户端,官方还推荐直接使用post请求发送消息,如:
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
表示向test主题发送hello world这个消息

Golang的客户端

deb安装

$ curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh

在这里插入图片描述

依赖包下载:

$ go get github.com/nsqio/go-nsq

生产者:

package main

// 生产者
import (
	"fmt"

	"github.com/nsqio/go-nsq"
)

var tcpNsqdAddr = "127.0.0.1:4150"

func main() {
	// 初始化配置
	config := nsq.NewConfig()
	for i := 0; i < 100; i++ {
		// 创建100个生产者
		tPro, err := nsq.NewProducer(tcpNsqdAddr, config)
		if err != nil {
			fmt.Printf("tPro new failed:%s", err)
		}

		// 主题
		topic := "Insert"
		// 主题内容
		tCommand := "New data!"
		// 发布消息
		err = tPro.Publish(topic, []byte(tCommand))
		if err != nil {
			fmt.Printf("Publish failed:%s", err)
		}
	}
}

其中127.0.0.1:4150为发送消息的地址,消费者里面写的也是相同的地址就可以了。

消费者:

package main
// 消费者
import (
	"fmt"
	"sync"
	"time"

	"github.com/nsqio/go-nsq"
)

var tcpNsqdAddr = "127.0.0.1:4150"

type NsqHandler struct {
	// 消息数
	msqCount int
	// 标识id
	nsqHandlerID string
}

func main() {
	// 初始化配置
	config := nsq.NewConfig()
	// 创造消费者,参数一是订阅的主题,参数二是使用的通道
	com, err := nsq.NewConsumer("Insert", "channel1", config)
	if err != nil {
		fmt.Println(err)
	}

	// 添加处理回调
	com.AddHandler(&NsqHandler{nsqHandlerID: "One"})

	// 连接对应的nsqd
	err = com.ConnectToNSQD(tcpNsqdAddr)
	if err != nil {
		fmt.Println(err)
	}

	// 只是为了不结束进程,这里没有意义
	var wg = &sync.WaitGroup{}
	wg.Add(1)
	wg.Wait()

}

// HandleMessage 实现HandleMessage方法
// message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
	// 每接收到一条消息]+1
	s.msqCount ++ 
	// 打印输出信息和ID
	fmt.Println(s.msqCount,s.nsqHandlerID)
	// 打印消息的一些基本信息
	fmt.Printf("msg.Timestamp=]%v,msg.nsqaddress=%s,msg.body=]%s", time.Unix(0,message.Timestamp).Format("2006-01-02 03:04:05"),message.NSQDAddress,string(message.Body))
	return nil
}

免责声明:文章转载自《golang-nsq高性能消息队列》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Docker 基本部署自定义控件(2)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

celery redis rabbitMQ各是什么及之间的区别?

Celery:  Celery是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。  1、 celery工作流程: 消息中间件(message broker):Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB ,SQLAl...

Spring Cloud Stream

1 前言 在实际的企业开发中,消息中间件是至关重要的组件之一。消息中间件主要解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性的架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,如RabbitMQ有Exchange,Kafka有Topic、Partitio...

ZeroMQ接口函数之 :zmq_setsockopt –设置ZMQ socket的属性

ZeroMQ API 目录 :http://www.cnblogs.com/fengbohello/p/4230135.html 本文地址 :http://www.cnblogs.com/fengbohello/p/4398953.html 翻译:郝峰波 mail : fengbohello@qq.com ZeroMQ 官方地址 :http://api....

【RabbitMQ】一文带你搞定springboot整合RabbitMQ涉及消息的发送确认,消息的消费确认机制,延时队列的实现

说明 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 本文大纲 什么是延迟队列 延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。 其次,延时队列,最...

主流消息队列rocketMq,rabbitMq比对使用

首先整理这个文章是因为我正好有机会实战了一下rocketmq,阿里巴巴的一个开源消息中间件。所以就与以往中rabbitmq进行小小的比较一下。这里主线的根据常见面试问题进行整理。 一.消息队列常用的场景   1.削峰    例如我们做得考试系统中,用户通过人脸识别登录系统,考虑到考试系统的特殊性,三万名考生参加考试,需要记录人脸识别登录照片。从考试完结果上...

Win32编程

    Win32编程 此资料为ITjob软件开发教程网提供,特此分享,互相学习! C/C++/VC/MFC技术交流群:95453496 一、Win32编程基本概念 1、消息驱动 在介绍Windows消息驱动概念之前,我们首先来回顾面向过程的程序结构:main()程序有明显的开始、中间过程和结束点,程序是围绕这个过程编写好相关的子过程,再把这些子过程串联...