1、项目介绍

由于上一个文档已经介绍了这个项目名。这里我就单独介绍一下这个文档主类。该文档主要是数据的主入口。同时也是可以熟悉整个代码的处理流程。
1、用户的操作日志数据(埋点数据),发送至kafka 。
2、运营人员在大数据平台配置好规则(如新用户,浏览了某一个页面…),存入mysql。
3、Flink代码定时(新增规则也能及时加载)加载mysql规则,根据规则处理日志。
4、将满足规则的数据存入ES(clickhouse)中。
5、Flink同时在根据mysql定义的规则处理数据(如新用户,浏览…),同时需要结合ES(clickhouse)查询。将满足要求的用户打上标签(特定规则有特定的标签)存入hbase中。

6、搭建API接口,开放给其他平台使用。
7、整个流程就是加载规则和处理规则,存入满足规则的用户,打上标签。

2、主类流程图

先不扯其他的,代码先上。后面在详细介绍。
在这里插入图片描述

3、主类代码

代码

package com.task

import java.text.SimpleDateFormat
import java.util.Locale
import java.util.concurrent.TimeUnit

import com.bean.{BuriedPointDetailBean, UserLastTimeBean}
import com.conf.Constants.{ENV, PARALLELISM, baseConf}
import com.conf.{BaseConf, Constants, LocalConf, OnlineConf, TestConf}
import com.func.{BroadcastProcessFunc, BroadcastProcessRuleFunc, ElasticsearchSinkFunc, HbaseSinkFunc, MysqlSourceFunc, ProcessETLWindowFunc}
import com.utils.StringUtils
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder

import scala.collection.immutable._


/**
 * say
 */
object RealTimeLabel {

    //设置用户
    System.setProperty("HADOOP_USER_NAME", "root")

