本篇博客说说DataX如何进行全量和增量数据同步,虽然用演示oracle同步到mysql,但其他数据库之间的同步都差不多

1.DataX介绍

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
在这里插入图片描述
Github主页地址:https://github.com/alibaba/DataX
DataX工具下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

DataX采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件
在这里插入图片描述

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

下面是DataX3.0支持的插件,也就是能在这些数据源直接根据json脚本配置互相同步数据

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
达梦
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储ODPS
ADS
OSS
OCS
NoSQL数据存储OTS
Hbase0.94
Hbase1.1
MongoDB
Hive
无结构化数据存储TxtFile
FTP
HDFS
Elasticsearch

2.DataX实战

2.1.DataX基础环境搭建
  • 1.把下载好的 datax.tar.gz 上传到Linux

  • 2.解压 tar -xzvf datax.tar.gz ,会有/datax 目录,进入cd datax 目录
    在这里插入图片描述

  • 3.先删除datax目录中的所有隐藏文件,否则执行脚本会失败

 find ./ -name '._*' -print0 |xargs -0 rm -rf
  • 4.执行测试脚本:./bin/datax.py job/job.json ,看到下图的效果,说明环境正常
    在这里插入图片描述
2.2.DataX 全量同步数据,oracle 到 mysql

从上面的介绍中可以知道,datax是通过不同的插件去同步数据的,每个插件都有 reader 和 writer ,要从oracle同步数据到mysql,执行: ./bin/datax.py -r oraclereader -w mysqlwriter ,获取示例json配置,然后去修改里面的参数

#在job 目录中创建 vi oracle_to_mysql.json,这是改完后能同步的参数配置
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "oraclereader", 
                    "parameter": {
                        "column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], 
                        "splitPk": "INVESTOR_ID",
                        "where" : "INVESTOR_ID is not null",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], 
                                "table": ["CXX.CUSTOMER"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "admin"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [ 
                            "customer_no",
                            "customer_name",
                            "id_type",
                            "id_no",
                            "create_time"
                           
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", 
                                "table": ["customer_datax"]
                            }
                        ], 
                        "username": "admin", 
                        "password": "123456", 
                        "preSql": [], 
                        "session": ["set session sql_mode='ANSI'"], 
                        "writeMode": "update"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}

执行全量同步:./bin/datax.py job/oracle_to_mysql.json ,可以看到有 1045条记录被同步到mysql了
在这里插入图片描述

2.3.DataX 增量同步数据,oracle 到 mysql

增量同步需要Linux的crontab定时任务配合,再通过shell脚本计算时间,并传递到json脚本的Where条件中,"where" : "CREATE_TIME > unix_to_oracle(${create_time}) and CREATE_TIME <= unix_to_oracle(${end_time})"

${create_time} 和 ${end_time}由shell脚本计算
vi /home/datax/servers/datax/job/oracle_to_mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "oraclereader", 
                    "parameter": {
                        "column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], 
                        "splitPk": "INVESTOR_ID",
                        "where" : "CREATE_TIME > unix_to_oracle(${create_time}) and CREATE_TIME <= unix_to_oracle(${end_time})",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], 
                                "table": ["CXX.CUSTOMER"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "admin"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [ 
                            "customer_no",
                            "customer_name",
                            "id_type",
                            "id_no",
                            "create_time"
                           
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", 
                                "table": ["customer_datax"]
                            }
                        ], 
                        "username": "admin", 
                        "password": "123456", 
                        "preSql": [], 
                        "session": ["set session sql_mode='ANSI'"], 
                        "writeMode": "update"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}

vi /home/datax/servers/datax/job/increment_sync.sh

#!/bin/bash
source /etc/profile
# 截至时间设置为当前时间戳
end_time=$(date +%s)
# 开始时间设置为300s前时间戳
create_time=$(($end_time - 300))
# 执行datax脚本,传入时间范围
/home/datax/servers/datax/bin/datax.py /home/datax/servers/datax/job/oracle_to_mysql.json -p "-Dcreate_time=$create_time -Dend_time=$end_time" &

