[Original] Big Data Basics: Logstash (3) Applications: file parsing (grok/ruby/kv)

Abstract: parse request URLs out of nginx access logs with Logstash, comparing three approaches: grok with a custom pattern, grok + ruby, and grok + kv.

Goal: parse the request URL in the nginx log into key/value fields:

/v1/test?param2=v2&param3=v3&time=2019-03-18%2017%3A34%3A14
->
{'param1':'v1','param2':'v2','param3':'v3','time':'2019-03-18 17:34:14'}

Sample nginx log line:

1.119.132.168 - - [18/Mar/2019:09:13:50 +0000] "POST /param1/test?param2=1&param3=2&time=2019-03-18%2017%3A34%3A14 HTTP/1.1" 200 929 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "-"

1 Using grok
input {
    file {
        path => [ "/var/log/nginx/access.log" ]
        start_position => "beginning"
    }
}
filter {
  if [message] =~ /test/ {
    grok {
        match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (/%{PARAMVALUE:param1}/test\?param2=%{PARAMVALUE:param2}&param3=%{PARAMVALUE:param3}&time=%{PARAMVALUE:send_time_raw})(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} %{QS:x_forward_for}" }
        pattern_definitions => { "PARAMVALUE" => "[^& ]*" }
    }
    urldecode {
        all_fields => true
    }
    date {
        match => [ "access_time_raw", "dd/MMM/yyyy:HH:mm:ss Z" ]
        target => "access_time_tmp"
    }
    # access_time = request time in epoch microseconds; send_time defaults to access_time
    ruby {
        code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                 event.set('send_time', event.get('access_time'))"
    }
    if [send_time_raw] {
      date {
          match => [ "send_time_raw", "yyyy-MM-dd HH:mm:ss" ]
          target => "send_time_tmp"
          timezone => "UTC"
      }
      ruby {
          code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
      }
    }
    mutate {
        remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "send_time_raw", "send_time_tmp"]
    }
  } else {
    drop {}
  }
}
output {
  if [param1] and [param2] and [param3] and "_grokparsefailure" not in [tags] {
    stdout { codec => json }
  }
}
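As a sanity check on the ruby filters above, here is a standalone Ruby sketch (plain stdlib Time standing in for the Logstash event API) that reproduces the epoch-microsecond arithmetic for the sample log line's timestamp:

require 'time'

# parse the HTTPDATE portion of the sample log line
t = Time.strptime('18/Mar/2019:09:13:50 +0000', '%d/%b/%Y:%H:%M:%S %z')

# same arithmetic as the ruby filter: epoch seconds * 1000000, kept as a string
puts (t.to_i * 1000000).to_s   # => "1552900430000000"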

Notes:
1) The parameter names and positions in the URL are hard-coded into the pattern, which is inflexible;
2) A custom pattern, PARAMVALUE, is defined via pattern_definitions;
3) urldecode is mandatory: without it the value of time stays as 2019-03-18%2017%3A34%3A14, and the date plugin (which parses patterns with Joda) errors out because the value contains the letter A (see the Ruby sketch after this list);
4) If time is empty, access_time is used instead;
5) Records that do not match are dropped;
6) Only records satisfying the condition reach the output;
7) if-else branches are used in both filter and output;
8) Mind the timezone option of the date plugin, otherwise timestamps get shifted by the timezone offset;
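For note 3, a quick standalone Ruby illustration of what urldecode does to the time parameter (CGI.unescape standing in for the urldecode filter):

require 'cgi'

raw = '2019-03-18%2017%3A34%3A14'
puts CGI.unescape(raw)   # => "2019-03-18 17:34:14", now parseable by the date plugin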

2 Using grok + ruby

input {
    file {
        path => [ "/var/log/nginx/access.log" ]
        start_position => "beginning"
    }
}
filter {
  if [message] =~ /test/ {
    grok {
        match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (%{URIPATHPARAM:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }
    }
    urldecode {
        all_fields => true
    }
    date {
        match => [ "access_time_raw", "dd/MMM/yyyy:HH:mm:ss Z" ]
        target => "access_time_tmp"
    }
    ruby {
        code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                 event.set('send_time', event.get('access_time'))"
    }
    if [request] {
      ruby {
          # convertName turns camelCase keys into snake_case (e.g. sendTime -> send_time)
          init => "
              def convertName(name)
                  result = ''
                  name.each_char{ |ch| result += (if ch < 'a' then '_' + ch.downcase else ch end) }
                  result
              end
          "
          # first path segment -> param1; then split the query string into key=value pairs
          code => "
              event.set('param1', event.get('request').split('?')[0].split('/')[1])
              pairs = event.get('request').split('?')[1].split('&')
              pairs.each{ |item| arr = item.split('='); event.set(arr[0], arr[1]) }
          "
      }
      if [time] {
        date {
            match => [ "time", "yyyy-MM-dd HH:mm:ss" ]
            target => "send_time_tmp"
            timezone => "UTC"
        }
        ruby {
            code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
        }
      }
    }
    mutate {
        remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "time", "send_time_tmp"]
    }
  } else {
    drop {}
  }
}
output {
  if [param1] and [param2] and [param3] and "_grokparsefailure" not in [tags] {
    stdout { codec => json }
  }
}

Notes:
1) Uses the stock grok patterns for nginx-style logs directly;
2) The query string is parsed as key=value pairs in ruby, which is more flexible;
3) A custom helper function is defined in init (see the Ruby sketch after this list);
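And for note 3, the convertName helper from init, run as standalone Ruby (the sendTime input is just an illustrative value):

def convertName(name)
    result = ''
    # any character below 'a' in ASCII (uppercase letters, but also digits) gets a '_' prefix and is downcased
    name.each_char{ |ch| result += (if ch < 'a' then '_' + ch.downcase else ch end) }
    result
end

puts convertName('sendTime')   # => "send_time"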

In Logstash ruby code, getters and setters must go through the event API, e.g. event.get('field'); the direct form event['field'] is not allowed, because:

[2019-03-19T17:15:32,729][ERROR][logstash.filters.ruby ] Ruby exception occurred: Direct event field references (i.e. event['field'] = 'value') have been disabled in favor of using event get and set methods (e.g. event.set('field', 'value')). Please consult the Logstash 5.0 breaking changes documentation for more details.
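A minimal ruby filter sketch contrasting the two styles (the copied_message field name is purely illustrative):

filter {
  ruby {
    code => "
      # current API: event.get / event.set
      event.set('copied_message', event.get('message'))
      # the removed direct-reference style would raise the error above:
      # event['copied_message'] = event['message']
    "
  }
}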

3 Using grok + kv
input {
    file {
        path => [ "/data/tmp/access.log"]
        start_position => "beginning"
    }
}
filter {
  if [message] =~ /dataone\/u1/ {
    grok {
        match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (%{URIPATHPARAM:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }
    }
    kv {
      source => "request"
      field_split => "&?"
      value_split => "="
    }
    urldecode {
        all_fields => true
    }
    date {
        match => [ "access_time_raw","dd/MMM/yyyy:HH:mm:ss Z"]
        target => "access_time_tmp"
    }
    ruby {
        code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                event.set('send_time', event.get('access_time'))"
    }
    if [send_time_raw] {
      date {
          match => [ "send_time_raw","yyyy-MM-dd HH:mm:ss"]
          target => "send_time_tmp"
      }
      ruby {
          code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
      }
    }
    mutate {
        remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "send_time_raw", "send_time_tmp"]
    }
  } else {
    drop {}
  }
}
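The kv filter above splits the request field into chunks on & and ? (field_split) and each chunk into key/value on = (value_split). A standalone Ruby sketch that mimics this splitting for an assumed /dataone/u1 request (the parameter names are illustrative):

request = '/dataone/u1?param2=1&param3=2&time=2019-03-18%2017%3A34%3A14'

chunks = request.split(/[&?]/)                        # field_split => "&?"
pairs  = chunks.map { |c| c.split('=', 2) }           # value_split => "="
pairs  = pairs.select { |arr| arr.length == 2 }.to_h  # the path chunk has no '=', skip it
p pairs   # => {"param2"=>"1", "param3"=>"2", "time"=>"2019-03-18%2017%3A34%3A14"}

In the pipeline itself, the urldecode filter then turns the time value into 2019-03-18 17:34:14 before the date plugin parses it.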

Reference: https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html
