Parsing nested JSON data with Logstash

1. Source files

1. The original log file is:

2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO  [siftLog][qewrw123ffwer2323fdsafd] - logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-{"area":"","frontInitTime":0,"initiatePaymentMode":"plugin_manual","network":"电信","os":"Microsoft Windows 7","payStatus":"1","reqs":[{"curlCode":"0","end":"2019-10-28 09:49:25.233","errorCode":"","errorDesc":"","totalTime":2153}],"settleAccountsTime":0}

Here we first need to write a grok pattern for the part that precedes the JSON. Since that data has little practical value in production, it is not split into fields in any detail.

DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?
ACCESSLOG %{DATETIME:logTime} \[%{DATA:threadName}\] %{DATA:loglevel} \[%{DATA:logType}\]\[%{DATA:appId}\] - logTime:%{DATETIME:logTime2}-receiveTime:%{DATETIME:receiveTime}-%{GREEDYDATA:jsonMsg}
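
These two patterns can be sanity-checked before wiring in Kafka. The pipeline below is just a sketch (not part of the original setup): it reads a raw line from stdin and prints the parsed fields, assuming the two pattern lines above are saved in a file under /opt/appl/logstash/patterns:

# hypothetical test pipeline: paste the sample log line on stdin and inspect the result
input { stdin {} }

filter {
  grok {
    patterns_dir => [ "/opt/appl/logstash/patterns" ]
    match => { "message" => "%{ACCESSLOG}" }
  }
}

output { stdout { codec => rubydebug } }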

The JSON in this log contains another nested JSON (the reqs array), so the inner JSON has to be pulled out and parsed again. The Logstash configuration file should therefore be written as:

input {
  kafka {
    #bootstrap_servers => "kafka-service.ops:9092"
    bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
    topics => ["test-grok"]
    codec => "json"
    type => "test-grok"
  }
}

filter {
  if [type] == "test-grok" {
    grok {
        patterns_dir => [ "/opt/appl/logstash/patterns" ]
        match => { "message" => "%{ACCESSLOG}" }
    }
    mutate {
      # strip the [ and ] around the reqs array so the nested object can be re-parsed as plain JSON
      gsub => [
        "jsonMsg", "\[", "",
        "jsonMsg", "\]", ""
      ]
    }
    json {
      source => "jsonMsg"
    }
    mutate {
      add_field => { "reqs_json" => "%{reqs}" }
    }
    json {
      source => "reqs_json"
      remove_field => ["reqs","reqs_json","message","jsonMsg"]
    }
  }

  ruby {
    code => "event.timestamp.time.localtime"
  }

}

output {
  elasticsearch {
    hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
    index => "logstash-test-grok-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
}
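
If the grok and the two json filters all succeed, the fields of the inner reqs object end up at the top level of the event. For the sample line above, the indexed document would contain, among others, roughly these fields (a sketch; the exact output depends on the Logstash version):

logTime     => "2019-10-28 09:49:44:947"
logTime2    => "2019-10-28 09:49:25.833"
receiveTime => "2019-10-28 09:49:44.044"
network     => "电信"
os          => "Microsoft Windows 7"
payStatus   => "1"
curlCode    => "0"
end         => "2019-10-28 09:49:25.233"
totalTime   => 2153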

2. The original log file is:

[2019-10-28 10:01:01.169] [Thread-13086] INFO  [192.168.2.1, 192.168.1.1, 192.168.1.2_1572_smallTrade] [INTERFACE] - [HTTP] [request] - {"latitude":"","cardCode":"","memberCouponNo":"","transAmount":"900","hbFqNum":"","confirmCode":"9357","couponAmount":"","lastCost":"2360","memberMobile":"","timestamp":"1572228060000","longitude":""}

Only log lines containing the keyword lastCost need to be collected, so the Filebeat configuration should be:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/appl/tomcat/logs/test/test.log
  include_lines: ['.*lastCost.*']
  tail_files: true
  fields:
    type: interface
    log_module: test-interface
output.kafka:
  enabled: true
  hosts: ["172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"]
  topic: '%{[fields][type]}'
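
Since fields.type is set to interface, the topic expression is meant to resolve to the Kafka topic interface, which is what the Logstash input below subscribes to. The configuration can also be checked with Filebeat's built-in test commands before starting the service (a sketch; the config path is an assumption, adjust to the actual filebeat.yml location):

# hypothetical check commands
filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml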

Because the developers added the client IP as the first IP of the fourth field, this IP has to be pulled out separately for analysis.

DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?

input {
  kafka {
    bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
    topics => ["interface"]
    codec => "json"
    type => "test-interface"
  }
}

filter {
  if [type] == "test-interface" {
    grok {
      patterns_dir => [ "/opt/logstash/patterns" ]
      match => { "message" => "\[%{DATETIME:log_timestamp}\] \[%{DATA:ThreadName}\] %{LOGLEVEL:logLevel}  \[%{DATA:IP}\] \[%{DATA:InterfaceTag}\] - \[%{DATA:Protocol}\] \[%{DATA:LogType}\] - %{GREEDYDATA:jsonMsg2}" }
    }
    json {
      source => "jsonMsg2"
      remove_field => [ "jsonMsg2","message" ]
    }
    mutate {
      convert => [ "lastCost","float" ]
      split => ["IP",", "]
      add_field => {
        "clientIp" => "%{[IP][0]}"
        "proxyIp"  => "%{[IP][1]}"
        "time"     => "%{[IP][2]}"
      }
    }
    geoip {
      source => "clientIp"
      #database => "/opt/logstash-interface/Geoip/GeoLite2-City_20191022/GeoLite2-City.mmdb"
    }
  }
  ruby {
    code => "event.timestamp.time.localtime"
  }
}

output {
	elasticsearch {
		hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
		index => "logstash-test-interface-%{+YYYY.MM.dd}"
		template_overwrite => true
	}
}
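
For the sample line above, the IP field "192.168.2.1, 192.168.1.1, 192.168.1.2_1572_smallTrade" gets split on ", ", so the resulting event carries roughly these extra fields (a sketch based on the sample data):

clientIp => "192.168.2.1"
proxyIp  => "192.168.1.1"
time     => "192.168.1.2_1572_smallTrade"
lastCost => 2360.0

Note that geoip cannot resolve private 192.168.x.x addresses, so location fields will only show up once real public client IPs arrive.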