    //主程序入口
    def main(args: Array[String]): Unit = {
        //获取参数
        val params: ParameterTool = ParameterTool.fromArgs(args)
        //创建流环境
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        //设置checkPoint检查的时间间隔为60s
        env.enableCheckpointing(60000L)
        //获取执行环境
        val run_env: String = params.get(ENV)
        var conf: BaseConf = null
        if (StringUtils.isNotEmpty(run_env)) {
            if ("online".equals(run_env)) {
                println("online init...")
                conf = new OnlineConf
            } else {
                println("test init...")
                conf = new TestConf
            }
        } else {
            println("local init...")
            conf = new LocalConf
        }
        Constants.initConf(conf)
        //设置并行度
        val parallelism: Int = params.getInt(PARALLELISM, 1)
        env.setParallelism(parallelism)
        //import conf._
        val properties = params.getProperties
        //设置基本的时间特性(该设置定义数据流源的行为方式)
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
        //    # Kafka集群地址
        properties.setProperty(conf.BOOTSTRAP_SERVERS, conf.bootstrap_servers_value)
        //    # 消费组ID
        properties.setProperty(conf.GROUP_ID, conf.group_id_label_platform)
        //    # 自动提交拉取到消费端的消息offset到kafka
        properties.setProperty(conf.ENABLE_AUTO_COMMIT, conf.enable_auto_commit_value)
        //    # 自动提交offset到zookeeper的时间间隔单位(毫秒)
        properties.setProperty(conf.AUTO_COMMIT_INTERVAL_MS, conf.auto_commit_interval_ms_value)
        //    # 每次消费最新的数据
        properties.setProperty(conf.AUTO_OFFSET_RESET, conf.auto_offset_reset_value)
        //设置仅一次语义
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        // 确保检查点之间有至少2000 ms的间隔【checkpoint最小间隔】
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
        //检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】 默认10分钟
        env.getCheckpointConfig.setCheckpointTimeout(60000)
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        //设置当Checkpoint执行过程中如果出现失败或者错误时,任务是否同时被关闭,默认为True
        //现在这个方法被弃用了,通过重试次数来判断
        //env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
        //设置重启失败率
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 一个时间段内的最大失败次数
            org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))) // 间隔
        //设置Checkpoint的存储位置
        env.setStateBackend(new FsStateBackend(conf.checkPointPath, true))

        /*
              (1) 先建立MapStateDescriptor
              MapStateDescriptor定义了状态的名称、Key和Value的类型。
              这里,MapStateDescriptor中,key是String类型,value是Map<String, Tuple2<String,Int>>类型。
              这里的  labelId,scourceBean
             */
        val etlRule = new MapStateDescriptor[String, Tuple4[String, String, String, String]]("config", classOf[String], classOf[Tuple4[String, String, String, String]])

        //mysql源数据,形成广播 BroadcastStream
        val mysqlSource = env.addSource(new MysqlSourceFunc(baseConf))
        val mysqlBroadcast = mysqlSource.broadcast(etlRule)

        //kafka的流数据
        val dwdEventLogTopic: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](conf.product_kafka_dwd_topic, new SimpleStringSchema(), properties)
        val kafkaDataStream: DataStream[BuriedPointDetailBean] = env.addSource(dwdEventLogTopic)
          .map(item => BuriedPointDetailBean(item))
              .setParallelism(parallelism)



        //连接mysql数据
        val connMysql: DataStream[BuriedPointDetailBean] = kafkaDataStream.connect(mysqlBroadcast)
          .process(new BroadcastProcessFunc())


        val httpHosts = new java.util.ArrayList[HttpHost]
        httpHosts.add(new HttpHost(conf.ES_HOST, 9200, "http"))
        //创建sinkES
        val esSinkBuilder: ElasticsearchSink.Builder[BuriedPointDetailBean] = new ElasticsearchSink.Builder[BuriedPointDetailBean](
            httpHosts,
            new ElasticsearchSinkFunc(conf.ES_INDEX, conf.ES_TYPE)
        )
        esSinkBuilder.setBulkFlushMaxActions(1) //无界流使用此方法可以来一条进行一次写,否则会进入缓冲区。

        // provide a RestClientFactory for custom configuration on the internally created REST client
        esSinkBuilder.setRestClientFactory(
            new RestClientFactory {
                override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
                    restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
                            // elasticsearch username and password
                            val credentialsProvider = new BasicCredentialsProvider
                            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(conf.ES_USERNAME, conf.ES_PASSWORD))
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                            //httpClientBuilder.setSSLContext(trustfulSslContext)
                        }
                    })
                }
            })
        //sink  Es
        connMysql.addSink(esSinkBuilder.build).name("sinkES")
        //connMysql.print()
        //汇总埋点数据一分钟
        val totalStream = connMysql
          .keyBy(_.event_id)
          .timeWindow(Time.seconds(5))
          .process(new ProcessETLWindowFunc)
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[UserLastTimeBean](Time.minutes(1)) {
            override def extractTimestamp(element: UserLastTimeBean): Long = {
                val fm = new SimpleDateFormat("yyyy-MM-dd HH:MM:SS.sss", Locale.ENGLISH)
                val dt2 = fm.parse(element.event_time)
                dt2.getTime
            }
        }).setParallelism(parallelism)


        //关联  查询ES的规则
        val hbaseStream = totalStream.connect(mysqlBroadcast)
          .process(new BroadcastProcessRuleFunc(conf))

        //hbaseStream.print()
        hbaseStream.addSink(new HbaseSinkFunc(conf)).name("sinkHBase")

        env.execute("real_time_label")

    }

}

4、主类介绍

(1)通过传参来区分测试环境与实际生产。

ParameterTool.fromArgs(args).get(ENV)
 然后初始化类。在初始化中配置不同环境代码。

(2)构建实时环境

StreamExecutionEnvironment.getExecutionEnvironment

通过这个类设置配置,我在这里就不介绍每一个设置的效果

(3)配置kafka源数据

new FlinkKafkaConsumer011[String](conf.product_kafka_dwd_topic, new SimpleStringSchema(), properties)

然后通过上面env.addSource() 一起配置源数据source

(4)mysql源数据
主要是为了加载过滤配置,和上一个文档讲解的一样,通过kafka 和 mysql 关联。

(5)配置ES sink
如何配置ES的配置可参考 Flink官网
存入ES主要是为了查询数据核对数据,以及配合ES进行一些聚合查询。

addSink(esSinkBuilder.build).name("sinkES")

(6) ETL逻辑处理
通过mysql加载的规则进行处理。所以需要processFunction

(7)将处理结果存入hbase
存入hbase主要是给下游进行实时api查询。

addSink(new HbaseSinkFunc(conf)).name("sinkHBase")

5、传送门

下面在逐个分享各个类的代码。
持续更新
Flink大数据实时标签实时ETL --03加载规则类 (source Mysql)

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