一、MongoDB

1. 时区概念

GMT 就是格林威治标准时间的英文缩写(Greenwich Mean Time 格林尼治标准时间),是世界标准时间,GMT+8 是格林威治时间+8小时,中国所在时区就是gmt+8 。在国内,本地时间和“GMT+8”时区无区别。在国外,本地时间每个地方都不相同,所以只能用一条一条的经线计算时间。

然而,mongodb默认就是UTC时间,服务器端貌似无法设置,万能的网友提供的解决方案很多,网上一大堆,其根本原因就是使用JAVA写入的时候mongo驱动里面做了转换。
参考https://blog.csdn.net/jsdxshi/article/details/72831116
解决方案: https://blog.csdn.net/weixin_26782843/article/details/114094264

2. Mongo常用命令
mongo mongodb://user:pwd@dds-xx.mongodb.rds.aliyuncs.com:3717/act_operation_message
MongoDB shell version v4.2.3
Implicit session: session { "id" : UUID("f293288e-5750-4898-95cf-5fa406376bea") }
MongoDB server version: 4.2.10
mgset-12038389:PRIMARY> show collections;
db.sms_notify_send_record_202004.find({"messageId":"0ebfa545-6bd6-4e3a-bfa5-456bd60e3a2f"}).pretty();
db.messageResponseResult.find({"_id":ObjectId("60462f0e2d23b30f871dcef9")}).pretty();
{
	"_id" : ObjectId("60462f0e2d23b30f871dcef9"),
	"eventType" : "E_SEND_UNACTIVATED_MSG",
	"message" : "设备不在线或内部处理异常",
	"traceId" : "08380500-bfce-41ee-931f-e2c7e6403e49",
	"receiverId" : "wxid_bz5tgkhxnegp12",
	"senderWeChatId" : "wxid_og03p4hvd9so12",
	"stuId" : 1444271,
	"selId" : 42711,
	"status" : 2,
	"createTime" : ISODate("2021-03-08T22:05:02.952Z"),
	"updateTime" : ISODate("2021-03-08T22:05:02.952Z")
}

db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16"),"$lt":ISODate("2021-03-17")}}).count(true) 
db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16T00:00:00.000Z"),"$lt":ISODate("2021-03-17T00:00:00.000Z")}}).count(true)
3. MongoDB & DataX类型对比:

在这里插入图片描述

4. MongoDB Java API 构造数据记录:
import com.alibaba.fastjson.JSONObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.util.*;

/**
 * mongo mongodb://user:pwd@dds-xx.mongodb.rds.aliyuncs.com:3717/test
 */
public class Mongo2hive {
    public static void main(String[] args) {
        try {
            ServerAddress serverAddress = new ServerAddress("dds-uf6f9645182d5f242.mongodb.rds.aliyuncs.com", 3717);
            List<ServerAddress> addrs = new ArrayList<ServerAddress>();
            addrs.add(serverAddress);

            //MongoCredential.createScramSha1Credential()三个参数分别为 用户名 数据库名称 密码
            MongoCredential credential = MongoCredential.createScramSha1Credential("user", "db", "pwd".toCharArray());
            List<MongoCredential> credentials = new ArrayList<MongoCredential>();
            credentials.add(credential);
            //通过连接认证获取MongoDB连接
            MongoClient mongoClient = new MongoClient(addrs, credentials);

            //连接到数据库
            MongoDatabase mongoDatabase = mongoClient.getDatabase("zm-ai-test");
            // 创建collection
            // mongoDatabase.createCollection("test_table_01");

            MongoCollection<Document> collection = mongoDatabase.getCollection("test_table_01");

            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name","李华");
            jsonObject.put("age",22);

            //插入文档 构造测试数据
            List<Document> documents = new ArrayList<Document>(2048);
            for (int i = 0; i < 1000; i++) {
                Document document = new Document("title", "course-" + i).
                        append("uuid", UUID.randomUUID()).
                        append("price", 999.99).
                        append("info",jsonObject.toJSONString()).
                        append("created_time",new Date().getTime());
                documents.add(document);
            }
            collection.insertMany(documents);

            // 遍历文档
            // FindIterable<Document> findIterable = collection.find();
            // MongoCursor<Document> mongoCursor = findIterable.iterator();
            // while(mongoCursor.hasNext()){
            //     System.out.println(mongoCursor.next());
            // }

            // 获取列类型
            Set<String> keys = new HashSet<>();
            for (Document doc : collection.find().limit(5)) {
                keys.addAll(doc.keySet());
            }
            keys.stream().forEach( k -> System.out.println("col = " +k));


            System.out.println("Count ---> " + collection.countDocuments());

        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
        }


    }
}

二、全量抽取

仅仅是设置query为空即可,可以参考官网 ,这里需要说明的是mongo的时间会在抽取过来的时候加上8个小时,也就是UTC时间变为了GMT+8,以下是个对比
在这里插入图片描述

Hive中的时间转换如下:

---- 将mongodb的UTC时间转化为GMT+8时间  28800/3600=8 小时
hive> select from_unixtime(unix_timestamp(regexp_replace('2021-03-08T22:05:02.952Z', 'T|Z', ' '))+28800,'yyyy-MM-dd HH:mm:ss');
OK
2021-03-09 06:05:02
Time taken: 0.295 seconds, Fetched: 1 row(s)

