记一次docker安装logstash,并且读取数据库数据到es

摘要:
Docker安装logstash。hub.docker官方网站上没有示例。回想一下在没有docker的情况下安装的logstash,经过比较,我有了大致的了解。这是版本5的conf文件夹内容。这是版本6的conf文件夹内容。这次,我想将数据库数据读入es并上传一个mysql驱动程序jar文件。它仍然不起作用。然后读取错误行上方的语句,并在logstash读取由驼峰命名的T时强制将其读取为小写T。据估计,默认情况下,logstash会将所有查询字段转换为小写。last_run_metadata_Path=˃“./last_record/logstash_users_last_time”这是存储上次更新时间的文件。到目前为止,我的logstash将数据库数据读取到es。

docker安装logstash,在hub.docker官网是没有示例的。查了文章,大部分复制黏贴,语焉不详的。看着懵,虽然经过复制黏贴操作启起来了,但还是很多不理解。回想下不用docker安装的logstash,对比了下大致有点理解了。可自己配置run,还是启动没一会自动停止了。懊恼不已。

刚才仔细对比,小心求证发现了问题所在。貌似logstash启动要使用交互模式,即启动语句里要加上 -it。否则就会启动后停止。

另外查看资料,官网说的需要配置两个文件,有一个叫pipelines.yml的。

按照说明配置了几次都有问题,看启动日志,貌似自己配的  pipelines文件都没有被读取。后来发现是因为版本问题。我自己用的5.6.12版本不存在此文件的。而官网在5版本也说要配置这个,我就纳闷了。下载了6版本的logstash,发现conf文件夹下存在这么个文件。这下就明了了,我的5.6.12只需要只需要写一个conf后缀的文件即可,只有6版本的才需要另外配置管道文件pipelines,来指定需要读取哪个conf文件。

这是5版本的conf文件夹内容

记一次docker安装logstash,并且读取数据库数据到es第1张

这是6版本的conf文件夹下内容:

记一次docker安装logstash,并且读取数据库数据到es第2张

我此次是要读取数据库的数据到es中,

 上传了一个mysql的驱动jar文件。自定义一个mysql01.conf文件:

