Incremental synchronization of a PostgreSQL database with DataX
The idea behind incremental data migration with DataX:
1.Run a full migration once.
datax_quanliang.json is the DataX job that inserts the complete data set into the target database.
It lives under my DataX install path, /opt/datax/job/.
2.Export every value of one column from the target database.
datax_timemubiao.json is the DataX job that dumps all values of that column from the target database.
It also lives under /opt/datax/job/.
3.Export every value of the same column from the source database.
datax_timeyuanduan.json is the DataX job that dumps all values of that column from the source database.
It also lives under /opt/datax/job/.
4.A script compares the source and target column dumps line by line, picks out the lines that have no match on the other side, and uses DataX to insert the corresponding rows
into the target, which achieves incremental migration.
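The line-by-line comparison in step 4 can be sketched in isolation with grep, before any DataX jobs are involved. The file names and sample values below are illustrative, not the real exports:

```shell
# Target-side column values (sample data)
printf '2019-04-01\n2019-04-02\n' > /tmp/mubiao.txt
# Source-side column values (sample data; one extra row)
printf '2019-04-01\n2019-04-02\n2019-04-03\n' > /tmp/yuanduan.txt

# -w: match whole words only; -v: invert the match; -f: read patterns from a file.
# Keeps the source lines that do not appear in the target file.
grep -wvf /tmp/mubiao.txt /tmp/yuanduan.txt
# → 2019-04-03
```

Those surviving lines are exactly the increment the script later feeds back into DataX.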
Preparing the DataX JSON files
1.datax_quanliang.json (full-migration job)
{
    "job": {
        "setting": {
            "speed": {
                "byte": 1073741824
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "fetchSize": "10000",
                        "username": "postgres",
                        "password": "postgres",
                        "where": "",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:postgresql://10.0.30.10:5432/postgres"
                                ],
                                "querySql": [
                                    "select * from gis_location_user_track_201904 where 1=1 ;"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "batchSize": "10000",
                        "username": "postgres",
                        "password": "postgres",
                        "column": ["*"],
                        "preSql": [
                            "select * from gis_location_user_track_201904 where 2=1"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:postgresql://10.0.30.3:5432/postgres",
                                "table": [
                                    "gis_location_user_track_201904"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
2.datax_timemubiao.json (export the target-side column)
{
    "job": {
        "setting": {
            "speed": {
                "byte": 1073741824,
                "channel": 2
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "fetchSize": "10000",
                        "username": "postgres",
                        "password": "postgres",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:postgresql://10.0.30.3:5432/postgres"
                                ],
                                "querySql": [
                                    "select locationtime from gis_location_user_track_201904 ;"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "fileName": "pg_mubiao_result",
                        "fileFormat": "csv",
                        "path": "/opt/datax/job",
                        "writeMode": "truncate"
                    }
                }
            }
        ]
    }
}
3.datax_timeyuanduan.json (export the source-side column)
{
    "job": {
        "setting": {
            "speed": {
                "byte": 1073741824,
                "channel": 2
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "fetchSize": "10000",
                        "username": "postgres",
                        "password": "postgres",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:postgresql://10.0.30.10:5432/postgres"
                                ],
                                "querySql": [
                                    "select locationtime from gis_location_user_track_201904 ;"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "fileName": "pg_yuanduan_result",
                        "fileFormat": "csv",
                        "path": "/opt/datax/job",
                        "writeMode": "truncate"
                    }
                }
            }
        ]
    }
}
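txtfilewriter appends a random suffix to fileName, so the result file cannot be addressed by an exact name; the migration script locates it with a glob instead. A standalone sketch of that lookup (the demo directory and suffix below are made up):

```shell
# Simulate a DataX txtfilewriter output file: fileName plus a random suffix.
mkdir -p /tmp/datax_demo
echo "2019-04-01 10:00:00" > /tmp/datax_demo/pg_mubiao_result__a1b2c3

# Locate it the same way data.sh does, with a glob on the fileName prefix.
MUDI_TIME=$(ls /tmp/datax_demo/pg_mubiao_result_*)
echo "$MUDI_TIME"
```

With writeMode set to truncate, stale result files from earlier runs are overwritten rather than accumulating, so the glob matches a single fresh file.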
4.The incremental migration script data.sh
#!/bin/bash
# Export the key column of the source database into a CSV file
python /opt/datax/bin/datax.py /opt/datax/job/datax_timeyuanduan.json
if [ $? -ne 0 ]; then
echo "data.sh error, can not get yuanduan_time from source db!"
exit 1
fi
# Export the key column of the target database into a CSV file
python /opt/datax/bin/datax.py /opt/datax/job/datax_timemubiao.json
# $? holds the exit status of the last command: 0 means success, non-zero means failure; -ne means "not equal"
if [ $? -ne 0 ]; then
echo "data.sh error, can not get mubiao_time from target db!"
exit 1
fi
# Locate the text files DataX wrote (txtfilewriter appends a random suffix to the file name)
MUDI_TIME=`ls /opt/datax/job/pg_mubiao_result_*`
YUAN_TIME=`ls /opt/datax/job/pg_yuanduan_result_*`
if [ ! -s "$MUDI_TIME" ] ; then
echo "++++++++++ full migration +++++++++++++++++++++++"
# The target is empty, so run the full migration instead
python /opt/datax/bin/datax.py /opt/datax/job/datax_quanliang.json
exit 0
fi
# Find the locationtime values that differ between source and target
grep -wvf $MUDI_TIME $YUAN_TIME > /opt/datax/job/datax.zengliang.txt
grep -wf $MUDI_TIME $YUAN_TIME > /opt/datax/job/datax.xiangtong.txt
ZL="/opt/datax/job/datax.zengliang.txt"
XT="/opt/datax/job/datax.xiangtong.txt"
# Incremental migration: one DataX run per differing value
cat $ZL | while read line
do
# Look up the id of the row with this locationtime (no -h, so psql connects to the local server)
/usr/local/pgsql/12/bin/psql -U postgres -w -c "select id from gis_location_user_track_201904 where locationtime='$line' ;" > /opt/datax/job/id.txt
# psql prints a header row and a separator first, so the value sits on line 3
yy=`awk 'NR==3{print}' /opt/datax/job/id.txt`
# Remove any stale copy of the row from the target before re-inserting it
/usr/local/pgsql/12/bin/psql -h 10.0.30.3 -U postgres -w -c "delete from gis_location_user_track_201904 where id = '$yy';"
# Rewrite the full-migration job so it only selects rows with this locationtime
sed "s/1=1/locationtime = '$line'/g" /opt/datax/job/datax_quanliang.json > /opt/datax/job/datax_quanliang.tmp.json
echo "+++++++ incremental update +++++++++++++++++++++++++++++++++++++++++++++++++++++"
python /opt/datax/bin/datax.py /opt/datax/job/datax_quanliang.tmp.json
# Clean up the temporary job file
rm -f /opt/datax/job/datax_quanliang.tmp.json
done
exit 0
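The per-row job file in the loop is produced by a plain sed substitution: the placeholder filter 1=1 in datax_quanliang.json is replaced by a concrete locationtime predicate. A standalone sketch of that templating step (the template file and sample value here are illustrative, not the real job file):

```shell
# Stand-in for the querySql line inside datax_quanliang.json
echo 'select * from gis_location_user_track_201904 where 1=1 ;' > /tmp/tmpl.txt

# Substitute the placeholder with a concrete predicate, as data.sh does per row
line="2019-04-03 08:15:00"
sed "s/1=1/locationtime = '$line'/g" /tmp/tmpl.txt
# → select * from gis_location_user_track_201904 where locationtime = '2019-04-03 08:15:00' ;
```

This is why the full-migration job carries the otherwise redundant "where 1=1": it is the anchor the script rewrites for every incremental row.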
Steps:
1.First create the files that need to exist beforehand; adjust the paths inside data.sh to match your actual DataX install path.
In my environment, /opt/datax/job/datax_quanliang.json, /opt/datax/job/datax_timeyuanduan.json and /opt/datax/job/datax_timemubiao.json
go under the DataX install path, /opt/datax/job/.
Also pre-create /opt/datax/job/datax.zengliang.txt, /opt/datax/job/datax.xiangtong.txt and
/opt/datax/job/id.txt in that same directory.
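The one-off setup boils down to a mkdir and a few touch commands. The sketch below uses a demo directory so it can run anywhere; on a real install, replace it with /opt/datax/job:

```shell
# Demo directory; on a real install use /opt/datax/job instead
JOB_DIR=/tmp/datax_job_demo
mkdir -p "$JOB_DIR"
# Pre-create the working files data.sh writes into
touch "$JOB_DIR/datax.zengliang.txt" "$JOB_DIR/datax.xiangtong.txt" "$JOB_DIR/id.txt"
ls "$JOB_DIR"
```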
2.Run the script the first time for the full migration (the script itself can live anywhere):
./data.sh
3.Run it again to migrate incrementally:
./data.sh