filebeat对接kafka

摘要:
当我在工作中遇到时,filebeat与kafka对接。写下来并分享。这也是为了防止我忘记filebeat是一个收集客户端上运行的日志的代理。Filebeat是一个耳朵进耳朵出的经纪人。In表示要侦听的日志文件,out表示要输出监视的日志内容的位置。当然,这里我们输出到kafka消息队列,而kafka是一个消息队列,为什么要使用kafka?因为现在有很多,我们在工作中确实遇到了filebeat对接kafka。

工作中遇到了,filebeat对接kafka,记下来,分享一下,也为了防止自己忘记

对于filebeat是运行在客户端的一个收集日志的agent,filebeat是一个耳朵进一个耳朵出,进的意思是监听哪个日志文件,出的意思是监听的日志内容输出到哪里去,当然,这里我们输出到kafka消息队列中,而kafka就是一个消息队列,为什么要用kafka?因为现在用的很多,而且工作中也确实遇到filebeat对接kafka了。具体的可以自行百度查询,废话不多说,开始做

第一步,安装helm3

helm3的使用方法和安装,博客里有,在哪下载?https://github.com/helm/helm/releases/tag/v3.5.2,目前是最新的

第二步, 加载helm仓库,本来是需要加载官网的仓库地址的,可惜,翻不了墙,用阿里的代理一下吧,也能用,亲测

filebeat对接kafka第1张

 第三步,下载helm文件,解压出来就是filebeat目录

helm pull apphub/kafka
helm pull apphub/filebeat

 filebeat对接kafka第2张

 第四步,应用kafka文件内容,红色字体非常有用,接下来给你们解释一下

[root@VM-0-15-centos ~]# helm install kafka2 ./kafka
NAME: kafka2
LAST DEPLOYED: Fri Feb  5 22:57:45 2021
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
** Please be patient while the chart is being deployed **

Kafka can be accessed via port 9092 on the following DNS name from within your cluster:

    kafka2.default.svc.cluster.local #在k8s这个内容可以当做域名来解析出来kafka的ip地址

To create a topic run the following command:#创建主题

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --create --zookeeper kafka2-zookeeper:2181 --replication-factor 1 --partitions 1 --topic test

To list all the topics run the following command:#查看所有的主题

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-topics.sh --list --zookeeper kafka2-zookeeper:2181

To start a kafka producer run the following command:#进入kafka生产者命令行可以给主题添加消息

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-console-producer.sh --broker-list localhost:9092 --topic test

To start a kafka consumer run the following command:#消费者窗口,可以查看到生产者发出的信息

    export POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka2,app.kubernetes.io/component=kafka" -o jsonpath="{.items[0].metadata.name}")
    kubectl --namespace default exec -it $POD_NAME -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    #以下两行是在容器内部运行的命令,一个是生产者,一个是消费者
    PRODUCER:
        kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
    CONSUMER:
        kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

接下来我们来安装filebeat

filebeat有两种方式:

  1.以sidecar方式去手机容器日志,也就是说,一个pod中必须要运行一个filebeat容器,这样的话,如果有1000个pod,每一个pod跑一个应用,一个filebeat,那么就是2000个,果断放弃

  2.以daemonSet方式,以节点运行,那么只需要有几个node就运行几个filebeat就可以了,所以我么选择第二种

第一步,修改values.yml文件,如下,红色字体需要注意,filebeat.input是监听的文件,output.kafka是输出到哪里去,我们这里配的是域名,coredns会自动解析成ip,具体规则是

pod名称.名称空间.svc.cluster.local,topic为我们创建的主题名称

[root@VM-0-15-centos filebeat]# cat values.yaml | grep -v "#" | grep -v "^$"
image:
  repository: docker.elastic.co/beats/filebeat-oss
  tag: 7.4.0
  pullPolicy: IfNotPresent
config:
  filebeat.config:
    modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false
  filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /var/a.log
  output.kafka:
     enabled: true
     hosts: ["kafka.default.svc.cluster.local:9092"]
     topic: test1111
  http.enabled: true
  http.port: 5066
overrideConfig: {}
data:
  hostPath: /var/lib/filebeat
indexTemplateLoad: []
plugins: []
command: []
args: []
extraVars: []
extraVolumes: []
extraVolumeMounts: []
extraSecrets: {}
extraInitContainers: []
resources: {}
priorityClassName: ""
nodeSelector: {}
annotations: {}
tolerations: []
affinity: {}
rbac:
  create: true
serviceAccount:
  create: true
  name:
podSecurityPolicy:
  enabled: False
  annotations: {}
privileged: false
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
  image:
    repository: trustpilot/beat-exporter
    tag: 0.1.1
    pullPolicy: IfNotPresent
  resources: {}
  args: []
  exporterPort: 9479
  targetPort: 9479

第二步,应用filebeat

[root@VM-0-15-centos ~]# helm install filebeat2 ./filebeat
NAME: filebeat2
LAST DEPLOYED: Fri Feb  5 23:09:51 2021
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
To verify that Filebeat has started, run:

  kubectl --namespace=default get pods -l "app=filebeat,release=filebeat2"