input {
  jdbc {
    jdbc_driver_library => "/home/kf/soft/logstash-5.6.12/config/mysql/mysql-connector-java-5.1.25.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.0.4:3306/test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "* * * * *"
    statement => "SELECT id,username,password,password_salt,status,insertTime,updateTime FROM users WHERE updateTime >= :sql_last_value"
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_users_last_time"
    type => "users"
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
}
output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.88.130:9200"]
        # 索引名称 可自定义
        index => "users"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{id}"
        document_type => "users"
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

因为elk是在虚拟机搭建的,而mysql是我本机windows的,一开始链接报错,链接被拒绝。mysql是默认只允许在本地链接的,需要在mysql更改权限:

例如,你想myuser使用mypassword从任何主机连接到mysql服务器的话。
%代表允许所有域的连接
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
要是本机连mysql连不上,再添加localhost访问权限,本机就可以登录了
grant all privileges on luffy.* to 'luffy'@'localhost' identified by 'luffy';
设置完有权限限制的账号后一定要刷新权限,如果没刷新权限,该终端无法被通知,当然也可以直接重启cmd
FLUSH PRIVILEGES; 

 此时可以链接mysql了。但是一直报错,提示:

记一次docker安装logstash,并且读取数据库数据到es第3张

而我的数据库中insertTime,updateTime字段都是这样的,驼峰命名。百度了下,有说需要把配置文件mysql01.conf中的tracking_column => "updateTime",updateTime用双引号括起来的,我本身就是括起来的,有说需要把查询字段放在select语句中的,我也放了。还是不行,然后看报错行上方的语句,logstash读取时强制把驼峰命名的T读成了小写t。然后查资料在mysql01配置文件中改加了一句

# 是否将 字段(column) 名称转小写
lowercase_column_names => false

这时候可以读取了。估计是logstash默认把查询字段全部转小写了。

last_run_metadata_path => "./last_record/logstash_users_last_time"

这个是存储最后更新时间的文件的。开始没有创建last_record文件夹启动会报找不到文件,此时需要手工创建一下即可。

至此,我的logstash读取数据库数据到es完成。

在测试环境下配置连接的mysql也是docker容器的。

启动logstash容器是需要指定配置文件,命令是:

docker run -d -p 5044:5044 -p 9600:9600 -it --name logstash --network ELS -v /home/smartcity/logstash/config/:/usr/share/logstash/config/  logstash:5.6.13 -f /usr/share/logstash/config/union_blueplus.conf

启动后,logstash日志报错:

Unable to connect to database. Trying again {:error_message=>"Java::ComMysqlJdbcExceptionsJdbc4::CommunicationsException: Communications link failure The last packet successfully received from the server was 5,867 milliseconds ago.  The last packet sent successfully to the server was 5,867 milliseconds ago."}

查了好久,很多说是要该mysql配置的,明显不合理。到墙外看到将jdbc插件的,我也查了版本,是最新的4.3.13版本。

最后跟网上的做对比,排查出是因为url地址多加了ssl的参数导致的

多表导入:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    #使用前验证连接是否有效
    jdbc_validate_connection => true
    #多久进行连接有效验证(4小时)
    jdbc_validation_timeout => 14400
    #连接失败后最大重试次数
    connection_retry_attempts => 50
    #连接失败后重试时间间隔
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select sal.alarmID, sal.villageID, 
            sal.deviceType,sal.alarmTypeName,sal.modelID,
            sal.alarmLevel,sal.alarmState,sal.alarmTime,
            sal.alarmContent,de.installAddr,sal.updateTime 
            from e_sense_alarm_log sal left join e_device de on de.deviceID = sal.deviceID 
            WHERE sal.updateTime >= :sql_last_value"
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_alarm_last_time"
    type => "alarm"
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
  
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    #使用前验证连接是否有效
    jdbc_validate_connection => true
    #多久进行连接有效验证(4小时)
    jdbc_validation_timeout => 14400
    #连接失败后最大重试次数
    connection_retry_attempts => 50
    #连接失败后重试时间间隔
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select de.deviceID, de.isDelete, de.villageID as villageid, de.installAddr as installadd, de.type as devicetype,
            de.buildingID,bu.buildingNo as  buildingno, bu.name as buildingName, de.productModel as productmodel, de.name as deviceName, de.code as code,
            de.state,  de.updateTime as updatetime from e_device de left join b_building bu on de.buildingID = bu.buildingID 
            WHERE de.updateTime >= :sql_last_value"
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_device_last_time"
    type => "device"
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
  
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    #使用前验证连接是否有效
    jdbc_validate_connection => true
    #多久进行连接有效验证(4小时)
    jdbc_validation_timeout => 14400
    #连接失败后最大重试次数
    connection_retry_attempts => 50
    #连接失败后重试时间间隔
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select  al.accessLogID, al.villageID as villageid, al.peopleName as peoplename, peo.gender, peo.phoneNo as phoneno,    
                  al.credentialNo as credentialno, lab.name as peoplelabel, bu.buildingNo as buildingno, al.buildingID as buildingid, 
                  al.cardNo as cardno, al.openType as opentype, al.updateTime as opentime                
                  from e_access_log  al
                  left join p_people peo on peo.credentialNo =al.credentialNo
                  left join p_people_label pl on pl.peopleID = peo.peopleID 
                  left join s_label lab on lab.labelID = pl.labelID
                  left join b_building bu on bu.buildingID = al.buildingID  
                WHERE al.updateTime >= :sql_last_value"
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_accessLog_last_time"
    type => "accessLog"
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
  
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    #使用前验证连接是否有效
    jdbc_validate_connection => true
    #多久进行连接有效验证(4小时)
    jdbc_validation_timeout => 14400
    #连接失败后最大重试次数
    connection_retry_attempts => 50
    #连接失败后重试时间间隔
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select fl.faceLogID,io.type as faceinouttype, io.villageID as villageid, io.ioID as ioid,
                  fl.personType as persontype, peo.peopleName as peoplename, peo.phoneNo as phoneno,
                  peo.credentialNo as credentialno,  sl.name as peoplelabel, fl.updateTime as                   facecapturetime
                  from e_face_log  fl 
                  left join b_in_out io on io.ioID = fl.ioID
                  left join p_people peo on peo.credentialNo = fl.credentialNo
                  left join p_people_label pl on pl.peopleID = peo.peopleID 
                  left join s_label sl on sl.labelID = pl.labelID
                  where  fl.updateTime >= :sql_last_value  and fl.faceSource = 0"
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_wkface_last_time"
    type => "wkface"
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }  
    jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    #使用前验证连接是否有效
    jdbc_validate_connection => true
    #多久进行连接有效验证(4小时)
    jdbc_validation_timeout => 14400
    #连接失败后最大重试次数
    connection_retry_attempts => 50
    #连接失败后重试时间间隔
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select pr.parkingReserveID, pr.villageID as villageid, io.ioID as inioid,
                    io.ioID as outioid, pr.inParkingLogID, pr.outParkingLogID, pr.carBrand as cartype,
                    pr.plateNo as plateno, peo.peopleName as peoplename, peo.phoneNo as phoneno,
                    peo.credentialNo as credentialno, pr.updateTime as intime
                    from e_parking_reserve pr
                    left join e_parking_channel pc on pc.parkingID = pr.parkingID
                    left join b_in_out io on io.ioID = pc.ioID
                    left join e_parking_car ec on ec.plateNo = pr.plateNo
                    left join p_people peo on peo.peopleID = ec.peopleID
                  where  pr.updateTime >= :sql_last_value"
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_wkcar_last_time"
    type => "wkcar"
    # 是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }

}
output {
    if [type] == "alarm"{
         elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.66.34:9200"]
        # 索引名称 可自定义
        index => "alarmlogindex"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{alarmID}"
        document_type => "alarm"
        }
    }
    if [type] == "device"{
         elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.66.34:9200"]
        # 索引名称 可自定义
        index => "deviceindex"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{deviceID}"
        document_type => "device"
        }
    }
    if [type] == "accessLog"{
         elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.66.34:9200"]
        # 索引名称 可自定义
        index => "accesslogindex"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{accessLogID}"
        document_type => "accessLog"
        }
    }
    if [type] == "wkface"{
         elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.66.34:9200"]
        # 索引名称 可自定义
        index => "facelogindex"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{faceLogID}"
        document_type => "wkface"
        }
    }
    if [type] == "wkcar"{
         elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.66.34:9200"]
        # 索引名称 可自定义
        index => "parkingreservelogindex"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{parkingReserveID}"
        document_type => "wkcar"
        }
    }  
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

 参考自:https://blog.csdn.net/u010887744/article/details/86708490

