kafka2.5.0 主题Topic

摘要:
$bin/kafka topics.sh--引导服务器localhost:drwxrwxr-x.2joycejoyce 141Jun2105:17my-topic-0drwxrwxrx-x.2joyceJoyce 141jun2105:17my-topic-2drwxrw-x.2joyCejoyce1141Jun2105:

kafka基本命令查看博客《kafka2.5.0基本命令

本博文所使用kafka版本2.5.0,操作系统centos8.

1)创建主题

创建my-topic主题,该主题有 1 个副本,8个分区:

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 8 --topic my-topic
Created topic my-topic.

注意这里的 --replication-factor 副本个数不能大于broker的个数

2)列出主题

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
my-topic
test

3)  更改主题分区个数:

注意:分区个数只能改多,不能改少,这里从8个分区改为16个:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 16
artitions 16

 验证分区扩建,cd 到kafka默认日志数据目录下: /tmp/kafka-logs,可以查看到16个分区文件夹:

drwxrwxr-x. 2 joyce joyce  141 Jun 21 05:17 my-topic-0
drwxrwxr-x. 2 joyce joyce  141 Jun 21 05:17 my-topic-1
drwxrwxr-x. 2 joyce joyce  141 Jun 21 05:17 my-topic-2
drwxrwxr-x. 2 joyce joyce  141 Jun 21 05:17 my-topic-3
.......

 每个分区文件夹的核心文件是.log文件:

[joyce@192 kafka-logs]$ cd my-topic-0
[joyce@192 my-topic-0]$ ll
total 4
-rw-rw-r--. 1 joyce joyce 10485760 Jun 21 05:17 00000000000000000000.index
-rw-rw-r--. 1 joyce joyce        0 Jun 21 05:17 00000000000000000000.log        
// log日志过期时间可以在server.properties里配置:log.retention.hours=168
// 注意:在分区文件夹目录下,有个00000000000000000000.log文件,如果size大到一定程度,会新建一个10这样二进制命名的.log文件,例如10101010.log,00000000000000000000.log则停止更新,当旧文件的最后更新时间  + 日志保存时间 = 删除时间,00000000000000000000.log日志就会被删除。以此类推。
-rw-rw-r--. 1 joyce joyce 10485756 Jun 21 05:17 00000000000000000000.timeindex -rw-rw-r--. 1 joyce joyce 8 Jun 21 05:17 leader-epoch-checkpoint

 查看分区当前情况:

[joyce@192 kafka_2.12-2.5.0]$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: kafka-boot    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: kafka-boot    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
Topic: my-topic    PartitionCount: 16    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: my-topic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: my-topic    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: my-topic    Partition: 2    Leader: 0    Replicas: 0    Isr: 0
    Topic: my-topic    Partition: 3    Leader: 0    Replicas: 0    Isr: 0
......

4)生产者

需连接多个broker的话,中间用逗号分隔:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092,192.168.2.60:9092 --topic my-topic

5)消费者

 消费者消费消息:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

 如果加了--from-beginning,则从头开始消费消息

6)删除Topic

注意:尽量不要删除Topic,可能会弄坏kafka

kafka2.5.0版本里,server.properties已经没有 delete.topic.enable=true 这个配置,所以忽略此项。

步骤1:停止该Topic所有的生产者和消费者。

步骤2:进入kafka的bin目录下,用命令删除 Topic test1。此时可以在zookeeper里看到 admin --> delete-topic下,test1 被标记为删除,但是没有进行物理删除。

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test1

 步骤3:删除kafka存储Topic目录,该目录默认配置在server.properties里的log.dirs=/tmp/log-kafka,进入该目录可以看到Topic的相关分区:test1-0、test1-1

步骤4:删除zookeeper里的topic信息:

bin/zkCli.sh -server 【zookeeper server:port】

登录到zk shell,然后找到topic所在目录: ls /brokers/topics,找到要删除的topic,然后执行命令:

rmr /brokers/topics/【topic name】

rmr /admin/delete_topics/ 【topic name】 

bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
[__consumer_offsets, abc, my-topic, test1]
[zk: localhost:2181(CONNECTED) 1] deleteall /brokers/topics/test1         // deleteall 命令可以删除该文件夹,即使目录不为空
[zk: localhost:2181(CONNECTED) 1] deleteall /admin/delete_topics/test1 

kafka2.5.0 主题Topic第1张

步骤5

重启zookeeper:

./zkServer.sh start        

重启kafka:

bin/kafka-server-start.sh config/server.properties

  

end.

免责声明:文章转载自《kafka2.5.0 主题Topic》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇java虚拟机详细图解9--JVM机器指令集Dubbo学习笔记12:使用Dubbo中需要注意的一些事情下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Storm实战:在云上搭建大规模实时数据流处理系统(Storm+Kafka)

在大数据时代,数据规模变得越来越大。由于数据的增长速度和非结构化的特性,常用的软硬件工具已无法在用户可容忍的时间内对数据进行采集、管理和处理。本文主要介绍如何在阿里云上使用Kafka和Storm搭建大规模消息分发和实时数据流处理系统,以及这个过程中主要遭遇的一些挑战。实践主要立足建立一套汽车状态实时监控系统,可以在阿里云上立即进行部署。   实时大数据处理...

只需两步快速获取微信小程序源码

只需两步快速获取微信小程序源码 第一次在掘金这样高大上的社区写文章,忐忑地敲下我获取小程序源码过程中的经验分享。 最近在学习微信小程序开发,半个月学习下来,很想实战一下踩踩坑,于是就仿写了某个小程序的前端实现,过程一言难尽,差不多两周时间过去了,发现小程序的坑远比想象的要多的多!!在实际练手中,完全是黑盒的,纯靠推测,部分效果在各种尝试后能能做出大致的实...

mysql安装和遇到的问题处理

遇到需要在新系统上安装MySQL的事情,简单记录一下过程。 声明:最好的文档是官方文档,我也是看的官方文档,只是中间遇到点问题,记录一下出现的问题和处理方式。贴一些官方文档地址。 用tar包的安装方式:https://dev.mysql.com/doc/refman/5.7/en/binary-installation.html 用yum源等其他安装方式:...

vue 在nginx下页面刷新出现404问题解决和在nginx下页面加载了js但是页面显示空白问题解决

一、vue 在nginx下页面刷新出现404   在网上翻遍了所有这样问题的解决办法,全都是一个解决办法也是正确的解决办法,(后来在vue官网上关于history方式出现404解决方法也是这样说的),只是没有表达完整,可能会让比较急于解决这个问题的人简单复制却始终解决不了问题 nginx正确的配置: 1、如果是在根目录则配置如下 location / { ...

MQTT协议中的topic

1、MQTT协议中的topic 定阅与发布必须要有主题,只有当定阅了某个主题后,才能收到相应主题的payload,才能进行通信。 2、 主题层级分隔符——“/” 主题层级分隔符使得主题名结构化。如果存在分隔符,它将主题名分割为多个主题层级。 斜杠(‘/’ U+002F)用于分割主题的每个层级,为主题名提供一个分层结构。当客户端订阅指定的主题过滤器包含两种通...

jenkins安装配置

一.Jenkins简介 Jenkins是一个开源软件项目,旨在提供一个开放易用的软件平台,使软件的持续集成编程可能。Jenkins是基于Java开发的一种持续集成工具,用于监控持续重复的工具,功能包括: 1.持续的软件版本发布/测试项目。 2.监控外部调用执行的工作。 今天我们主要讲持续的软件版本发布,即:项目的“自动化”编译、打包、分发部署。   二.J...