Table of Contents

Background

Problem to solve

Installing DataX

1) Prerequisites

2) Download

3) Upload the package

4) Extract

5) Self-check script

Configure environment variables

Usage

1. Read HDFS data and write to MongoDB

1) View the official template

2) Write the configuration file

3) Test the generated DataX config file

4) Write the export script

Watch out for this error

Solution


Background

        Most articles online cover importing MongoDB data into Hive; very few cover exporting from Hive to MongoDB. Recently a sister team building a user-profile system chose MongoDB as its store (it makes field extension easy) and needed data from several tables in our data warehouse, which raised the question: "How do we export Hive table data to MongoDB?"

        We had been using Sqoop to move data from MySQL to HDFS, but Sqoop is geared toward relational databases and does not support a NoSQL store like MongoDB well, so we turned to Alibaba's DataX. After some research, DataX indeed covers this scenario.

Problem to solve:

Use DataX to export Hive table data to MongoDB.

Installing DataX

1) Prerequisites

- Linux

- JDK (1.8 or later; 1.8 recommended)

- Python (2.6.x recommended)

2) Download

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

3) Upload the package

cd /opt/software

--upload datax.tar.gz here

4) Extract

tar -zxvf datax.tar.gz -C /opt/module/

5) Self-check script

cd /opt/module/datax/bin/

python datax.py /opt/module/datax/job/job.json

If the bundled sample job finishes successfully, the installation is working.

Configure environment variables

#DATAX_HOME

export DATAX_HOME=/opt/module/datax

export PATH=$PATH:$DATAX_HOME/bin
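After appending the two lines to your shell profile (e.g. /etc/profile or ~/.bashrc) and re-sourcing it, you can quickly verify the variable resolves. A minimal check, assuming DataX was extracted to /opt/module/datax as above:

```shell
# Set DATAX_HOME and put the DataX launcher on PATH
export DATAX_HOME=/opt/module/datax
export PATH=$PATH:$DATAX_HOME/bin

# Confirm the variable is picked up by the current shell
echo $DATAX_HOME
```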

Usage

1. Read HDFS data and write to MongoDB

1) View the official template

python bin/datax.py -r hdfsreader -w mongodbwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the hdfsreader document:
     https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

Please refer to the mongodbwriter document:
     https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": [],
                        "collectionName": "",
                        "column": [],
                        "dbName": "",
                        "upsertInfo": {
                            "isUpsert": "",
                            "upsertKey": ""
                        },
                        "userName": "",
                        "userPassword": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

2) Write the configuration file

- Config generation script:

gen_export_config.py

# coding=utf-8
import json
import getopt
import os
import sys
import pymongo

# MongoDB connection settings; adjust for your environment
mongodb_host = "xxxx.mongodb.rds.aliyuncs.com"
mongodb_port = "3717"
mongodb_user = "xxx"
mongodb_passwd = "xxx"

# HDFS NameNode settings; adjust for your environment

# Target directory for generated config files; adjust as needed
output_path = "/opt/module/datax/job/export"

def generate_json(target_database, target_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://xx",
                        "hadoopConfig": {
                            "dfs.nameservices": "xx",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2,nn3",
                            "dfs.namenode.rpc-address.xxx": "xxx",
                            "dfs.namenode.rpc-address.xxx": "xxx",
                            "dfs.namenode.rpc-address.xxx": "xxx",
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": ["xxx"],
                        "collectionName": "xxx",
                        "column": [
                            {
                                "name": "id",
                                "type": "string"
                            },
                            {
                                "name": "channel",
                                "type": "string"
                            },
                            {
                                "name": "platform",
                                "type": "string"
                            }
                        ],
                        "dbName": "xxx",
                        "writeMode": "replace",
                        "userName": "xxx",
                        "userPassword": "xxx"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""

    options, arguments = getopt.getopt(args, 'd:t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value

    generate_json(target_database, target_table)

if __name__ == '__main__':
    main(sys.argv[1:])

--Install the Python MongoDB driver

--needed on all three machines

sudo yum install -y pymongo

--Generate the DataX job file

python gen_export_config.py -d xxx -t xxx
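A quick way to sanity-check a generated job file is to load it back with `json.load` and assert the structure DataX expects is present. A minimal sketch of that round trip (the job dict is a cut-down stand-in for what `generate_json` writes; the temp path is hypothetical):

```python
import json
import os
import tempfile

# Cut-down stand-in for a generated DataX job (same top-level layout as above)
job = {"job": {"setting": {"speed": {"channel": 3}},
               "content": [{"reader": {"name": "hdfsreader"},
                            "writer": {"name": "mongodbwriter"}}]}}

# Write it out the same way the script does: db.table.json under an output dir
path = os.path.join(tempfile.mkdtemp(), "mydb.mytable.json")
with open(path, "w") as f:
    json.dump(job, f)

# Reload and check the pieces DataX will look for
with open(path) as f:
    loaded = json.load(f)
assert loaded["job"]["content"][0]["reader"]["name"] == "hdfsreader"
assert loaded["job"]["content"][0]["writer"]["name"] == "mongodbwriter"
print("job file OK")
```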

- Write a wrapper script:

gen_export_config.sh

vim ~/bin/gen_export_config.sh

#!/bin/bash

python ~/bin/gen_export_config.py -d xxx -t xxx

Add execute permission:

chmod +x ~/bin/gen_export_config.sh

--Generate the config file

gen_export_config.sh

3) Test the generated DataX config file

Run:

python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/xxx/dwd/xxx/dt=2022-04-19" /opt/module/datax/job/export/xxx.xxx.json
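The -p"-Dexportdir=..." argument fills in the ${exportdir} placeholder in the job file's reader path before the job runs. Conceptually this is plain template substitution, which can be illustrated with Python's `string.Template` (the path is the example value from the command above):

```python
from string import Template

# The job file's reader path contains a ${exportdir} placeholder
path_template = Template("${exportdir}")

# DataX substitutes the value passed via -p"-Dexportdir=..."
resolved = path_template.substitute(exportdir="/warehouse/xxx/dwd/xxx/dt=2022-04-19")
print(resolved)  # /warehouse/xxx/dwd/xxx/dt=2022-04-19
```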

Check MongoDB to confirm the data was synced.

4) Write the export script

vim hdfs_to_mongodb.sh 

#!/bin/bash

DATAX_HOME=/opt/module/datax

# Use the date passed as the second argument; otherwise default to yesterday
if [ -n "$2" ] ;then
    do_date=$2
else 
    do_date=`date -d "-1 day" +%F`
fi

# DataX fails if the export path contains empty files; this function removes them
handle_export_path(){
  for i in `hadoop fs -ls -R $1 | awk '{print $8}'`; do
    hadoop fs -test -z $i
    if [[ $? -eq 0 ]]; then
      echo "$i is empty, deleting"
      hadoop fs -rm -r -f $i
    fi
  done
}

# Export the data
export_data() {
  datax_config=$1
  export_dir=$2
  handle_export_path $export_dir
  $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
}

case $1 in
  "dwd_event_log")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
  ;;
  "all")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
  ;;
esac
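The empty-file cleanup that handle_export_path performs with `hadoop fs -test -z` can be illustrated locally with ordinary files. A sketch of the same idea in Python (the directory and file names are temporary stand-ins, not real HDFS paths):

```python
import os
import tempfile

# Create a directory with one empty file and one file that has data
d = tempfile.mkdtemp()
open(os.path.join(d, "part-00000"), "w").close()   # zero bytes: should be removed
with open(os.path.join(d, "part-00001"), "w") as f:
    f.write("1\talpha\tandroid\n")                 # has data: should stay

# Same logic as handle_export_path: delete zero-byte files before exporting
for name in os.listdir(d):
    path = os.path.join(d, name)
    if os.path.getsize(path) == 0:
        print(f"{name} is empty, deleting")
        os.remove(path)

print(sorted(os.listdir(d)))  # ['part-00001']
```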

--Add execute permission

chmod +x hdfs_to_mongodb.sh 

--Run it

hdfs_to_mongodb.sh all

300,137 rows were successfully imported into MongoDB!

Watch out for this error:

ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1

2022-04-20 16:53:00.138 [0-0-0-reader] INFO  UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":"\t","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-04-20 16:53:00.205 [0-0-2-writer] ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1
        at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1295) ~[fastjson-1.1.46.sec01.jar:na]
        at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1205) ~[fastjson-1.1.46.sec01.jar:na]
        at com.alibaba.fastjson.JSON.parse(JSON.java:108) ~[fastjson-1.1.46.sec01.jar:na]
        at com.alibaba.fastjson.JSON.parse(JSON.java:99) ~[fastjson-1.1.46.sec01.jar:na]
        at com.alibaba.fastjson.JSON.parseObject(JSON.java:170) ~[fastjson-1.1.46.sec01.jar:na]
        at com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriter$Task.init(MongoDBWriter.java:331) ~[mongodbwriter-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:44) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
(The same exception and stack trace are repeated for the 0-0-1-writer and 0-0-0-writer tasks.)

Solution:

Change this in the config file:

"writeMode": "replace",

to:

"isReplace": "true",
"replaceKey": "id",

and the problem is resolved.
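The stack trace suggests why: MongoDBWriter's init calls JSON.parseObject on the replace setting, so it expects a JSON object, and the bare string "replace" fails with "syntax error, pos 1", while the isReplace/replaceKey pair parses cleanly. The difference can be reproduced with any JSON parser; a sketch in Python:

```python
import json

# A bare word is not valid JSON, mirroring fastjson's "syntax error, pos 1"
try:
    json.loads("replace")
    parsed_ok = True
except json.JSONDecodeError:
    parsed_ok = False
print(parsed_ok)  # False

# The object form the writer expects parses fine
cfg = json.loads('{"isReplace": "true", "replaceKey": "id"}')
print(cfg["replaceKey"])  # id
```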

That's it. If you found this useful, please give it a like!
