Table of Contents

Preface

I. Kafka Data Structure

II. Steps

1. Build the Execution Environment

2. Map the Kafka Data to a Table

3. Map the TiDB Table Data

4. Data Processing

Summary



Preface

Flink SQL is used to process the data in Kafka, which reduces the amount of business code that has to be written: the business logic is expressed as SQL statements.

I. Kafka Data Structure

A sample of the Kafka data (the real data is considerably more complex):

{
  "fileInfo": [
    {
      "fileId": 4169200573588,
      "fileSize": 21.0,
      "fileType": 5,
      "md5": "B54FA0BB16D9D1180619FC5D4D653494"
    }
  ],
  "subjectMap": {
    "1234": 0,
    "4455": 4373
  },
  "serverIp": "10.101.0.1"
}

II. Steps

1. Build the Execution Environment

The code is as follows:

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Build the execution environment, using streaming mode
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    env.setParallelism(1)
    // Restart strategy configuration
    // Fixed-delay restart (at most 3 attempts, 10 s between attempts)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
    // Failure-rate restart (at most 3 failures within 10 minutes, at least 1 minute between attempts)
    // Note: this call overrides the fixed-delay strategy set above; keep only the one you need
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)))
    val tableEnv = StreamTableEnvironment.create(env, settings)

2. Map the Kafka Data to a Table

The code is as follows:

// Read the Kafka data: map the topic to a table
    var sqlInfo =
      """create table data_table (
        |serverIp STRING,
        |fileInfo ARRAY<ROW<fileId BIGINT, fileSize STRING, fileType INT, md5 STRING>>,
        |subjectMap MAP<STRING, INT>
        |)
        |WITH
        |(
        |'connector' = 'kafka',
        |'topic' = 'middle_vac_to_convert',
        |'properties.bootstrap.servers' = '10.101.1.101:9092,10.101.1.102:9092,10.101.1.103:9092',
        |'properties.group.id' = 'consumerGroup',
        |'format' = 'json',
        |'json.ignore-parse-errors' = 'true',
        |'scan.startup.mode' = 'latest-offset',
        |'json.fail-on-missing-field' = 'false')""".stripMargin
    tableEnv.executeSql(sqlInfo)

This creates the table data_table; subsequent SQL operations can be run against it.
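For example, data_table can be queried directly once the DDL above has been executed. The sketch below is illustrative only: CARDINALITY counts the elements of the fileInfo array, and the map key '1234' is taken from the sample message.

// A minimal preview query against the mapped Kafka table (illustrative sketch)
    val preview = tableEnv.sqlQuery(
      "select serverIp, CARDINALITY(fileInfo) as fileCount, subjectMap['1234'] as subjectVal from data_table")
    preview.execute().print()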

3. Map the TiDB Table Data

// Map the TiDB target table via the JDBC connector
    tableEnv.executeSql("CREATE TABLE FILEINFO_TEMP (" +
      "ID BIGINT, " +
      "FileType INT, " +
      "Content STRING, " +
      "HashCode STRING, " +
      "FileSize STRING, " +
      "InsertTime DATE, " +
      "UpdateTime DATE" +
      ") " +
      "WITH (" +
      "'connector' = 'jdbc'," +
      "'url' = 'jdbc:mysql://10.1.1.101:4000/MYDB?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&useSSL=false'," +
      "'table-name' = 'FILE_INFO'," +
      "'username' = 'root'," +
      "'password' = '123456'" +
      ")")

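To confirm the mapping, the registered table can also be read back through the same connector (the JDBC connector performs a bounded scan when it is used as a source). A minimal sketch, assuming the FILE_INFO table already exists in TiDB:

// Read a few columns back from the mapped TiDB table to verify the column definitions
    tableEnv.executeSql("select ID, HashCode, FileSize from FILEINFO_TEMP").print()
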
4. Data Processing

// Build the query that flattens the fileInfo array
    var querySql =
      """
        |select fileId1, fileSize2, fileType3, md54 from data_table CROSS JOIN UNNEST(fileInfo) AS t (fileId1, fileSize2, fileType3, md54)
        |""".stripMargin
    var fileInfoTable = tableEnv.sqlQuery(querySql)
    // Register the result as an intermediate view
    tableEnv.createTemporaryView("fileTableView", fileInfoTable)
    // Build the insert statement; the column order must match FILEINFO_TEMP
    // (ID, FileType, Content, HashCode, FileSize, InsertTime, UpdateTime)
    var insertSql = "insert into FILEINFO_TEMP select fileId1, fileType3, CAST(NULL AS STRING), " +
      "md54, fileSize2, CAST(NULL AS DATE), CAST(NULL AS DATE) from fileTableView"
    // Submit the insert as a separate job (uncomment to execute)
//    tableEnv.executeSql(insertSql)

CROSS JOIN UNNEST is used to flatten the array-typed field into individual rows.
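UNNEST emits one row per array element, and columns of the outer table can be selected alongside the unnested fields. A minimal sketch using the same aliases as above:

// Keep serverIp next to each flattened fileInfo element (illustrative sketch)
    val flattened = tableEnv.sqlQuery(
      """
        |select serverIp, fileId1, md54
        |from data_table CROSS JOIN UNNEST(fileInfo) AS t (fileId1, fileSize2, fileType3, md54)
        |""".stripMargin)
    flattened.printSchema()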


Summary

This approach is convenient and simple for anyone familiar with SQL. However, it does not support multi-table transactional processing; it can only write to a single table, or to tables that have no data dependencies between them.
