kafka的配置,kafka和flume的配置

摘要:
//Files.cnblogs.com/Files/han-guang-xue/kafka.zip实现图中所示效果的详细步骤如下:han03:

参考文档:  https://files.cnblogs.com/files/han-guang-xue/kafka.zip

其中实现如图的效果详细步骤如下:

kafka的配置,kafka和flume的配置第1张

#han01.conf

a1.sources=r1 a1.channels=c1 a1.sinks=k1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /logs a1.sinks.k1.type=avro a1.sinks.k1.hostname=han01 a1.sinks.k1.port=22222 a1.channels.c1.type=file a1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
#han02-1.conf
a1.sources
=r1 a1.channels=c1 a1.sinks=k1

a1.sources.r1.type=exec
a1.sources.r1.command = tail  -F  /logs/a.log
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=han01
a1.sinks.k1.port=22222

a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
#han02-2.conf

b1.sources=r1 b1.channels=c1 b1.sinks=k1 b1.sources.r1.type=spooldir b1.sources.r1.spoolDir = /logs b1.sinks.k1.type=avro b1.sinks.k1.hostname=han01 b1.sinks.k1.port=22222 b1.channels.c1.type=file b1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint b1.channels.c1.dataDirs = /home/uplooking/data/flume/data b1.sources.r1.channels=c1 b1.sinks.k1.channel=c1
# han03.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

#对于source的配置描述 监听avro(表示flume的类型)

a1.sources.r1.type = avro

a1.sources.r1.bind = han01

a1.sources.r1.port = 22222

 

#sink到kafka里面

a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink

#设置Kafka的Topic

a1.sinks.k1.kafka.topic = haha1

#设置Kafka的broker地址和端口号

a1.sinks.k1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092

#配置批量提交的数量

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.ki.kafka.producer.compression.type= snappy

 

#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint

a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

 

#通过channel c1将source r1和sink k1关联起来

a1.sources.r1.channels = c1

先开启 han03 机器上的flume; 在开启其他的

开启flume命令:

bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf  --name a1  -Dflume.root.logger=INFO,console

然后开启消费者:

./bin/kafka-console-consumer.sh --bootstrap-server zhiyou01:9092, zhiyou02:9092, zhiyou03:9092 --from-beginning --topic test3

创建话题命令:

./bin/kafka-topics.sh --create --zookeeper zhiyou01:2181,zhiyou02:2181,zhiyou03:2181 --replication-factor 2 --partitions 3 --topic test3

 
  

免责声明:文章转载自《kafka的配置,kafka和flume的配置》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇(DevExpress2011控件教程)ASPxGridView 范例4 :ASPxGridView 行选择、多表头设计、数据导出、主表细表等功能实现SpringMvc 项目配置下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Kubernetes之应用升级回滚弹性伸缩

  通过命令可以替换镜像升级 kubectl set image   以上一篇的web.yaml为例    把镜像替换成nginx kubectl set image deployment web java=nginx   三个副本滚动升级,新容器处于运行状态会删除旧容器      再次访问无法访问了    因为之前java的后端端口是80...

shell命令--sudo

shell命令--sudo 0、sudo命令的专属图床 点此快速打开文章【图床_shell命令sudo】 1、sudo命令的功能说明 ​ sudo 命令以系统管理者的身份执行指令,也就是说,经由 sudo 所执行的指令就好像是 root 亲自执行。使用权限:在 /etc/sudoers 中有出现的使用者。 2、sudo命令的语法格式 SYNOPSIS...

Yarn对外接口

1 概述 Yarn对外接口 https://forum.huawei.com/enterprise/zh/forum.php?mod=viewthread&tid=451687 本文档专供需要对Yarn进行应用开发的用户使用。本指南主要适用于具备Java开发经验的开发人员。 简介 Yarn是一个分布式的资源管理系统,用于提高分布式的集群环境下的资源...

echarts markLine 辅助线非直线设置

效果图: 用例option: option = { title: { text: '未来一周气温变化', subtext: '纯属虚构' }, tooltip: { trigger: 'axis' }, legend: { data:...

Linux命令(一)

一、cd  切换文件夹   绝对路径(以/根目录开头的路径)   相对路径(基于某个跟目录下的路径,不以/根目录开头)   cd /home  绝对路径(以根目录开头)   cd admin  相对路径(不以根目录开头)   cd ..  返回上级目录   cd ~  返回到自己的家目录(/home/admin)   cd -  回放功能  pwd  查看...

redis(4)

事务  开启事务 multi  作用 设定事务的开启位置,此指令执行后,后续的所有指令均加入到事务中  执行事务 exec  作用 设定事务的结束位置,同时执行事务。与multi成对出现,成对使用   注意:加入事务的命令暂时进入到任务队列中,并没有立即执行,只有执行exec命令才开始执行   取消事务  discard  作用 终止当前事务...