第三步,我们查看一下所有的pod,zookeeper是kafka集群必带的

filebeat对接kafka第3张

 第四步,测试

进入到容器,然后在/var/a.log这个文件内输入点东西

filebeat对接kafka第4张

 第五步,查看另一边监听的

filebeat对接kafka第5张

 补充

如果要监控另外容器日志,那么,我们可以把这个应用的容器日志输出到宿主机的目录下面,然后再在filebeat容器跟这个宿主机目录做映射,在配置filebeat.yml文件,来完成filebeat对应用容器产生的日志做监控。最主要就是配置values.yml这个文件,此文件内容如下:

[root@iZ8vb1m9mvb3ev1tqgrldwZ shell]# cat filebeat/values.yaml | grep -v "#" | grep -v "^$"
image:
  repository: docker.elastic.co/beats/filebeat-oss
  tag: 7.4.0
  pullPolicy: IfNotPresent
config:
  filebeat.config:
    modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false
  processors:
    - add_cloud_metadata:
  filebeat.inputs:
    - type: log
      enabled: true
      paths:
      - /host/var/melocal/logs/*.log
      - /host/var/geo/logs/*.log
      - /host/var/rgeo/log/*.log
  output.kafka:
    enabled: true
    hosts: ["kafka.default.svc.cluster.local:9092"]
    topic: test_topic
  http.enabled: true
  http.port: 5066
overrideConfig: {}
data:
  hostPath: /var/lib/filebeat
indexTemplateLoad: []
plugins: []
command: []
args: []
extraVars: []
extraVolumes:
   - hostPath:
       path: /root/jiaohang/amap-melocal/logs
     name: melocal-log
   - hostPath:
       path: /root/jiaohang/amap-geo/data/geocoding/log
     name: geo-log
   - hostPath:
       path: /root/jiaohang/amap-rgeo/data/reverse_geocoding/log
     name: rgeo-log
extraVolumeMounts:
   - name: melocal-log
     mountPath: /host/var/melocal/logs
     readOnly: true
   - name: geo-log
     mountPath: /host/var/geo/log
     readOnly: true
   - name: rgeo-log
     mountPath: /host/var/rgeo/log
     readOnly: true
extraSecrets: {}
extraInitContainers: []
resources: {}
priorityClassName: ""
nodeSelector: {}
annotations: {}
tolerations: []
affinity: {}
rbac:
  create: true
serviceAccount:
  create: true
  name:
podSecurityPolicy:
  enabled: False
  annotations: {}
privileged: false
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
  image:
    repository: trustpilot/beat-exporter
    tag: 0.1.1
    pullPolicy: IfNotPresent
  resources: {}
  args: []
  exporterPort: 9479
  targetPort: 9479

我在这里监听了3个目录下面的日志动态,配置成功后在kafka下面就会有日志输出了

filebeat对接kafka第6张

免责声明:文章转载自《filebeat对接kafka》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇一步一步带你安装史上最难安装的 vim 插件springboot项目中集成ip2region遇到的问题及终极解决办法下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

安卓消息推送解决方案

一、推送工具使用 我们在做安卓开发的时候,通常需要一些消息推送功能,我个人平时用的是极光推送,极光推送(JPush)是一个端到端的推送服务,使得服务器端消息能够及时地推送到终端用户手机上,让开发者积极地保持与用户的连接,从而提高用户活跃度、提高应用的留存率。极光推送客户端支持 Android, iOS 两个平台。 使用的时候我们可以先去官网看开发文档,我是...

kubernetes之kubeadm 安装kubernetes 高可用集群

1. 架构信息 系统版本:CentOS 7.6内核:3.10.0-957.el7.x86_64 Kubernetes: v1.14.1Docker-ce: 18.09.5推荐硬件配置:4核8G Keepalived保证apiserever服务器的IP高可用 Haproxy实现apiserver的负载均衡 2. 节点信息 目前测试为 6 台虚拟机,etcd...

Kafka:生产者

Kafka java客户端数据生产流程解析 ProducerRecord ProducerRecord 含义: 发送给Kafka Broker的key/value 值对 //ProducerRecord的成员变量 public class ProducerRecord<K, V> { private final String top...

thinkphp 6 消息队列

1.安装think-queue composer require topthink/think-queue 2.配置消息队列,将config/queue.php将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动 如选择database,需创建表 CREATE TABLE `prefix_j...

k8s的yaml说明

理解k8s里的几个概念 Kubernetes 通过各种 Controller 来管理 Pod 的生命周期。为了满足不同业务场景,Kubernetes 开发了 Deployment、ReplicaSet、DaemonSet、StatefuleSet、Job 等多种 Controller。最常用的 Deployment用来建立Pod,以下是它的步骤 kube...

WebRTC学习(五)WebRTC信令服务器

一:使用socket.io发送消息 (一)socket.io服务端发送消息 broadcast会向站点中的所有房间发送消息 (二)socket.io客户端处理消息 二:WebRTC信令服务器 (一)信令服务器(TCP)作用 1.媒体相关信息交换:发送SDP描述信息(是否支持音频、视频,已经对应的编解码信息),通过信令服务器进行中转(因为两个客户端...