Go操作kafka

摘要:
本文介绍了如何使用Go语言发送和接收kafka消息。=nil{fmt.Printfreturn}partitionList,err:=consumer.Partitions//根据topic取到所有的分区iferr!
目录

更新、更全的《Go从入门到放弃》的更新网站,更有python、go、人工智能教学等着你:https://www.cnblogs.com/nickchen121/p/11517502.html

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用Go语言发送和接收kafka消息。

一、sarama

Go语言中连接kafka使用第三方库:github.com/Shopify/sarama

1.1 下载及安装

go get github.com/Shopify/sarama

1.2 注意事项

sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误:

# github.com/DataDog/zstd
exec: "gcc":executable file not found in %PATH%

所以在Windows平台请使用v1.19版本的sarama。

二、连接kafka发送消息
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// 基于sarama第三方库开发的kafka client

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v
", pid, offset)
}
三、连接kafka消费消息
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// kafka consumer

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v
", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v
", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v
", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
}

免责声明:文章转载自《Go操作kafka》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇html5左右滑动列表python中常用的数据类型之整型(int),浮点型(float), 布尔值(bool), 复数(complex)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

linux shell脚本编程笔记(五): 重定向

I/O重定向 简述: 默认情况下始终有3个"文件"处于打开状态, stdin (键盘), stdout (屏幕), and stderr (错误消息输出到屏幕上). 这3个文件和其他打开的文件都可以被重定向. 对于重定向简单的解释就是捕捉一个文件, 命令, 程序, 脚本, 或者甚至是脚本中的代码块的输出, 然后将这些输出作为输入发送到另一个文件, 命令,...

【PHP】你使用过redis做异步队列么,是怎么用的?有什么缺点?

Redis设计主要是用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列。 它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列; 另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可; Redis能做消息队列还得益于其list对象blpop brpop接口以及P...

kafka一个broker挂掉无法写入

Kafka日志: broker 133 received LeaderAndIsrRequest with correlation id 1 from controller 132 epoch 35 fro partition [__consumer_offsets,13] but cannot become follower since the new...

SAP系统中发送公告的几种办法

      刚刚在闪存里看到朋友在闪存里提到:如何在SAP系统发送公告的T-CODE,觉得今后可能会用到,所以顺便在网上搜索了一下相关资料。由于时间关系,太晚了,得休息了,在此我就不再一个个抠图了,直接借用一下别人的文章,将此方法记录了下来,希望今后有用得到的地方……     1、SM02创建消息,并设定有效期。当用户刷新窗口或打开窗口时会显示。这个消息对...

小程序之模板消息

使用模板消息 获取模板 id 登录https://mp.weixin.qq.com获取模板,如果没有合适的模板,可以申请添加新模板,审核通过后可使用,详见模板审核说明 页面的 <form/> 组件,属性report-submit为true时,可以声明为需发模板消息,此时点击按钮提交表单可以获取formId,用于发送模板消息。或者当用户完成支...

DELPHI中的消息处理机制(三种消息处理方法的比较,如何截断消息)

DELPHI中的消息处理机制 Delphi是Borland公司提供的一种全新的WINDOWS编程开发工具。由于它采用了具有弹性的和可重用的面向对象Pascal(object-orientedpascal)语言,并有强大的数据库引擎(BDE),快速的代码编译器,同时又提供了众多出色的构件。受到广大编程人员的青睐。在众多的编程语言(如VB,PowerBuild...