并给increment_sync.sh赋可执行权限:chmod -R 777 increment_sync.sh

然后设置 crontab 定时任务,每5分钟执行一次,和上面脚本中的300s对应

crontab -e
 */5 * * * * /home/datax/servers/datax/job/increment_sync.sh >/dev/null 2>&1
* * * * * command
| | | | |
| | | | +----- day of week (0 - 6) (Sunday=0)
| | | +------- month (1 - 12)
| | +--------- day of month (1 - 31)
| +----------- hour (0 - 23)
+------------- min (0 - 59)

例如:

每小时执行一次命令: 0 * * * * command
每天 8 点执行一次命令: 0 8 * * * command
每月 1 号的 8 点执行一次命令: 0 8 1 * * command

tip:oralce是没有 unix_to_oracle 函数的,需要自行在oracle中创建

 create or replace function unix_to_oracle(in_number NUMBER) return date is
 begin
  return(TO_DATE('19700101','yyyymmdd') + in_number/86400 +TO_NUMBER(SUBSTR(TZ_OFFSET(sessiontimezone),1,3))/24);
 end unix_to_oracle;

好了,此时就完成了增量同步

3.DataX同步流程:

  • 1.第一次部署datax时,手动执行全量同步脚本,同步已有客户数据
  • 2.再进行增量同步,用Linux的crontab和脚本配合,能按时间进行增量同步
  • 3.oracle同步mysql时,有几种同步模式,建议"writeMode"设置为"update":
    • 3.1.mysql的"writeMode"设置为"insert",在有重复数据记录时,不会同步,直接跳过,就算oracle中该条数据已经修改了,也不会同步
    • 3.2.mysql的"writeMode"设置为"replace",在有重复数据记录时,会先删除mysql中的记录,再把oracle中的记录新增进去
    • 3.3.mysql的"writeMode"设置为"update",在有重复数据记录时,会把oracle中的列,覆盖mysql中的列,未配置同步的列,不会覆盖
  • 4.需要在oracle库创建uninx_to_date() 函数

4.增量同步方式优化

上面需要在oracle库创建一个uninx_to_date()函数,下面用shell脚本把uninx时间戳转为yyyy-MM-dd hh:mm:ss 类型,然后传到oracle_to_mysql.json配置中,就不用创建这个uninx_to_date()函数了

increment_sync.sh 脚本获取字符串类型时间:

#!/bin/bash
source /etc/profile
#当前时间戳
cur_time=$(date +%s)
#结束时间
end_time="'$(date -d @$cur_time +"%Y-%m-%d %H:%M:%S")'"
#开始时间,为当前时间的前300s
create_time="'$(date -d @$(($cur_time-120)) +"%Y-%m-%d %H:%M:%S")'"

# 执行datax脚本,传入时间范围
/home/datax/servers/datax/bin/datax.py /home/datax/servers/datax/job/oracle_to_mysql.json -p "-Dcreate_time=$create_time -Dend_time=$end_time" &

修改oracle_to_mysql.json的where参数,去掉uninx_to_date()函数

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "oraclereader", 
                    "parameter": {
                        "column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], 
                        "splitPk": "INVESTOR_ID",
                        "where" : "CREATE_TIME >to_date('${create_time}','yyyy-mm-dd hh24:mi:ss')  and CREATE_TIME <= to_date('${end_time}','yyyy-mm-dd hh24:mi:ss')",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], 
                                "table": ["CXX.CUSTOMER"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "admin"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [ 
                            "customer_no",
                            "customer_name",
                            "id_type",
                            "id_no",
                            "create_time"
                           
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", 
                                "table": ["customer_datax"]
                            }
                        ], 
                        "username": "admin", 
                        "password": "123456", 
                        "preSql": [], 
                        "session": ["set session sql_mode='ANSI'"], 
                        "writeMode": "update"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}

Windows操作系统,Python3使用datax需要下载 Python3版的datax.py,点我下载

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