datax实战

摘要:
{“setting”:“percentage”:“content”:{“settings”:{“record”:“百分比”:“内容”:[{”reader“:”parameter“:”connection“:[”jdbc:“parameter”:”用户名“:”密码“:”列“:”

一、全量同步

1.简单字段同步

本文以mysql -> mysql为示例:

   本次测试的表为mysql的系统库-sakila中的actor表,由于不支持目的端自动建表,此处预先建立目的表:

CREATE TABLE `actor_copy` (
  `actor_id` smallint(5) unsigned NOT NULL AUTO_INCREMENT,
  `first_name` varchar(45) NOT NULL,
  `last_name` varchar(45) NOT NULL,
  `last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`actor_id`),
  KEY `idx_actor_last_name` (`last_name`)
) ENGINE=InnoDB AUTO_INCREMENT=201 DEFAULT CHARSET=utf8;

  通过官方快速开始提供的命令,可以查看配置模板:

 python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
 python datax.py -r streamreader -w streamwriter

  打开dataX的mysqlreader以及mysqlwriter文档,编写JSON配置文件:(此处经过试验,即使是自增主键,同样需要配置,否则会报输入输出不匹配的错),加上JSON配置文件的x权限

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "Zcc170821#",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update"
                        ],
                        "splitPk": "actor_id",
                        "connection": [
                            {
                                "table": [
                                    "actor"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://192.168.19.129:3306/sakila"
                                ]
                            }
                        ]
                    }
                },
              "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "Zcc170821#",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update"
                        ],
                        "preSql": [
                            "truncate table actor_copy"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.19.129:3306/sakila",
                                "table": [
                                    "actor_copy"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

这样,单表的最基本全量同步就完成了!

  通过python 命令运行即可:

python datax.py ../job/mysqltest.json

  2.增加常量与插入时间字段

    原表正常字段,目标表多出两列:来源部门,插入时间。json配置如下:

      常量使用单引号,时间暂时未摸索到变量如何使用(以下通过启动脚本已更新方式),通过时间函数实现

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "'自动生成'",
                "NOW()"
                        ],
                        "splitPk": "actor_id",
                        "connection": [
                            {
                                "table": [
                                    "actor"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://hadoop01:3306/sakila"
                                ]
                            }
                        ]
                    }
                },
              "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "src",
                "load_time"
                        ],
                        "preSql": [
                            "truncate table actor_copy"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop01:3306/sakila",
                                "table": [
                                    "actor_copy"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

   2020.1.11,更新通过启动脚本控制时间戳:

    首先Json配置更改为变量:(注意变量有个单引号!

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "'${src}'",
                "'${systime}'"
                        ],
                        "splitPk": "actor_id",
                        "connection": [
                            {
                                "table": [
                                    "actor"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://hadoop01:3306/sakila"
                                ]
                            }
                        ]
                    }
                },
              "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "src",
                "load_time"
                        ],
                        "preSql": [
                            "truncate table actor_copy"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop01:3306/sakila",
                                "table": [
                                    "actor_copy"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

    在datax的srcipts文件下新建一个启动脚本:

#coding:UTF-8
from datetime import datetime
import os
import sys

configFilePath = sys.argv[1]
src = '自动生成'
currentTime  = format(datetime.now(), '%Y-%m-%d %H:%M:%S')
script2execute  = "python /opt/datax/bin/datax.py {0} -p "-Dsrc='{1}' -Dsystime='{2}'"".format( configFilePath, src, currentTime)
os.system(script2execute)

  在srcipts下的启动命令为:

    

python ./datax_start.py '/opt/datax/job/mysql_actor_copy_arg.json'

 二、增量同步

  增量同步的核心思路是时间戳,需要同步的表中要有Update_time字段:

  参考实现:https://www.jianshu.com/p/34b3a084d7d8

      https://blog.csdn.net/quadimodo/article/details/82186788

  增量数据和全量数据如何合并?使用full join

    https://blog.csdn.net/kx306_csdn/article/details/89508323

  当然如果有例如更新时间,修改时间字段,可以直接将增量表INTO入昨日全量,然后根据ID去重,取最新时间也是可以的

免责声明:文章转载自《datax实战》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇mysql基本认识【关系型数据库和nosql、mysql操作流程和体系,库操作,表操作,数据的操作,字符集的操作,以及php作为client操作数据库】对连接本身没有疑问使用QtXlsx来读写excel文件下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

chrome 49 版本bug: flex父元素设置flex:1 , 子元素用height:100%无法充满父元素

1 <div class="container"> 2 <div class="item"> 3 <div class="item-inner"> 4 <a> 5 Botón 6 &l...

PostgreSQL常用初级技能树

1.创建表需要id自增 设置serial即可,示例: id serial not null  2.创建表没有设置后面想要再设置自增 给test表设置一个自增序列test_id_seq  CREATE SEQUENCE test_id_seq START 10;  然后在设计表中添加 nextval('test_id_seq'::regclass)  tes...

java 字符串如何直接转LocalDateTime?

  1.情景展示   在实际开发过程中,可能会遇到将前端传的日期格式转成LocalDateTime插入到数据库的情况,如何将日期转成LocalDateTime呢? 2.原因分析   在Java8中,日期类不同于以前版本的java.util.Date工具类,Date类可以存日期也可以存时间,还能存日期+时间,统统都能塞进去;   但java8中将日期与时间...

MySQL 按照日期格式查询带有时间戳数据

按照日期格式查询带有时间戳数据一般在MSQL数据库中的时间都是以时间戳的格式来存储时间的,但是对于我们来说,时间戳格式具体表示的是什么时间,我们很难一眼看出来,所以当我们要具体查询某一个时间或时间段的数据时,就要进行日期到时间戳的转换。我们常会用到这两个函数:FROM_UNIXTIME()和UNIX_TIMESTAMP()函数1. FROM_UNIXTIM...

K8S系统学习(一)

参考链接:https://blog.csdn.net/HsiaChubby/article/details/90442170 参考链接:https://segmentfault.com/a/1190000018741112?utm_source=tag-newest 一、K8S架构图。(K8S的背景,作用什么的百度可以查阅,我主要一下跟实战相关的) 构成介...

mysql按照年月日查询,导出每日数据数量

mysql没有提供unix时间戳的专门处理函数,所以,如果遇到时间分组,而你用的又是整型unix时间戳,则只有转化为mysql的其他日期类型!FROM_UNIXTIM()将unix时间戳转为datetime等日期型!一、年度查询查询 本年度的数据SELECT *FROM blog_articleWHERE year( FROM_UNIXTIME( Blo...