免责声明:文章转载自《记一次docker安装logstash,并且读取数据库数据到es》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇使用小书匠编辑器将文章快速发布到各大主要博客平台win10安装docker运行环境下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

python_18(Django基础)

第1章 web框架的本质 1.1 socket 1.2 空格后面是主体内容 1.3 HTTP协议 1.3.1 响应流程 1.4 HTTP请求方法 1.5 HTTP工作原理 1.6 URL 1.7 HTTP请求格式 1.8 HTTP响应格式 1.9 根据不同的路径返回不同内容 1.10 进阶版 1.11 根据访问的路径返回不同的动态页面 第2章 Django...

在linux环境下搭建java web测试环境(非常详细!!)

一.项目必备软件及基本思路 项目必备:虚拟机:VMware Workstation (已安装linux的 CentOS6.5版本) 项目:java web项目 (必须在本地部署编译后选择项目的webRoot,改为ROOT(ROOT包含下面四个关键文件),放到tomcat下的webapps下即可,因为tomcat启用一个工程的时候,就是发布了除了JSP以外的...

美团开源 SQL 优化工具 SQLAdvisor

https://www.oschina.net/news/82725/sqladvisor-opensource https://github.com/Meituan-Dianping/SQLAdvisor SQLAdvisor 是由美团点评公司北京 DBA 团队开发维护的 SQL 优化工具:输入SQL,输出索引优化建议,现已开源。 它基于 MySQL 原...

关系数据库和NoSQL结合使用:MySQL + MongoDB

Home Page作者使用一个案例来说明MySQL+MongoDB结合使用,发挥各自所长,并且认为他们互补性很强。当然,这其中不可避免引入DDD中的编程设计模式 Repository仓储模式,通过它能够将数据存储方式和应用分离开来,这样,我们的程序就不受限于任何存储方式,无论是NoSQL或关系数据库。这个案例是一个按效果付费Pay-for-use的分析案例...

PHP环境搭建

php环境分为两种:wanmplanmp PHP官网  w: windows 系统l: linux 系统a: apache 服务器n: nginx 服务器m: mysql 数据库p: php 服务器端的脚本语言 安装环境分为两种:集成环境 一键安装 集成环境 phpStudy wampserver apmserver xampp linux phpStu...

logstash收集日志并写入Redis再到es集群

redis做数据缓存 图形架构: 环境准备 172.31.2.101 es1 + kibana 172.31.2.102 es2 172.31.2.103 es3 172.31.2.104 logstash1 172.31.2.105 logstash2 172.31.2.106 Redis 172.31.2.107 web1 安装redis [roo...