The idea behind incremental data migration with DataX:

1. Run a full migration once.
datax_quanliang.json is the DataX job that copies the full data set into the target database.
It is stored under my DataX install path, /opt/datax/job/.

2. Dump all values of one column from the target database.
datax_timemubiao.json is the DataX job that exports every value of one column from the target database.
It is stored under /opt/datax/job/ as well.

3. Dump all values of the same column from the source database.
datax_timeyuanduan.json is the DataX job that exports every value of that column from the source database.
It is also stored under /opt/datax/job/.

4. To pick up new data, a script compares the source and target column dumps row by row, filters out the
rows that differ, and uses DataX to insert those rows into the target, which achieves incremental migration.
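The comparison in step 4 is just a whole-line set difference between the two column dumps. A minimal sketch with grep (the file names and values below are made up for illustration):

```shell
# Hypothetical column dumps standing in for the source (yuanduan) and
# target (mubiao) exports produced by DataX.
printf '%s\n' 2019-04-01 2019-04-02 2019-04-03 > /tmp/yuanduan_col.txt
printf '%s\n' 2019-04-01 2019-04-02            > /tmp/mubiao_col.txt

# Rows present in the source dump but missing from the target dump:
# -f reads the patterns from the target file, -v inverts the match,
# -w requires whole-word matches so partial overlaps do not count.
grep -wvf /tmp/mubiao_col.txt /tmp/yuanduan_col.txt   # prints 2019-04-03
```

The surviving rows are exactly the ones the script later feeds back through DataX one by one.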

Preparing the DataX JSON files

1. datax_quanliang.json (full-migration job)

{
	"job": {
		"setting": {
			"speed": {
				"byte": 1073741824
			},
			"errorLimit": {
				"record": 0,
				"percentage": 0.02
			}
		},
		"content": [
			{
				"reader": {
					"name": "postgresqlreader",
					"parameter": {
						"fetchSize": "10000",
						"username": "postgres",
						"password": "postgres",
						"connection": [
							{
								"jdbcUrl": [
									"jdbc:postgresql://10.0.30.10:5432/postgres"
								],
								"querySql": [
									"select * from gis_location_user_track_201904 where 1=1 ;"
								]
							}
						]
					}
				},
				"writer": {
					"name": "postgresqlwriter",
					"parameter": {
					    "batchSize":"10000",
						"username": "postgres",
						"password": "postgres",
						"column": [
							"*"
						],
						"preSql": [
							"select * from gis_location_user_track_201904 where 2=1"
						],
						"connection": [
							{
								"jdbcUrl": "jdbc:postgresql://10.0.30.3:5432/postgres",
								"table": [
									"gis_location_user_track_201904"
								]
							}
						]
					}
				}
			}
		]
	}
}
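The `where 1=1` placeholder in the querySql is what makes this job reusable: the data.sh script later rewrites it with sed so the same job copies just one row. A quick illustration (the timestamp is a made-up example value):

```shell
# Replacing the always-true 1=1 predicate narrows the full-migration
# query to one specific locationtime value.
echo "select * from gis_location_user_track_201904 where 1=1 ;" \
  | sed "s/1=1/locationtime = '2019-04-01 08:00:00'/"
```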

2. datax_timemubiao.json (dump the target database column)

{
	"job": {
		"setting": {
			"speed": {
				"byte": 1073741824,
				"channel": 2
			},
			"errorLimit": {
				"record": 0,
				"percentage": 0.02
			}
		},
		"content": [
			{
				"reader": {
					"name": "postgresqlreader",
					"parameter": {
						"fetchSize": "10000",
						"username": "postgres",
						"password": "postgres",
						"connection": [
							{
								"jdbcUrl": [
									"jdbc:postgresql://10.0.30.3:5432/postgres"
								],
								"querySql": [
									"select locationtime from gis_location_user_track_201904 ;"
								]
							}
						]
					}
				},
				"writer": {
					"name": "txtfilewriter",
					"parameter": {
						"fileName": "pg_mubiao_result",
						"fileFormat": "csv",
						"path": "/opt/datax/job",
						"writeMode": "truncate"
					}
				}
			}
		]
	}
}
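Note that txtfilewriter does not write a file named pg_mubiao_result literally: DataX appends a suffix to fileName, so the exact name is not predictable and the script locates the real file with a glob. Simulated here with a made-up suffix:

```shell
# Stand-in for the file DataX actually produces, e.g. pg_mubiao_result__<random>
touch /tmp/pg_mubiao_result__a1b2c3d4

# The glob resolves to the actual file regardless of the suffix
MUDI_TIME=$(ls /tmp/pg_mubiao_result_*)
echo "$MUDI_TIME"
```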

3. datax_timeyuanduan.json (dump the source database column)

{
	"job": {
		"setting": {
			"speed": {
				"byte": 1073741824,
				"channel": 2
			},
			"errorLimit": {
				"record": 0,
				"percentage": 0.02
			}
		},
		"content": [
			{
				"reader": {
					"name": "postgresqlreader",
					"parameter": {
						"fetchSize": "10000",
						"username": "postgres",
						"password": "postgres",
						"connection": [
							{
								"jdbcUrl": [
									"jdbc:postgresql://10.0.30.10:5432/postgres"
								],
								"querySql": [
									"select locationtime from gis_location_user_track_201904 ;"
								]
							}
						]
					}
				},
				"writer": {
					"name": "txtfilewriter",
					"parameter": {
						"fileName": "pg_yuanduan_result",
						"fileFormat": "csv",
						"path": "/opt/datax/job",
						"writeMode": "truncate"
					}
				}
			}
		]
	}
}

4. The incremental migration script, data.sh

#!/bin/bash
# Dump the source database's time column to a csv file
python /opt/datax/bin/datax.py /opt/datax/job/datax_timeyuanduan.json
if [ $? -ne 0 ]; then
	echo "data.sh error, can not get yuanduan_time from source db!"
	exit 1
fi

# Dump the target database's time column to a csv file
python /opt/datax/bin/datax.py /opt/datax/job/datax_timemubiao.json
# $? is a shell variable holding the exit status of the last command:
# 0 means success, non-zero means failure; -ne means "not equal"
if [ $? -ne 0 ]; then
	echo "data.sh error, can not get mubiao_time from target db!"
	exit 1
fi

# Locate the files DataX wrote (txtfilewriter appends a suffix to fileName)
MUDI_TIME=`ls /opt/datax/job/pg_mubiao_result_*`
YUAN_TIME=`ls /opt/datax/job/pg_yuanduan_result_*`

# If the target-side dump is empty, the target table has no data yet:
# run the full migration instead and stop here
if [ ! -s "$MUDI_TIME" ]; then
	echo "++++++++++ full migration +++++++++++++++++++++++"
	python /opt/datax/bin/datax.py /opt/datax/job/datax_quanliang.json
	exit 0
fi

# Split the source dump into rows whose locationtime differs from the
# target (incremental rows) and rows that already match
grep -wvf $MUDI_TIME $YUAN_TIME > /opt/datax/job/datax.zengliang.txt
grep -wf  $MUDI_TIME $YUAN_TIME > /opt/datax/job/datax.xiangtong.txt
ZL="/opt/datax/job/datax.zengliang.txt"
XT="/opt/datax/job/datax.xiangtong.txt"

# Incremental migration: for each differing row, look up its id on the
# source side, delete any stale copy on the target, then re-run the
# full-migration job restricted to that single locationtime
cat $ZL | while read line
do
	/usr/local/pgsql/12/bin/psql -U postgres -w -c "select id from gis_location_user_track_201904 where locationtime='$line' ;" > /opt/datax/job/id.txt
	yy=`awk 'NR==3{print}' /opt/datax/job/id.txt`
	/usr/local/pgsql/12/bin/psql -h 10.0.30.3 -U postgres -w -c "delete from gis_location_user_track_201904 where id = '$yy';"
	sed "s/1=1/locationtime = '$line'/g" /opt/datax/job/datax_quanliang.json > /opt/datax/job/datax_quanliang.tmp.json
	echo "+++++++ incremental update +++++++++++++++++++++++++++++++++++++++++++++++++++++"
	python /opt/datax/bin/datax.py /opt/datax/job/datax_quanliang.tmp.json
	# remove the temporary job file
	rm -f /opt/datax/job/datax_quanliang.tmp.json
done
exit 0
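The `awk 'NR==3'` in the loop relies on psql's default aligned output, where the value sits on line 3 after the header row and separator line. A simulated example (the id value is made up):

```shell
# Mimic psql's default output for a single-row "select id ..." query
cat > /tmp/id_demo.txt <<'EOF'
   id
--------
 123456
(1 row)
EOF

# Line 3 holds the value; gsub strips the alignment padding
awk 'NR==3{gsub(/ /,""); print}' /tmp/id_demo.txt   # prints 123456
```

Running psql with `-t -A` (tuples only, unaligned) would suppress the header and padding and avoid this positional parsing entirely.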

Steps:

1. Create the required files first; adjust the paths inside data.sh to match your actual DataX install path.
In my environment, /opt/datax/job/datax_quanliang.json, /opt/datax/job/datax_timeyuanduan.json and
/opt/datax/job/datax_timemubiao.json go into the DataX directory /opt/datax/job/.
Also pre-create /opt/datax/job/datax.zengliang.txt, /opt/datax/job/datax.xiangtong.txt and
/opt/datax/job/id.txt in the same directory.

2. The first run performs the full migration (the script itself can live anywhere):
./data.sh

3. Subsequent runs perform the incremental migration:
./data.sh
