Flink大数据实时标签实时ETL -- 主类代码介绍2
Flink实时标签,实时ETL,生产实时代码
1、项目介绍
由于上一个文档已经介绍了这个项目名。这里我就单独介绍一下这个文档主类。该文档主要是数据的主入口。同时也是可以熟悉整个代码的处理流程。
1、用户的操作日志数据(埋点数据),发送至kafka 。
2、运营人员在大数据平台配置好规则(如新用户,浏览了某一个页面…),存入mysql。
3、Flink代码定时(新增规则也能及时加载)加载mysql规则,根据规则处理日志。
4、将满足规则的数据存入ES(clickhouse)中。
5、Flink同时在根据mysql定义的规则处理数据(如新用户,浏览…),同时需要结合ES(clickhouse)查询。将满足要求的用户打上标签(特定规则有特定的标签)存入hbase中。
6、搭建API接口,开放给其他平台使用。
7、整个流程就是加载规则和处理规则,存入满足规则的用户,打上标签。
2、主类流程图
先不扯其他的,代码先上。后面在详细介绍。
3、主类代码
代码
package com.task
import java.text.SimpleDateFormat
import java.util.Locale
import java.util.concurrent.TimeUnit
import com.bean.{BuriedPointDetailBean, UserLastTimeBean}
import com.conf.Constants.{ENV, PARALLELISM, baseConf}
import com.conf.{BaseConf, Constants, LocalConf, OnlineConf, TestConf}
import com.func.{BroadcastProcessFunc, BroadcastProcessRuleFunc, ElasticsearchSinkFunc, HbaseSinkFunc, MysqlSourceFunc, ProcessETLWindowFunc}
import com.utils.StringUtils
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder
import scala.collection.immutable._
/**
* say
*/
object RealTimeLabel {
//设置用户
System.setProperty("HADOOP_USER_NAME", "root")
//主程序入口
def main(args: Array[String]): Unit = {
//获取参数
val params: ParameterTool = ParameterTool.fromArgs(args)
//创建流环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置checkPoint检查的时间间隔为60s
env.enableCheckpointing(60000L)
//获取执行环境
val run_env: String = params.get(ENV)
var conf: BaseConf = null
if (StringUtils.isNotEmpty(run_env)) {
if ("online".equals(run_env)) {
println("online init...")
conf = new OnlineConf
} else {
println("test init...")
conf = new TestConf
}
} else {
println("local init...")
conf = new LocalConf
}
Constants.initConf(conf)
//设置并行度
val parallelism: Int = params.getInt(PARALLELISM, 1)
env.setParallelism(parallelism)
//import conf._
val properties = params.getProperties
//设置基本的时间特性(该设置定义数据流源的行为方式)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// # Kafka集群地址
properties.setProperty(conf.BOOTSTRAP_SERVERS, conf.bootstrap_servers_value)
// # 消费组ID
properties.setProperty(conf.GROUP_ID, conf.group_id_label_platform)
// # 自动提交拉取到消费端的消息offset到kafka
properties.setProperty(conf.ENABLE_AUTO_COMMIT, conf.enable_auto_commit_value)
// # 自动提交offset到zookeeper的时间间隔单位(毫秒)
properties.setProperty(conf.AUTO_COMMIT_INTERVAL_MS, conf.auto_commit_interval_ms_value)
// # 每次消费最新的数据
properties.setProperty(conf.AUTO_OFFSET_RESET, conf.auto_offset_reset_value)
//设置仅一次语义
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确保检查点之间有至少2000 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
//检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】 默认10分钟
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一时间只允许进行一个检查点
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置当Checkpoint执行过程中如果出现失败或者错误时,任务是否同时被关闭,默认为True
//现在这个方法被弃用了,通过重试次数来判断
//env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
//设置重启失败率
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 一个时间段内的最大失败次数
org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))) // 间隔
//设置Checkpoint的存储位置
env.setStateBackend(new FsStateBackend(conf.checkPointPath, true))
/*
(1) 先建立MapStateDescriptor
MapStateDescriptor定义了状态的名称、Key和Value的类型。
这里,MapStateDescriptor中,key是String类型,value是Map<String, Tuple2<String,Int>>类型。
这里的 labelId,scourceBean
*/
val etlRule = new MapStateDescriptor[String, Tuple4[String, String, String, String]]("config", classOf[String], classOf[Tuple4[String, String, String, String]])
//mysql源数据,形成广播 BroadcastStream
val mysqlSource = env.addSource(new MysqlSourceFunc(baseConf))
val mysqlBroadcast = mysqlSource.broadcast(etlRule)
//kafka的流数据
val dwdEventLogTopic: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](conf.product_kafka_dwd_topic, new SimpleStringSchema(), properties)
val kafkaDataStream: DataStream[BuriedPointDetailBean] = env.addSource(dwdEventLogTopic)
.map(item => BuriedPointDetailBean(item))
.setParallelism(parallelism)
//连接mysql数据
val connMysql: DataStream[BuriedPointDetailBean] = kafkaDataStream.connect(mysqlBroadcast)
.process(new BroadcastProcessFunc())
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost(conf.ES_HOST, 9200, "http"))
//创建sinkES
val esSinkBuilder: ElasticsearchSink.Builder[BuriedPointDetailBean] = new ElasticsearchSink.Builder[BuriedPointDetailBean](
httpHosts,
new ElasticsearchSinkFunc(conf.ES_INDEX, conf.ES_TYPE)
)
esSinkBuilder.setBulkFlushMaxActions(1) //无界流使用此方法可以来一条进行一次写,否则会进入缓冲区。
// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
new RestClientFactory {
override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
// elasticsearch username and password
val credentialsProvider = new BasicCredentialsProvider
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(conf.ES_USERNAME, conf.ES_PASSWORD))
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
//httpClientBuilder.setSSLContext(trustfulSslContext)
}
})
}
})
//sink Es
connMysql.addSink(esSinkBuilder.build).name("sinkES")
//connMysql.print()
//汇总埋点数据一分钟
val totalStream = connMysql
.keyBy(_.event_id)
.timeWindow(Time.seconds(5))
.process(new ProcessETLWindowFunc)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[UserLastTimeBean](Time.minutes(1)) {
override def extractTimestamp(element: UserLastTimeBean): Long = {
val fm = new SimpleDateFormat("yyyy-MM-dd HH:MM:SS.sss", Locale.ENGLISH)
val dt2 = fm.parse(element.event_time)
dt2.getTime
}
}).setParallelism(parallelism)
//关联 查询ES的规则
val hbaseStream = totalStream.connect(mysqlBroadcast)
.process(new BroadcastProcessRuleFunc(conf))
//hbaseStream.print()
hbaseStream.addSink(new HbaseSinkFunc(conf)).name("sinkHBase")
env.execute("real_time_label")
}
}
4、主类介绍
(1)通过传参来区分测试环境与实际生产。
ParameterTool.fromArgs(args).get(ENV)
然后初始化类。在初始化中配置不同环境代码。
(2)构建实时环境
StreamExecutionEnvironment.getExecutionEnvironment
通过这个类设置配置,我在这里就不介绍每一个设置的效果
(3)配置kafka源数据
new FlinkKafkaConsumer011[String](conf.product_kafka_dwd_topic, new SimpleStringSchema(), properties)
然后通过上面env.addSource() 一起配置源数据source
(4)mysql源数据
主要是为了加载过滤配置,和上一个文档讲解的一样,通过kafka 和 mysql 关联。
(5)配置ES sink
如何配置ES的配置可参考 Flink官网。
存入ES主要是为了查询数据核对数据,以及配合ES进行一些聚合查询。
addSink(esSinkBuilder.build).name("sinkES")
(6) ETL逻辑处理
通过mysql加载的规则进行处理。所以需要processFunction
(7)将处理结果存入hbase
存入hbase主要是给下游进行实时api查询。
addSink(new HbaseSinkFunc(conf)).name("sinkHBase")
5、传送门
下面在逐个分享各个类的代码。
持续更新
Flink大数据实时标签实时ETL --03加载规则类 (source Mysql)
更多推荐
所有评论(0)