【流数据处理】MySql/PG/Oracle+Kafka+Flink(CDC捕获) 部署及实时计算

摘要:
本文主要介绍了实时数据仓库的部署和计算,它主要分为三个部分:数据收集$color{red}{[E]}$关系数据库MySql/PG/Oracle+Debezium+KafkaConnector数据计算$color{red}}{[T]}$Flink数据存储$color red}{[L]}传输,关系数据库/柱状数据库clickhouse/hbase注:这里有两篇阿里巴巴文章可供参考FlinkJDBConnect
主要介绍实时数仓得部署、计算

文章主要分3部分
图片1

  • 数据采集
  • $color{red}{[E]}$ 关系型数据库MySql/PG/Oracle+Debezium+Kafka Connector
  • 数据计算
  • $color{red}{[T]}$ Flink
  • 数据存储
  • $color{red}{[L]}$ 传输,关系型数据库/列式数据库 clickhouse/hbase

注:这里贡献2篇阿里巴巴得文章供参考
Flink JDBC Connector:Flink 与数据库集成最佳实践
基于 Flink SQL CDC 的实时数据同步方案
Debezium监控MySql
Debezium监控Oracle
Debezium-Github
Oracle部署参考文档

1. 环境要求

软件要求:

  1. Kafka集群:本实验用得是CDH5.14版本得Kafka集群
  2. 数据库:Mysql 8.x/PG 10.x/Oracle11G docker搭建。(Mysql开启行日志模式,Oracle开启归档)
  3. 计算引擎:Flink 1.13
  4. Kafka Connector:
    debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
    debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
    debezium-connector-oracle-1.4.0.Final-plugin.tar.gz

注:以下操作都需要在3台Kafka集群中操作

Kafka配置目录:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist
Kafka Bin目录:/opt/cloudera/parcels/KAFKA/lib/kafka

1. 数据采集

