Exporting Hive Table Data to MongoDB with DataX (2022)
Background:
Most articles online cover importing MongoDB data into a Hive warehouse; very little is written about exporting from Hive to MongoDB. Recently a sister department building a user-profile system chose MongoDB as its store (it makes schema extension easy) and needed data from several of our warehouse tables, which raised the question: how do we export Hive table data to MongoDB?
We already used Sqoop to move MySQL data into HDFS, but Sqoop is geared toward relational databases and has poor support for NoSQL stores like MongoDB, so we turned to Alibaba's DataX. After some evaluation, DataX does handle this scenario.
Problem to solve:
Export Hive table data to MongoDB with DataX.
Installing DataX
1) Prerequisites (a quick version check follows the list)
- Linux
- JDK (1.8 or above; 1.8 recommended)
- Python (Python 2.6.x recommended)
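A minimal way to verify the JDK and Python prerequisites on the target machine:
java -version
python -V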
2) Download link
http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
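If the server has internet access, you can also fetch the tarball directly instead of uploading it by hand:
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz -P /opt/software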
3) Upload the tarball
cd /opt/software
-- upload datax.tar.gz here (or download it in place as shown above)
4) Extract
tar -zxvf datax.tar.gz -C /opt/module/
5) Self-check
cd /opt/module/datax/bin/
python datax.py /opt/module/datax/job/job.json
If the bundled sample job completes without errors, the installation is working.
Configure environment variables:
#DATAX_HOME
export DATAX_HOME=/opt/module/datax
export PATH=$PATH:$DATAX_HOME/bin
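Assuming the two lines above were appended to /etc/profile (adjust to whatever profile file your environment uses), reload it and rerun the self-check to confirm the PATH entry works:
source /etc/profile
python $DATAX_HOME/bin/datax.py /opt/module/datax/job/job.json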
Usage
1. Reading HDFS data and writing to MongoDB
1) View the official template
python bin/datax.py -r hdfsreader -w mongodbwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the hdfsreader document:
https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md
Please refer to the mongodbwriter document:
https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": [],
                        "collectionName": "",
                        "column": [],
                        "dbName": "",
                        "upsertInfo": {
                            "isUpsert": "",
                            "upsertKey": ""
                        },
                        "userName": "",
                        "userPassword": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
2) Write the configuration file
- Config-file generator script:
gen_export_config.py
# coding=utf-8
import json
import getopt
import os
import sys
import pymongo  # imported by the original script; not used by the generator itself

# MongoDB settings; adjust for your environment
mongodb_host = "xxxx.mongodb.rds.aliyuncs.com"
mongodb_port = "3717"
mongodb_user = "xxx"
mongodb_passwd = "xxx"

# HDFS NameNode settings; adjust for your environment
# Target directory for the generated config files; adjust as needed
output_path = "/opt/module/datax/job/export"


def generate_json(target_database, target_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://xx",
                        # For an HA cluster the nameservice name must be consistent
                        # across defaultFS, dfs.nameservices and the keys below,
                        # with one rpc-address entry per NameNode in dfs.ha.namenodes
                        "hadoopConfig": {
                            "dfs.nameservices": "mycluster",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2,nn3",
                            "dfs.namenode.rpc-address.mycluster.nn1": "xxx",
                            "dfs.namenode.rpc-address.mycluster.nn2": "xxx",
                            "dfs.namenode.rpc-address.mycluster.nn3": "xxx",
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": ["xxx"],
                        "collectionName": "xxx",
                        "column": [
                            {"name": "id", "type": "string"},
                            {"name": "channel", "type": "string"},
                            {"name": "platform", "type": "string"}
                        ],
                        "dbName": "xxx",
                        # NOTE: this string value triggers the fastjson error
                        # described at the end of the post; see the fix there
                        "writeMode": "replace",
                        "userName": "xxx",
                        "userPassword": "xxx"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""
    options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value
    generate_json(target_database, target_table)


if __name__ == '__main__':
    main(sys.argv[1:])
-- Install the Python MongoDB driver
-- needed on all three machines
sudo yum install -y pymongo
-- Generate the DataX job file
python gen_export_config.py -d xxx -t xxx
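The generated file is named <database>.<table>.json, so the placeholder run above produces xxx.xxx.json; pretty-printing it is a quick way to confirm it is valid JSON:
python -m json.tool /opt/module/datax/job/export/xxx.xxx.json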
- Write a wrapper script
gen_export_config.sh
vim ~/bin/gen_export_config.sh
#!/bin/bash
python ~/bin/gen_export_config.py -d xxx -t xxx
Add execute permission:
chmod +x ~/bin/gen_export_config.sh
-- Generate the config file
gen_export_config.sh
3) Test the generated DataX config file
Run:
python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/xxx/dwd/xxx/dt=2022-04-19" /opt/module/datax/job/export/xxx.xxx.json
Check the result in MongoDB:
The data is now present in MongoDB.
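For a quick check from the shell rather than a MongoDB GUI, the pymongo driver installed earlier can count the documents. Host, port, credentials, and database/collection names below are the placeholders from the generator script; append an authSource parameter to the URI if your user is defined in a different database:
python - <<'EOF'
import pymongo
# placeholder connection details from gen_export_config.py
client = pymongo.MongoClient("mongodb://xxx:xxx@xxxx.mongodb.rds.aliyuncs.com:3717/xxx")
db = client["xxx"]
print(db["xxx"].count())     # document count after the export
print(db["xxx"].find_one())  # spot-check a single document
EOF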
4) Write the export script
vim hdfs_to_mongodb.sh
#! /bin/bash
DATAX_HOME=/opt/module/datax

# Use the date passed as the second argument; otherwise default to yesterday
if [ -n "$2" ]; then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

# DataX export paths must not contain empty files; this function removes them
handle_export_path(){
    for i in `hadoop fs -ls -R $1 | awk '{print $8}'`; do
        hadoop fs -test -z $i
        if [[ $? -eq 0 ]]; then
            echo "$i is zero-length, deleting it"
            hadoop fs -rm -r -f $i
        fi
    done
}

# Export the data
export_data() {
    datax_config=$1
    export_dir=$2
    handle_export_path $export_dir
    $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
}

case $1 in
"dwd_event_log")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
;;
"all")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
;;
esac
-- Add execute permission
chmod +x hdfs_to_mongodb.sh
-- Run it
hdfs_to_mongodb.sh all
300,137 rows were successfully imported into MongoDB!
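The script also accepts an explicit date as its second argument, which is handy for re-exporting a specific partition:
hdfs_to_mongodb.sh all 2022-04-19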
A note on an error you may run into:
ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1
2022-04-20 16:53:00.138 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":"\t","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-04-20 16:53:00.205 [0-0-2-writer] ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1
at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1295) ~[fastjson-1.1.46.sec01.jar:na]
at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1205) ~[fastjson-1.1.46.sec01.jar:na]
at com.alibaba.fastjson.JSON.parse(JSON.java:108) ~[fastjson-1.1.46.sec01.jar:na]
at com.alibaba.fastjson.JSON.parse(JSON.java:99) ~[fastjson-1.1.46.sec01.jar:na]
at com.alibaba.fastjson.JSON.parseObject(JSON.java:170) ~[fastjson-1.1.46.sec01.jar:na]
at com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriter$Task.init(MongoDBWriter.java:331) ~[mongodbwriter-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:44) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
(Writer tasks 0-0-1 and 0-0-2 logged the identical exception and stack trace.)
The fix: in the generated config file, change
"writeMode": "replace",
to
"isReplace": "true",
"replaceKey": "id",
and the job runs cleanly.
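Why this works: the stack trace points at JSON.parseObject inside MongoDBWriter$Task.init, i.e. mongodbwriter parses the replace setting as a JSON object, so the bare string "replace" fails to parse at position 1. The mongodbwriter documentation shows the setting as a nested object; if the flat form above does not behave as expected on your DataX build, a sketch of the nested form (keeping the "id" key used in this post) is:
"writeMode": {
    "isReplace": "true",
    "replaceKey": "id"
}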
That's it. If you found this post useful, please give it a like!