kafka搭建命令与使用

摘要:
对于kafka来说,一个单独的broker意味着kafka集群中只有一个接点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。

安装前的环境准备

由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。

# yum install java-1.8.0-openjdk* -y

kafka依赖zookeeper,所以需要先安装zookeeper

# wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
# tar -zxvf zookeeper-3.4.12.tar.gz
# cd zookeeper-3.4.12
# cp conf/zoo_sample.cfg conf/zoo.cfg
启动zookeeper
# bin/zkServer.sh start
# bin/zkCli.sh 
# ls /			#查看zk的根目录相关节点

第一步:下载安装包

下载1.1.0 release版本,并解压:

# wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
# tar -xzf kafka_2.11-1.1.0.tgz
# cd kafka_2.11-1.1.0

第二步:启动服务

现在来启动kafka服务:
启动脚本语法:

kafka-server-start.sh [-daemon] server.properties

可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。(注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts)

# bin/kafka-server-start.sh -daemon config/server.properties

我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树

# bin/zkCli.sh 
# ls /			#查看zk的根目录kafka相关节点
# ls /brokers/ids	#查看kafka节点

第三步:创建主题

现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:

# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在我们可以通过以下命令来查看kafka中目前存在的topic

# bin/kafka-topics.sh --list --zookeeper localhost:2181

除了我们通过手工的方式创建Topic,我们可以配置broker,当producer发布一个消息某个指定的Topic,但是这个Topic并不存在时,就自动创建。

第四步:发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。
首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a msg
>this is a another msg

第五步:消费消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出:

# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test   --from-beginning #老版本
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --consumer-property client.id=consumer-1  --topic test    #新版本

如果你是通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。
以上所有的命令都有一些附加的选项;当我们不携带任何参数运行命令的时候,将会显示出这个命令的详细用法。

其他常用指令

查看组名

#  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

查看消费者的消费偏移量

# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

消费多主题

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test-2"

单播消费

一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可
分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup --topic test

多播消费

一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=testGroup-2 --topic test

kafka集群配置

到目前为止,我们都是在一个单节点上运行broker,这并没有什么意思。对于kafka来说,一个单独的broker意味着kafka集群中只有一个接点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。
首先,我们需要建立好其他2个broker的配置文件:

# cp config/server.properties config/server-1.properties
# cp config/server.properties config/server-2.properties

配置文件的内容分别如下:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

broker.id属性在kafka集群中必须要是唯一的。我们需要重新指定port和log目录,因为我们是在同一台机器上运行多个实例。如果不进行修改的话,consumer只能获取到一个instance实例的信息,或者是相互之间的数据会被影响。
目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:

# bin/kafka-server-start.sh -daemon config/server-1.properties
# bin/kafka-server-start.sh -daemon config/server-2.properties
### 设置备份因子

现在我们创建一个新的topic,备份因子设置为3:

# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看topic信息

现在我们已经有了集群,并且创建了一个3个备份因子的topic,但是到底是哪一个broker在为这个topic提供服务呢(因为我们只有一个分区,所以肯定同时只有一个broker在处理这个topic

# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

RZ2qyR.png

topic信息解析

以下是输出内容的解释

第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。因为目前我们只有一个partition,因此关于partition的信息只有一行。

  • leader节点负责给定partition的所有读写请求。
  • replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
  • isr 是replicas的一个子集,它只列出当前还存活着的,并且备份了该partition的节点。
    现在我们的案例中,0号节点是leader,即使用server.properties启动的那个进程。
    我们可以运行相同的命令查看之前创建的名称为”test“的topic
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

RZW18e.png

之前设置了topic的partition数量为1,备份因子为1,因此显示就如上所示了。
现在我们向新建的topic中发送一些message:

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test msg 1
my test msg 2
现在开始消费:

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

my test msg 1
my test msg 2

现在我们来测试我们容错性,因为broker0目前是leader,所以我们要将其kill

# ps -ef | grep server.properties
# kill -9 1177

现在再执行命令:

# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

RZWTM9.png

我们可以看到,leader节点已经变成了broker 2

要注意的是,在Isr中,已经没有了0号节点。leader的选举也是从ISR(in-sync replica)中进行的。
此时,我们依然可以 消费新消息:

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2

查看主题分区对应的leader信息:
RZfVJS.png

免责声明:文章转载自《kafka搭建命令与使用》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Win10 教育版Java正确创建对象数组下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Kafka:生产者

Kafka java客户端数据生产流程解析 ProducerRecord ProducerRecord 含义: 发送给Kafka Broker的key/value 值对 //ProducerRecord的成员变量 public class ProducerRecord<K, V> { private final String top...

zookeeper(二):linux centos下安装zookeeper(单机和集群)

下载 http://zookeeper.apache.org/releases.html 解压 tar –zxvf zookeeper-3.4.6.tar.gz 解压文件到"/usr/local/zookeeper-3.4.6". 复制conf目录下的zoo_sample.cfg,并命名为zoo.cfg 修改zoo.cfg配置文件 # The number...

kafka拦截器

目录 Kafka 拦截器分为生产者拦截器和消费者拦截器。 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑; 而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。 使用 当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组...

Kafka网络模型

摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型,通过对Kafka源码的分析来简述其Reactor的多线程网络通信模型和总体框架结构,同时简要介绍Kafka网络通信层的设计与具体实现。 一、Kafka网络通信模型的整体框架概述 Kafka的网络通信模型...

kafka错误集锦

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchResponseSize,clientId=iot 在CloudearManager中安装kafka时,报了这样一个错: [Kafka S...

kafka 基础知识梳理-kafka是一种高吞吐量的分布式发布订阅消息系统

一、kafka 简介 今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点 以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要...