Logstash过滤插件

摘要:
raise"Invalidformat"endret["sex"]=itembeg=msg.indexifbeg==nilraise"Invalidformat"endmsg[beg..-1].split().each{|item|key,value=item.split("=")ret[key]=value}$people.push}$result["peoples"]=$peopleputs$resultend#rubyruby.rb{"peoples"=˃[{"sex"=˃"MAN","name"=˃"fwd","age"=˃"12"},{"sex"=˃"WONMEN","name"=˃"xb","age"=˃"10"}]}将ruby脚本放入Logstash的filter插件中#vimruby.confinput{stdin{}}filter{ruby{code=˃'$result=Hash.new$people=[]beginmsgs=event.getmsgs.split("#").each{|msg|#分割后的字符串样例=˃[MAN]name=fwdage=12ret=Hash.new#匹配头部的[MAN]或[WONMEN]item=msg[/(?

filter初级

Logstash安装

### 设置YUM源
# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# tee /etc/yum.repos.d/elastic.repo << EOF
[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# yum install -y logstash

基本使用

# tee filter.conf << EOF
input {
    stdin {
    }
}
filter {
    mutate {
        split => ["message", "|"]
    }
}
output {
    stdout {
    }
}
EOF

# /usr/share/logstash/bin/logstash -f filter.conf --path.settings /etc/logstash
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
12|fwd|343|dd
2017-09-18T01:35:03.342Z dnode [12, fwd, 343, dd]

ruby语法基本使用

# tee filter.conf << EOF
input {
    stdin {
    }
}
filter {
    mutate {
        split => ["message", "|"]
    }
    ruby {
        code => '
          msgs = event.get("message")
          puts msgs.length
        '
    }
}
output {
    stdout {
      codec => "rubydebug"
    }
}
EOF

# /usr/share/logstash/bin/logstash -f ruby.conf --path.settings /etc/logstash
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
r|g
2
{
      "@version" => "1",
          "host" => "dnode",
    "@timestamp" => 2017-09-18T09:06:12.546Z,
       "message" => [
        [0] "r",
        [1] "g"
    ]
}

filter高级用法

grok插件

  • 自定义正则: 将需要提取的正则表达式用()括起来,然后使用?<tag_name>的固定语法格式给匹配项打上标签
  • 内置正则: 使用%{WORD:tag_name}内置正则地址

如果想要给一串很长的字符的很多字段都打上标签,即多个自定义组合的情况,那么正则必须能完全匹配整个字符串(可以使用.*的方式跳过不关心的字段)

在线测试地址

Logstash过滤插件第1张

ruby插件

### 1. 先实现rb脚本,输入从变量读取,输出也保存到变量
### 2. 脚本的输入由变量改成event.get("name")
### 3. 脚本的输出由变量改成event.set("name", $value)

举例

样例字符串一

[NEW] tcp

  • 使用grok内置正则

Logstash过滤插件第2张

  • 自定义正则

Logstash过滤插件第3张

Logstash过滤插件第4张

样例字符串二

[MAN] name=fwd age=12#[WONMEN]name=xb age=10

将字符串转换成JSON

### 编写rb脚本实现所需功能
# vim ruby.rb
$result = Hash.new
$people = []
begin
  msgs = "[MAN] name=fwd age=12#[WONMEN]name=xb age=10"
  msgs.split("#").each { |msg|
    ret = Hash.new
    item = msg[/(?<=[)MAN(?=])|(?<=[)WONMEN(?=])/]
    if item.empty?
      raise "Invalid format"
    end
    ret["sex"] = item

    beg = msg.index("name")
    if beg == nil
      raise "Invalid format"
    end
    msg[beg..-1].split().each { |item|
      key, value = item.split("=")
      ret[key] = value
    }
    $people.push(ret)
  }
$result["peoples"] = $people
puts $result
end

# ruby ruby.rb
{"peoples"=>[{"sex"=>"MAN", "name"=>"fwd", "age"=>"12"}, {"sex"=>"WONMEN", "name"=>"xb", "age"=>"10"}]}

将ruby脚本放入Logstash的filter插件中

# vim ruby.conf
input {
    stdin {
    }
}
filter {
    ruby {
        code => '
          $result = Hash.new
          $people = []
          begin
            msgs = event.get("message")
            msgs.split("#").each { |msg|
              # 分割后的字符串样例 => [MAN] name=fwd age=12
              ret = Hash.new
              # 匹配头部的[MAN]或[WONMEN]
              item = msg[/(?<=[)MAN(?=])|(?<=[)WONMEN(?=])/]
              if item.empty?
                raise "Invalid format"
              end
              ret["sex"] = item

              # 获取从name到结束的字符串 => name=fwd age=12
              beg = msg.index("name")
              if beg == nil
                raise "Invalid format"
              end
              msg[beg..-1].split().each { |item|
                # 分割后的字符串样例 => name=fwd
                key, value = item.split("=")
                ret[key] = value
              }
              $people.push(ret)
            }
            $result["peoples"] = $people
            event.set("message", $result)
            event.set("[@metadata][drop]", false)
          rescue
            puts $!
            event.set("[@metadata][drop]", true)
          end
        '
    }
}
output {
    if ![@metadata][drop] {
        stdout {
          codec => rubydebug
        }
    }
}

# /usr/share/logstash/bin/logstash -f ruby.conf --path.settings /etc/logstash
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
[MAN] name=fwd age=12#[WONMEN]name=xb age=10

{
      "@version" => "1",
          "host" => "dnode",
    "@timestamp" => 2017-09-20T08:40:26.293Z,
       "message" => {
        "peoples" => [
            [0] {
                "name" => "fwd",
                 "age" => "12",
                 "sex" => "MAN"
            },
            [1] {
                "name" => "xb",
                 "age" => "10",
                 "sex" => "WONMEN"
            }
        ]
    }
}

参考文档

Logstash实践
关于Logstash中grok插件的正则表达式例子
elastic文档
elastic插件文档

免责声明:文章转载自《Logstash过滤插件》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇C#开发Android应用实战 读后感Netty入门下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Logstash消费Kafka输出至Elasticsearch配置文件示例

  input { kafka { bootstrap_servers => "192.168.32.36:9092,192.168.32.37:9092,192.168.32.38:9092" topics => "msa-log-prod" codec => "js...

logstash收集日志并写入Redis再到es集群

redis做数据缓存 图形架构: 环境准备 172.31.2.101 es1 + kibana 172.31.2.102 es2 172.31.2.103 es3 172.31.2.104 logstash1 172.31.2.105 logstash2 172.31.2.106 Redis 172.31.2.107 web1 安装redis [roo...

使用ElasticSearch服务从MySQL同步数据实现搜索即时提示与全文搜索功能

最近用了几天时间为公司项目集成了全文搜索引擎,项目初步目标是用于搜索框的即时提示。数据需要从MySQL中同步过来,因为数据不小,因此需要考虑初次同步后进行持续的增量同步。这里用到的开源服务就是ElasticSearch。 ElasticSearch是一个非常好用的开源全文搜索引擎服务,同事推荐之前我并没有了解过,但是看到亚马逊专门提供该服务的实例,没有...

ELK 架构之 Logstash 和 Filebeat 安装配置

上一篇:ELK 架构之 Elasticsearch 和 Kibana 安装配置 阅读目录: 1. 环境准备 2. 安装 Logstash 3. 配置 Logstash 4. Logstash 采集的日志数据,在 Kibana 中显示 5. 安装配置 Filebeat 6. Filebeat 采集的日志数据,在 Kibana 中显示 7. Filebeat...

设置高级的Logstash 管道

设置高级的Logstash 管道: 一个Logstash 管道在很多实用例子有一个或者多个输入,filter,和output 插件。 本节中 创建Logstash 配置文件来指定那些插件和讨论每个插件做什么 Logstash 配置文件定义你的Logstash 管道。当你开始一个Logstash 例子,实用 -f <path/to/file&g...

logstash 读取MySQL数据到elasticsearch 相差8小时解决办法

logstash和elasticsearch是按照UTC时间的,kibana却是按照正常你所在的时区显示的,是因为kibana中可以配置时区信息。 具体看这个: logstash 的配置文件添加 filter { ruby { code => "event.set('timestamp', event.get('@times...