kafka 基本操作命令

摘要:
查看kafka版本并进入kafka安装目录kafka/libs,类似于kafka_2.12-2.0.0.jar,2.12是scala版本,2.0.0是kafka版(kafka使用scala进行开发)。zookeeper服务器启动。sh参考kafka环境来构建zookeeper服务器停止。sh停止kafka bin/kafka-
  • 查看kafka版本

      进入kafka安装目录 ... kafka/libs,看到类似kafka_2.12-2.0.0.jar这样的文件,2.12为scala版本,2.0.0是kafka版本(kafka使用了Scala进行开发).

  • zookeeper-server-start.sh

    参照 kafka环境搭建 

  • zookeeper-server-stop.sh

    停止kafka bin/kafka-server-stop.sh

  • kafka-server-start.sh

   参照 kafka环境搭建 

  • kafka-server-stop.sh

          停止zookeeper  bin/kafka-server-stop.sh

  • kafka-topics.sh

    参照  kafka环境搭建 

创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

展示topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

描述topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

删除topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicname

删除topic中存储的内容
在config/server.properties中找到如下的位置log.dirs=路径,删除log.dirs指定的文件目录,然后重新启动就可以了
  • kafka-consumer-groups.sh
-- 获取group列表
root@root:/usr/local/kafka/kafka_2.12-2.0.0$ bin/kafka-consumer-groups.sh --bootstrap-server 192.168.101.75:6667 --list console-consumer-61307 -- 查看具体的消费者group信息。其中CURRENT-OFFSET是该分区当前消费到的offset;LOG-END-OFFSET是该分区当前最新的offset;LAG是消费滞后区间
root@root:/usr/local/kafka/kafka_2.12-2.0.0$ bin/kafka-consumer-groups.sh --bootstrap-server 192.168.101.75:6667 --group console-consumer-61307 --describe Consumer group 'console-consumer-61307' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 3 3 0 - - -

  --重置消费者offset:指定GROUP_NAME和topic的offset修改到NEW_OFFSET的位置,重启消费者后,消费中将从指定的offset处消费。注意这里只能NEW_OFFSET只能设置一个值,也就是说,所有的分区都将使用这个值,如果分区消息负载不均衡,需要考虑是否适用
  bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME
--重置消费者offset:将指定GROUP_NAME和topic的offset修改到earliest或者latest位置,使得消费者从头或者从尾部消费
  bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-earliest/--to-latest --topic TOPIC_NAME

  • kafka-console-consumer.sh

   参照  kafka环境搭建 

-- 从头消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
-- 指定offset消费,并限制消费数量(partition必须指定,除非'--offset'是明确的)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0 --max-messages 2
  • kafka-run-class.sh

  从kafka jar包中执行指定的工具类.

kafka 基本操作命令第1张

root@root:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
An interactive shell for getting topic offsets.
Option                                 Description                            
------                                 -----------                            
--broker-list <String: hostname:       REQUIRED: The list of hostname and     
  port,...,hostname:port>                port of the server to connect to.    
--max-wait-ms <Integer: ms>            DEPRECATED AND IGNORED: The max amount 
                                         of time each fetch request waits.    
                                         (default: 1000)                      
--offsets <Integer: count>             DEPRECATED AND IGNORED: number of      
                                         offsets returned (default: 1)        
--partitions <String: partition ids>   comma separated list of partition ids. 
                                         If not specified, it will find       
                                         offsets for all partitions (default: 
                                         )                                    
--time <Long: timestamp/-1(latest)/-2  timestamp of the offsets before that   
  (earliest)>                            (default: -1)                        
--topic <String: topic>                REQUIRED: The topic to get offset from.

root@root:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.101.75:6667  --topic test
test:0:3
root@root:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.101.75:6667  --topic test --time -1
test:0:3
root@root:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.101.75:6667  --topic test --time -2
test:0:0

免责声明:文章转载自《kafka 基本操作命令》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇网络虚拟化基础协议·GeneveLinux下的Jenkins作为hub,Windows作为node节点,在Android手机上执行自动化脚本下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Spring Cloud Stream学习笔记

目录 1 环境 2 简介 3 初见 1 创建项目 添加web rabbitmq stream依赖 2 rabbitmq配置 3 消息接收器 4 在rabbitmq中发送消息 5 查看结果 4 自定义消息通道 1 自定义接口 2 自定义接收器 3 controller进行测试 4 消息输入输出(通道对接) 5 启动、访问 5 消息分组 1...

我的全栈之路-C语言基础之C语言概述与开发环境搭建

我的全栈之路-C语言基础之C语言概述与开发环境搭建 我的全栈之路 1.1 信息技术发展趋势 1.2 浅谈计算机系统架构 1.2.1 计算机系统架构概述 1.2.2 计算机硬件系统 1.2.3 计算机软件系统 1.3 程序和指令 1.4 计算机编程语言发展史 1.5 程序的编译和解释 1.6 计算机语言应用场景 1.7 C语言概览 1.7.1 C...

基于 Docker 的几种常用 CentOS7 镜像

https://blog.csdn.net/github_39577257/article/details/107180891# 1 查看系统中是否已经安装了 Dockersystemctl status dockerrpm -qa | grep -E "docker"## 1.1 如果重新安装可以先卸载旧版本 Dockeryum remove docke...

Mongodb学习总结(2)——MongoDB与MySQL区别及其使用场景对比

对于只有SQL背景的人来说,想要深入研究NoSQL似乎是一个艰巨的任务,MySQL与MongoDB都是开源常用数据库,但是MySQL是传统的关系型数据库,MongoDB则是非关系型数据库,也叫文档型数据库,是一种NoSQL数据库。它们各有优点,关键看用在什么地方。 什么情况下,MongoDB是最好的选择? 很多人认为MongoDB难以置信的强大,是一个可扩...

RocketMQ 消费者核心配置和核心知识

一、RocketMQ4.X 消费者核心配置 consumeFromWhere 配置(某些情况失效:参考https://blog.csdn.net/a417930422/article/details/83585397)这个配置基本不用改,采用默认配置即可。 CONSUME_FROM_FIRST_OFFSET: 初次从消息队列头部开始消费,即历史消息(还储...

SwitchHosts—hosts管理利器

http://www.cnblogs.com/1024zy/p/5951524.html SwitchHosts是一个管理、快速切换Hosts小工具,开源软件,一键切换Hosts配置,非常实用,高效。开发Web过程成,部署有多套环境,网址域名都相同,部署在不同的服务器上,有开发环境、测试环境、预发布环境、生产环境。经常要切换Hosts来访问,测试以及验证...