使用logstash同步Mysql数据表到ES的一点感悟

摘要:
useUnicode=true&characterEncoding=UT8&serverTimezone=UTC“jdbc_user=˃”root“jdbc_password=˃”xxxxx“jdbc_driver_library=˃”/opt/elasticsearch/lib/mysql-connector-java-8.0.20.jar“jdbc-driver_class=˃”com.mysql.cj.jdbc.driver.driver”jdbc_paging_enabled=˃“true”jdbc-page_size=˃“500”use_column_value=˃“true”record_last_run=˃“true“tracking_column=˃”id“last_run_metadata_path=˃“/opt/logstash/bin/logstash_xxy/cxxInfo“#存储监控clean_run的id字段值=˃”false“statement=˃”select*friends_tablewhereid˃:sql_last_value“schedule=˃”*****“type=˃”es_Table“}或起始值jdbc{jdbc_connection_string=˃”jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC“jdbc_user=˃”root“jdbc_password=˃”root”jdbc_validate_connection=˃truejdbc_driver_library=˃“”jdbc_diver_class=˃“com.mysql.cj.jdbc.driver”parameters=˃{“id”=˃“1”}statement=˃“SELECT*FROMes_tableWHEREid˃:id”type=˃“es_table”}要同步多个数据表,需要在jdbc中添加一个类型字段{},如上所示。但是,在第二个示例中,指定存储id值的文件不能相同,即last_run_metadata_path的值可以引用以下示例:input{jdbc{jdbc_connection_string=˃“jdbc:mysql://192.168.0.145:3306/db_example?

针对单独一个数据表而言,大致可以分如下两种情况:
1.该数据表中有一个根据当前时间戳更新的字段,此时监控的是这个时间戳字段
具体可以看这个文章:https://www.cnblogs.com/sanduzxcvbnm/p/12858967.html
示例:
modification_time就是表中要监控的时间戳字段

input {

  jdbc {   
    jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => ""
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
    type => "es_table"
  }
  
}

2.该数据表中没有根据当前时间戳更新的字段,此时监控的是这个表中的其他字段,比如id字段或者其他
参考一下这个文档:https://www.cnblogs.com/sanduzxcvbnm/p/12858474.html
db_example是数据库名,es_table是数据表名

jdbc {

  jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
  jdbc_user => "root"
  jdbc_password => "xxxxx"
  jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-8.0.20.jar"
  jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
  use_column_value => "true"
  record_last_run => "true"
  tracking_column => "id"
  last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_info" # 存储监控的id字段值
  clean_run => "false"
  statement => "select * from es_table where id > :sql_last_value"
  schedule => "* * * * *"
  type => "es_table"

}

或者如下这种直接指定id的起始值

    jdbc {
       jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
       jdbc_user => "root"
       jdbc_password => "root"
       jdbc_validate_connection => true
       jdbc_driver_library => ""
       jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
       parameters => { "id" => "1" }
       statement => "SELECT * FROM es_table WHERE id > :id"
       type => "es_table"
    }
同步多个数据表

需要在jdbc{}中新增一个type字段,比如上面所示,但是在第二个示例中,指定存储id值的文件不能是同一个,也就是last_run_metadata_path的值

可以参考如下示例:

input {
  jdbc {   
    jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => ""
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    record_last_run => "true"   
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
    type => "es_table"
  }
  
  
  jdbc {   
    jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => ""
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    record_last_run => "true"   
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table1 WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
    type => "es_table1"
  }
}

filter {
  ruby {
     code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  ruby {
     code => "event.set('@timestamp',event.get('timestamp'))"
  }
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
  }
  
}
output {
 if [type]=="es_table" {
    elasticsearch {
        hosts => ["192.168.75.21:9200"]
     	index => "es_table_idx"
     	document_id => "%{[@metadata][_id]}"
        user => "elastic"
        password => "GmSjOkL8Pz8IwKJfWgLT"
    }
  }
 if [type]=="es_table1" {
    elasticsearch {
        hosts => ["192.168.75.21:9200"]
     	index => "es_table1_idx"
     	document_id => "%{[@metadata][_id]}"
        user => "elastic"
        password => "GmSjOkL8Pz8IwKJfWgLT"
    }
  }
}

免责声明:文章转载自《使用logstash同步Mysql数据表到ES的一点感悟》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇List集合流处理类型小结服务激活工具 ActivatorUtilities下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

FFMpeg笔记(八) 读取RTP数据时的PTS计算[转载]

结构体struct RTPDemuxContext中有若干时间戳相关的成员,含义如下 timestamp:上一个接收到的RTP时间戳base_timestamp:第一个接收到的RTP时间戳cur_timestamp:未知unwrapped_timestamp:假如rtp时间没有32位溢出的话,当前的rtp时间应该是多少range_start_offset...

MySQL-基础

SQL 是一门特殊的语言,专门用来操作关系数据库,不区分大小写 服务器端 安装服务器端:在终端中输入如下命令,回车后,然后按照提示输入 sudo apt-get install mysql-server 启动服务   sudo service mysql start 查看进程中是否存在mysql服务   ps ajx|grep mysql 停止服务   s...

ORACLE DBA命令

1 运行SQLPLUS工具sqlplus 2 以OS的默认身份连接/ as sysdba 3 显示当前用户名show user 4 直接进入SQLPLUS命令提示符sqlplus /nolog 5 在命令提示符以OS身份连接connect / as sysdba 6 以SYSTEM的身份连接connect system/xxxxxxx@服务名 7 显示当然...

MYSQL 导出excel bigint 精度丢失

  导出后: 变成了了0000 结尾 解决方案:导出时,通过查询导出 select *,concat(" ",id) as id from `gs_courses11.24` LIMIT 10   Ok啦:   文章来源:刘俊涛的博客欢迎关注公众号、留言、评论,一起学习。 _______________________________...

【服务后端】MySQL数据库与Django Models不一致问题解决

1 背景 1.8 Django的APP目录下有makemigrations文件夹,这个文件夹中的文件误删除过,后面从其他电脑的工程目录中拷贝了过来。 进行了python manage.py makemigrations和python manage.py migrate操作。 由于前后的makemigrations的文件不一致,导致数据库中的字段与Djang...

mysql字符串 转 int-double CAST与CONVERT 函数的用法

MySQL 的CAST()和CONVERT()函数可用来获取一个类型的值,并产生另一个类型的值。两者具体的语法如下: CAST(value as type); CONVERT(value, type); 就是CAST(xxx AS 类型), CONVERT(xxx,类型)。 Sql代码 mysql>SELECTCAST('3.35'ASsign...