这说明,DataX在抽取的时候已经自动对其进行了转化。

三、增量抽取

示例脚本如下:

{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"mongodbreader",
                    "parameter":{
                        "address":[
                            "dds-xxx.mongodb.rds.aliyuncs.com:3717",
                            "dds-xxx.mongodb.rds.aliyuncs.com:3717"
                        ],
                        "collectionName":"messageResponseResult",
                        "column":[
                            {
                                "index":0,
                                "name":"_id",
                                "type":"string"
                            },
                            {
                                "index":1,
                                "name":"eventType",
                                "type":"string"
                            },
                            {
                                "index":2,
                                "name":"message",
                                "type":"string"
                            },
                            {
                                "index":3,
                                "name":"traceId",
                                "type":"string"
                            },
                            {
                                "index":4,
                                "name":"receiverId",
                                "type":"string"
                            },
                            {
                                "index":5,
                                "name":"senderWeChatId",
                                "type":"string"
                            },
                            {
                                "index":6,
                                "name":"stuId",
                                "type":"int"
                            },
                            {
                                "index":7,
                                "name":"les_id",
                                "type":"int"
                            },
                            {
                                "index":8,
                                "name":"status",
                                "type":"int"
                            },
                            {
                                "index":9,
                                "name":"createTime",
                                "type":"date"
                            },
                            {
                                "index":10,
                                "name":"updateTime",
                                "type":"date"
                            }
                        ],
                        "dbName":"act_operation_message",
                        "userName":"xxxx",
                        "userPassword":"xxx",
                        "query":"{\"updateTime\":{\"$gte\":ISODate(\"2021-03-16T00:00:00.000Z\"),\"$lt\":ISODate(\"2021-03-17T00:00:00.000Z\")}}"
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "column":[
                        {
                                "name":"_id",
                                "type":"string"
                            },
                            {
                                "name":"eventType",
                                "type":"string"
                            },
                            {
                                "name":"message",
                                "type":"string"
                            },
                            {
                                "name":"traceId",
                                "type":"string"
                            },
                            {
                                "name":"receiverId",
                                "type":"string"
                            },
                            {
                                "name":"senderWeChatId",
                                "type":"string"
                            },
                            {
                                "name":"stuId",
                                "type":"int"
                            },
                            {
                                "name":"les_id",
                                "type":"int"
                            },
                            {
                                "name":"status",
                                "type":"int"
                            },
                            {
                                "name":"createTime",
                                "type":"string"
                            },
                            {
                                "name":"updateTime",
                                "type":"string"
                            }
                        ],
                        "defaultFS":"hdfs://nameservice1",
                        "haveKerberos":"true",
                        "kerberosKeytabFilePath":"/home/zmbigdata/kerberos/zm_app_prd.keytab",
                        "kerberosPrincipal":"zm_app_prd@FAYSON.COM",
                        "encoding":"UTF-8",
                        "fileType":"orc",
                        "fileName":"ods_message_response_result_df",
                        "path":"/user/hive/warehouse/test_ods.db/ods_message_response_result_df/pt=2021-03-17",
                        "writeMode":"append",
                         "compress":"SNAPPY",
                         "fieldDelimiter":"\u0001"
                    }
                }
            }
        ],
        "setting":{
            "speed":{
                "channel":"2"
            },
            "errorLimit":{
                "record":0
            }
        }
    }
}

增量数据抽取数据量对比:

#mongdb查询:
db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16T00:00:00.000Z"),"$lt":ISODate("2021-03-17T00:00:00.000Z")}}).count(true)
6781

#datax同步:
"query":"{\"updateTime\":{\"$gte\":ISODate(\"2021-03-16T00:00:00.000Z\"),\"$lt\":ISODate(\"2021-03-17T00:00:00.000Z\")}}"

2021-03-17 17:40:36.978 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-03-17 17:40:25
任务结束时刻                    : 2021-03-17 17:40:36
任务总计耗时                    :                 11s
任务平均流量                    :          105.99KB/s
记录写入速度                    :            678rec/s
读出记录总数                    :                6781
读写失败总数                    :                   0

# 数据量可以看出是一致的。

将updateTime type改为Date后对结果进行了再次check,发现时间还是多了8个小时,如下:

hive> select `_id`,message,updatetime from ods_message_response_result_df where pt = '2021-03-17' and `_id`='60462f0e2d23b30f871dcef9';
OK
60462f0e2d23b30f871dcef9	设备不在线或内部处理异常	2021-03-09 06:05:02
Time taken: 23.031 seconds, Fetched: 1 row(s)

总结

关于时间差问题,可以在开发的时候就已经转化好,这样在提取的时候就是正常东八区时间了。

参考:
https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md

mongoDB数据类型:https://www.cnblogs.com/YWDCB/p/9453887.html

MongoDB存取时间差问题解决方案:https://www.jb51.net/article/147182.htm

MongDB时区问题:https://cloud.tencent.com/developer/article/1446798

如果对你有帮助,不妨点个赞👍🏻

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