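We needed to capture MongoDB data in real time, so we looked at the MongoDB connector of Flink CDC. Since Flink CDC 2.1, MongoDB is supported as a CDC source. Changes are read through MongoDB change streams, which are built on the oplog, so the source must be a replica set or a sharded cluster.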

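A sample document stored in MongoDB, in MongoDB Extended JSON ({"$numberLong": ...} marks a 64-bit integer, {"$date": ...} a date in epoch milliseconds):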

{
  "_id": {"$numberLong": "375756968729522176"},
  "tenantId": "1000000001",
  "grade": 1,
  "name": "VIP1",
  "growth": "0",
  "enable": true,
  "createdAt": {"$date": 1623849842573},
  "createdBy": "system",
  "lastUpdatedAt": {"$date": 1656424909977},
  "_class": "com.xiaoshouyi.member.service.document.GradeSetting",
  "updatedBy": "707",
  "gradeRightList": [
    {
      "gradeRightType": "POSTAGE_DISCOUNT",
      "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}
    },
    {
      "gradeRightType": "MULT_CREDIT",
      "creditMultRight": {
        "isMult": true,
        "multiple": "12",
        "productScopeList": [
          {
            "scopeType": "0",
            "isIncluding": true,
            "elementList": [
              {"name": "Fancl防晒霜", "code": "C-3002"},
              {"name": "沈欣牌特大龙虾500g(±3g)", "code": "S-05030001-500"},
              {"name": "测试PLU拦截01", "code": "01010005"},
              {"name": "测试PLU拦截的", "code": "01010004"},
              {"name": "文档测试2", "code": "01010003"},
              {"name": "路飞的帽子", "code": "06010001"},
              {"name": "沪溪河", "code": "02010001"},
              {"name": "鲍师傅", "code": "01010002"}
            ]
          }
        ]
      }
    }
  ],
  "growthTo": "50"
}

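This document contains a fairly complex nested structure: gradeRightList is an array of objects, and one of those objects contains further arrays of objects.

So the main purpose of this article is to show how to parse complex nested JSON data from MongoDB.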

Flink SQL

create table members(
_id bigint,
_class string,
createdAt TIMESTAMP_LTZ(3),      -- BSON $date carries milliseconds; DATE would drop the time of day
createdBy string,
enable boolean,                  -- the document stores true/false
grade int,
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
growth String,
lastUpdatedAt TIMESTAMP_LTZ(3),
name string,
tenantId string,
updatedBy string,                -- must match the document key "updatedBy"
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
)
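The $date fields (createdAt, lastUpdatedAt) arrive as epoch milliseconds. The plain-Scala sketch below involves no Flink at all; the object name CreatedAtDemo is hypothetical. It decodes the sample createdAt value to show what a DATE column keeps (only the calendar day) compared with a TIMESTAMP_LTZ(3) column (the full millisecond instant). The day is computed in UTC purely for illustration; which zone the connector applies when producing a DATE is not covered here.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Hypothetical demo object: decodes the sample {"$date": 1623849842573} value.
object CreatedAtDemo {
  // Milliseconds since the Unix epoch, as stored in the sample document
  val createdAtMillis: Long = 1623849842573L

  // The full instant with millisecond precision (what TIMESTAMP_LTZ(3) can represent)
  val asInstant: Instant = Instant.ofEpochMilli(createdAtMillis)

  // Only the calendar day, computed in UTC here (all that a DATE value can hold)
  val asUtcDate: LocalDate = asInstant.atZone(ZoneOffset.UTC).toLocalDate
}
```

asInstant prints as 2021-06-16T13:24:02.573Z, while asUtcDate is just 2021-06-16: the time of day is gone once the value is a DATE.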

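gradeRightList is the field holding the complex nested JSON. The mapping rule is: a JSON object becomes ROW<...> with one field per key, a JSON array becomes ARRAY<...> of its element type, and the two can nest to any depth. Documents with similar nesting can be declared the same way, so this pattern should cover practically every case. The column definition on its own: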

gradeRightList ARRAY<ROW<
    gradeRightType STRING,
    creditMultRight ROW<
        isMult BOOLEAN,
        multiple STRING,
        productScopeList ARRAY<ROW<
            scopeType STRING,
            isIncluding BOOLEAN,
            elementList ARRAY<ROW<name STRING, code STRING>>
        >>
    >,
    postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
>>,
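Because the nesting rule is mechanical (object to ROW, array to ARRAY), long column types like this can be generated instead of typed by hand. The plain-Scala sketch below is a hypothetical helper: FlinkTypeRender and its case classes are not part of Flink, and no Flink dependency is needed. It describes the gradeRightList schema and renders the Flink SQL type string.

```scala
// Hypothetical helper: a tiny schema model rendered as a Flink SQL type string.
object FlinkTypeRender {
  sealed trait SqlType
  case class Scalar(name: String) extends SqlType                        // STRING, INT, BOOLEAN, ...
  case class ArrayOf(element: SqlType) extends SqlType                   // JSON array  -> ARRAY<...>
  case class RowOf(fields: List[(String, SqlType)]) extends SqlType      // JSON object -> ROW<...>

  // Recursively render nested types; ROW fields are written as "name TYPE".
  def render(t: SqlType): String = t match {
    case Scalar(n)   => n
    case ArrayOf(e)  => s"ARRAY<${render(e)}>"
    case RowOf(fs)   => fs.map { case (k, v) => s"$k ${render(v)}" }.mkString("ROW<", ", ", ">")
  }

  val sqlString  = Scalar("STRING")
  val sqlBoolean = Scalar("BOOLEAN")
  val sqlInt     = Scalar("INT")

  // The gradeRightList schema from the sample document, innermost level first
  val element = RowOf(List("name" -> sqlString, "code" -> sqlString))
  val productScope = RowOf(List(
    "scopeType" -> sqlString, "isIncluding" -> sqlBoolean, "elementList" -> ArrayOf(element)))
  val creditMultRight = RowOf(List(
    "isMult" -> sqlBoolean, "multiple" -> sqlString, "productScopeList" -> ArrayOf(productScope)))
  val postageRight = RowOf(List(
    "consumeAmount" -> sqlString, "postageType" -> sqlInt, "discountPostageAmt" -> sqlString))
  val gradeRightList = ArrayOf(RowOf(List(
    "gradeRightType" -> sqlString, "creditMultRight" -> creditMultRight, "postageRight" -> postageRight)))
}
```

render(FlinkTypeRender.gradeRightList) yields the same type as the column above, only with different spacing, which is not significant in Flink SQL type declarations.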

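The corresponding query. Flink SQL array indexes start at 1, so gradeRightList[1] is the POSTAGE_DISCOUNT entry and gradeRightList[2] is the MULT_CREDIT entry of the sample document:

tableEnv.executeSql(mongoDBKafaSql)
tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()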
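Indexing by position, as in gradeRightList[2], assumes every document keeps its entries in the same order. An alternative is to expand each array into one row per element with Flink SQL's CROSS JOIN UNNEST. The sketch below holds such a query in unnestSql; it has not been run against a live cluster, so treat it as a starting point. To show what the expansion yields for the sample document, it reproduces the same flattening in plain Scala with a for-comprehension over hypothetical, trimmed-down case classes (FlattenDemo and its types are illustrative only).

```scala
// Plain Scala, no Flink dependency. FlattenDemo and its case classes are hypothetical
// stand-ins for the sample document, keeping only the fields used below.
object FlattenDemo {
  case class Element(name: String, code: String)
  case class ProductScope(scopeType: String, isIncluding: Boolean, elementList: List[Element])
  case class CreditMultRight(isMult: Boolean, multiple: String, productScopeList: List[ProductScope])
  // creditMultRight is absent on the POSTAGE_DISCOUNT entry, so it is optional here
  case class GradeRight(gradeRightType: String, creditMultRight: Option[CreditMultRight])

  val gradeRightList: List[GradeRight] = List(
    GradeRight("POSTAGE_DISCOUNT", None),
    GradeRight("MULT_CREDIT", Some(CreditMultRight(isMult = true, multiple = "12", productScopeList = List(
      ProductScope(scopeType = "0", isIncluding = true, elementList = List(
        Element("Fancl防晒霜", "C-3002"),
        Element("沈欣牌特大龙虾500g(±3g)", "S-05030001-500"),
        Element("测试PLU拦截01", "01010005"),
        Element("测试PLU拦截的", "01010004"),
        Element("文档测试2", "01010003"),
        Element("路飞的帽子", "06010001"),
        Element("沪溪河", "02010001"),
        Element("鲍师傅", "01010002")
      ))
    ))))
  )

  // Sketch of the Flink SQL form, for reference only (not executed here):
  // each CROSS JOIN UNNEST produces one row per array element.
  val unnestSql: String =
    """SELECT m._id, r.gradeRightType, e.name, e.code
      |FROM members AS m
      |CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight)
      |CROSS JOIN UNNEST(r.creditMultRight.productScopeList) AS s (scopeType, isIncluding, elementList)
      |CROSS JOIN UNNEST(s.elementList) AS e (name, code)""".stripMargin

  // The same expansion in plain Scala: one (gradeRightType, name, code) tuple per element.
  val flattened: List[(String, String, String)] = for {
    r <- gradeRightList
    c <- r.creditMultRight.toList // entries without creditMultRight contribute no rows
    s <- c.productScopeList
    e <- s.elementList
  } yield (r.gradeRightType, e.name, e.code)

  // Flink SQL arrays are 1-based: gradeRightList[2] in SQL is gradeRightList(1) in Scala.
  val secondEntry: GradeRight = gradeRightList(1)
}
```

For the sample document, flattened holds 8 rows, all with gradeRightType MULT_CREDIT, the first being ("MULT_CREDIT", "Fancl防晒霜", "C-3002").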

See also the article "Using and parsing ARRAY, MAP and ROW in Flink SQL".

Complete code for the SQL approach:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // DDL for the MongoDB CDC source table (column names must match the document keys)
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt TIMESTAMP_LTZ(3),
        |createdBy string,
        |enable boolean,
        |grade int,
        |gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
        |growth String,
        |lastUpdatedAt TIMESTAMP_LTZ(3),
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
      """.stripMargin

    /**
      * SQL approach (sample document _id: 375756968729522176)
      */
    // Register the source table
    tableEnv.executeSql(mongoDBKafaSql)

    // Flink SQL arrays are 1-based: [1] is POSTAGE_DISCOUNT, [2] is MULT_CREDIT.
    // execute() submits the query as its own job and print() blocks while the
    // unbounded CDC stream runs, so env.execute() is not called: with no
    // DataStream operators defined it would fail with "No operators defined".
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
  }
}

DataStream API approach:

I won't go into this approach in detail, since handling JSON in code is straightforward.

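import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Give the type parameter explicitly in Scala; otherwise it is inferred as Nothing
    // and the String deserializer below does not type-check.
    val source: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting")            // collections are given as <db>.<collection>
      .copyExisting(true)                                      // read existing documents before streaming changes
      .deserializer(new JsonDebeziumDeserializationSchema())   // emit each change record as a JSON string
      .build()

    env.addSource(source)
      .print().setParallelism(1)

    // Unlike the Table API example, this job is built from DataStream operators,
    // so it is submitted with env.execute()
    env.execute()
  }
}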

Reference (official documentation):

1. CDC

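This article covered how to use MongoDB CDC. A follow-up will cover writing to MongoDB with Flink SQL, which is not yet well supported officially and requires some code changes. For more articles on the underlying principles, follow the WeChat official account 《迪答》.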