1.1 Kafka部署(Mysql/PG/Oracle相同)

  • 下载软件[debezium-connector-mysql-1.4.0.Final-plugin.tar.gz],并解压,目录可以随便选。
  • 本人放得目录为:/opt/cloudera/parcels/KAFKA/lib/kafka/kafka_connect
  • 并把 debezium-connector-mysql 目录下得jar包都拷贝一份到${KAFKA_HOME}/libs中
  • 把Mysql/PG得jdbc包放入libs中 [mysql-connector-java-8.0.21.ja]
  • Oracle需要下载客户端并把jar包复制到${KAFKA_HOME}/libs
    下载地址
  • 修改 ${KAFKA_HOME}/bin 或者 [opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist] 中配置文件
  • 正常/CDH环境
  • 单机部署修改 [connect-standalone.properties]
  • 集群部署修改 [connect-distributed.properties]
  • 修改 Kafka cluster,打开 plugin.path 配置,并配置目录
    Kafka cluster
  • 如果有多个不同的数据库(Mysql/PG/Oracle)需要监控,目录之间用逗号分隔
    plugin.path
  • 启动Kafka集群,设置环境变量
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties #不设置后面kafka会报错
./bin/connect-distributed.sh ../../etc/kafka/conf.dist/connect-distributed.properties
  • 提交mysql-connector,监视Mysql数据库
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
{ 
"name" : "debezium-mysql",
"config":{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "10.20.60.44", #mysql的IP地址
"database.port": "3306", #mysql的端口号
"database.user": "yaowentao", #mysql的用户名
"database.password": "Sx202101", #mysql用户对应的密码
"database.server.id" :"1739",
"database.server.name": "Mysql", #mysql服务的逻辑名,例如Mysql
"database.history.kafka.bootstrap.servers": "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092", #Kafka集群地址
"database.history.kafka.topic": "dbhistory.mydb", #Kafka topic名称
"database.whitelist": "mydb", 
#"table.whitelist":"mydb.orders",
"include.schema.changes" : "true" ,
"decimal.handling.mode": "string", #处理浮点值
"mode" : "incrementing",
"incrementing.column.name" : "id",
"database.history.skip.unparseable.ddl" : "true"
}
}'
  • 提交Oracle-connector,监视Mysql数据库
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
{
"name": "debezium-oracle-yaowentao",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "helowin",
"database.hostname" : "10.20.60.44",
"database.port" : "1521",
"database.user" : "dbzuser",
"database.password" : "dbz",
"database.dbname" : "helowin",
"database.schema" : "scott",
"database.connection.adapter": "logminer", #1.4版本需要设置
"database.tablename.case.insensitive": "true",
"table.include.list" : "scott.*", #表白名单
"snapshot.mode" : "initial",
"schema.include.list" : "scott",#schema白名单
"database.history.kafka.bootstrap.servers" : "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}'
  • 查看是否启动成功,JPS
  • 如果是CDH集群,会报一个日志文件找不到得情况
    图3
    解决办法:将配置文件得路径指向
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties

图4

  • 查看connectors得情况,也可以在浏览器中查询。本案例在命令行中执行
查看创建的connector列表
curl -X GET http://192.168.58.171:8083/connectors
http://192.168.58.172:8083/connectors

图9

查看创建的connector状态
curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/status
http://192.168.58.172:8083/connectors

图9

查看创建的connector配置
curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/config

图9

删除connector
curl -X DELETE http://192.168.58.171:8083/connectors/debezium-mysql

图5

  • Kafka Connector启动后 会将监视得库中每个表都创建个一个topic,且该topic只包含该表得增删改(insert/delete/update)操作。DDL操作会统一写入以配置文件中得database.server.name参数的值为名称的topic内。命名方式:
  • DDL topic:serverName
  • DML topic:serverName.databaseName.tableName
    图6

2. 数据计算

2.1

# Flink执行sql语句
DROP TABLE ORDERS;
CREATE TABLE orders (
order_id INT,
order_date STRING,
customer_name STRING,
price double,
product_id INT,
order_status INT
) WITH (
'connector' = 'kafka',
'format' = 'debezium-json',
'topic' = 'Mysql.mydb.orders',
'properties.bootstrap.servers' = '192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'debezium-json.schema-include' = 'true'
);
#时间需要转换
SELECT TO_TIMESTAMP_LTZ(cast(t.order_date as bigint),3) order_date_times,t.* from orders t;

图7

3. 数据导入

3.1 Flink中创建表,直接可以导入

4. 补充,Oralce数据库配置(11G往后的配置可参考官网)

alter system set db_recovery_file_dest_size=5G; #按要求修改,不然会报错

#Oracle 开启归档日志
alter database add supplemental log data (all) columns; #开启行模式

#创建 新得表空间与dbzuser,并赋予相应得权限
CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/helowin/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS ;

GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT SELECT ANY DICTIONARY TO dbzuser;

GRANT CREATE TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;

GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;

#暂时可以不用,官网有做要求,暂时没明白有什么用
CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE to debezium;
ALTER USER debezium QUOTA 100M on users;

免责声明:文章转载自《【流数据处理】MySql/PG/Oracle+Kafka+Flink(CDC捕获) 部署及实时计算》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇JS日历插件小程序Page里的函数比app.js先执行的解决办法下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

出现 "System.Data.OracleClient 需要 Oracle 客户端软件 8.1.7 或更高版本" 错误的解决办法

出现 "System.Data.OracleClient 需要 Oracle 客户端软件 8.1.7 或更高版本" 错误的解决办法 1.问题: 在Windows SP2 + VS2005 + Oracle 9i +IIS5.1环境中运行ASP.NET网页的时候出现如下错误: System.Data.OracleClient 需要 Oracle 客户端软...

监控和管理Oracle UNDO表空间的使用

监控和管理Oracle UNDO表空间的使用                 对Oracle数据库UNDO表空间的监控和管理是我们日常最重要的工作之一,UNDO表空间通常都是Oracle自动化管理(通过undo_management初始化参数确定);UNDO表空间是用于存储DML操作的前镜像数据,它是实例恢复,数据回滚,一致性查询功能的重要组件;我...

php+mysql缓存技术的实现

本教程适合于那些对缓存SQL查询以减少数据库连接与执行的负载、提高脚本性能感兴趣的PHP程序员。概述许多站点使用数据库作为站点数据存储的容器。数据库包含了产器信息、目录结构、文章或者留言本,有些数据很可能是完全静态的,这些将会从一个缓存系统中得到的极大好处。这样一个系统通过把SQL查询的结果缓存到系统的一个文件中存储,从而阻止连接数据库,构造查询与取得返回...

PHP利用MySQL保存session(php5.4之前的处理)

简介 使用MySQL保存session,需要保存三个关键性的数据:session id、session数据、session生命期。 考虑到session的使用方式,没必要使用InnoDB引擎,MyISAM引擎可以获得更好的性能。如果环境允许,可以尝试使用MEMORY引擎。 保存session数据的列,有需要的话,可以使用utf8或utf8mb4字符集;保...

mysql日志文件相关的配置【1】

1、log_output=【file | table | none 】   这个参数指定general_log &slow_query_log 的输出目标、可以是table | file | none 这向个值;   如果取table 那么日志会输出到mysql.general_log 和mysql.slow_log表中去 2、general_lo...

mysql启动报错“NET HELPMSG 3534“的解决办法

原因: mysql安装步骤错误,从mysql5.7.6开始,mysql需要这样安装: mysqld --initialize-insecure或者mysqld --initialize mysqld -install net start mysql mysqld --initialize-insecure自动生成无密码的root用户,mysqld --...