1 Reference code for reading from Kafka
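
The snippets below assume roughly the following imports (Flink 1.14 Scala API); JavaTool.formateJson is a project-specific helper that is not shown here:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment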

/**
 * Read via Flink SQL (Flink 1.14 supports multiple topics); suited to sources whose records share a uniform JSON schema
 */
def readKafka(tab_env: StreamTableEnvironment, topics: String, servers: String, group_id: String): Unit = {
    // Connect to Kafka
    val kafka_view =
        s"""
           |CREATE TABLE kafka_source (
           |sts      BIGINT   ,
           |ins      STRING   ,
           |isfl     BOOLEAN  ,
           |events   ARRAY< ROW<tid BIGINT, event STRING, ts BIGINT, net INT, eparam STRING> > 
           |)
           |WITH (
           |    'connector' = 'kafka',
           |    'topic' = '${topics}',  -- separate multiple topics with commas
           |    'properties.bootstrap.servers' = '${servers}',
           |    'properties.group.id' = '${group_id}',
           |    'scan.startup.mode' = 'latest-offset',
           |    'format' = 'json',
           |    'json.fail-on-missing-field' = 'true',
           |    'json.ignore-parse-errors' = 'false'
           |)
           |""".stripMargin
    tab_env.executeSql(kafka_view)
}
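
A minimal usage sketch for the nested schema above (not part of the original snippet): once the table is registered, the events array can be flattened with CROSS JOIN UNNEST.

val flattened = tab_env.sqlQuery(
    """
      |SELECT sts, ins, e.tid, e.event, e.ts
      |FROM kafka_source
      |CROSS JOIN UNNEST(events) AS e (tid, event, ts, net, eparam)
      |""".stripMargin)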


/**
 * Read via Flink SQL (Flink 1.14 supports multiple topics); suited to consuming the raw JSON string directly
 */
def readKafka(tab_env: StreamTableEnvironment, topics: String, servers: String, group_id: String): Unit = {
    // Connect to Kafka
    val kafka_view =
        s"""
           |CREATE TABLE kafka_source (
           |json_str      STRING
           |)
           |WITH (
           |    'connector' = 'kafka',
           |    'topic' = '${topics}',  -- separate multiple topics with commas
           |    'properties.bootstrap.servers' = '${servers}',
           |    'properties.group.id' = '${group_id}',
           |    'scan.startup.mode' = 'latest-offset',
           |    'format' = 'json',
           |    'json.fail-on-missing-field' = 'true',
           |    'json.ignore-parse-errors' = 'false'
           |)
           |""".stripMargin
    tab_env.executeSql(kafka_view)
}
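
With the raw-string schema, individual fields can then be extracted in SQL, mirroring the JSON_VALUE parsing used in getSdk below (a sketch; the field names are only examples):

val parsed = tab_env.sqlQuery(
    """
      |SELECT
      |    JSON_VALUE(json_str, '$.gaid' RETURNING STRING)  AS gaid,
      |    JSON_VALUE(json_str, '$.vid'  RETURNING STRING)  AS vid
      |FROM kafka_source
      |""".stripMargin)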


/**
 * Read via the DataStream API; suited to consuming the raw JSON string directly
 */
def getSdk(env: StreamExecutionEnvironment, tab_env: StreamTableEnvironment, topics_str: String, servers: String, group_id: String): Unit = {
    // Read data from Kafka
    val topics = topics_str.split(",").toList
    val source = KafkaSource
            .builder[String]
            .setBootstrapServers(servers)
            .setTopics(topics: _*)
            .setGroupId(group_id)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build
    val input_stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source")

    // Normalize the records and register a temporary view
    // flatMap splits each record on newlines, in case a single Kafka record carries several JSON lines
    val input_tab = input_stream
            .flatMap(_.split("\n"))
            .map(x => JavaTool.formateJson(x))
    // input_tab.print()
    tab_env.createTemporaryView("input_tab", input_tab)

    // Parse the JSON payload with SQL
    val sql =
        """
          |SELECT
          |    JSON_VALUE(f0,'$.gaid' RETURNING STRING)   AS  gaid   ,
          |    JSON_VALUE(f0,'$.vid' RETURNING STRING)    AS  vid
          |FROM
          |    input_tab
          |""".stripMargin
    tab_env.createTemporaryView("kafka_source", tab_env.sqlQuery(sql))
}
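
A rough driver sketch showing how these helpers might be wired together (topic, broker, and group id values are placeholders):

def main(args: Array[String]): Unit = {
    val env     = StreamExecutionEnvironment.getExecutionEnvironment
    val tab_env = StreamTableEnvironment.create(env)
    readKafka(tab_env, "topic_a,topic_b", "broker1:9092,broker2:9092", "demo_group")
    tab_env.executeSql("SELECT * FROM kafka_source").print()
}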

2 Reference code for writing to Kafka

/**
 * Write data out to Kafka
 */
def writeToKafka(tab_env: StreamTableEnvironment, topic: String, servers: String): Unit = {
    // Connect to Kafka
    val kafka_view =
        s"""
           |CREATE TABLE kafka_in (
           |mcc      STRING   ,
           |mnc      STRING
           |)
           |WITH (
           |    'connector' = 'kafka',
           |    'topic' = '${topic}',
           |    'properties.bootstrap.servers' = '${servers}',
           |    'format' = 'json'
           |)
           |""".stripMargin
    tab_env.executeSql(kafka_view)
}
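
Declaring the sink table only sets up the connection; rows are actually written with an INSERT INTO statement, for example (some_source_view is a placeholder for an upstream view):

tab_env.executeSql(
    """
      |INSERT INTO kafka_in
      |SELECT mcc, mnc FROM some_source_view
      |""".stripMargin)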

To specify a compression codec when Flink writes to Kafka, pass the Kafka producer property through the table's WITH clause:

'properties.compression.type'='gzip'
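
For example, the sink DDL above with gzip compression enabled (a sketch; topic and broker values are placeholders):

WITH (
    'connector' = 'kafka',
    'topic' = 'out_topic',
    'properties.bootstrap.servers' = 'broker1:9092',
    'properties.compression.type' = 'gzip',
    'format' = 'json'
)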

