Flink processing of JSON log data consumed from Kafka
import java.util.Propertiesimport com.alibaba.fastjson.JSONimport com.itheima.realprocess.bean.{ClickLog, Message}import com.itheima.realprocess.task._import com.itheima.realprocess.util.GlobalConfigU
·
import java.util.Properties

import com.alibaba.fastjson.JSON
import com.itheima.realprocess.bean.{ClickLog, Message}
import com.itheima.realprocess.task._
import com.itheima.realprocess.util.GlobalConfigUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

/**
 * Job entry point: consumes JSON click-log messages from Kafka, parses them
 * into [[Message]] records, assigns event-time watermarks, and hands the
 * stream to the downstream analysis tasks.
 */
object App {

  def main(args: Array[String]): Unit = {
    // Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Process records by event time (the time embedded in the data itself).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // NOTE(review): the original comment claimed parallelism 1, but the code
    // has always set 3; keeping the code's value and correcting the comment.
    env.setParallelism(3)

    // --- Checkpointing: protect the long-running job against failures ---
    // Trigger a checkpoint every 5 seconds.
    env.enableCheckpointing(5000)
    // Exactly-once state consistency for checkpoints (not "checkpoint once").
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // At least 1 second between the end of one checkpoint and the next start.
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    // Abort any checkpoint that takes longer than 60 seconds.
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // At most one checkpoint in flight at a time.
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Retain externalized checkpoint data when the job is cancelled.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Store checkpoint state in HDFS.
    env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/"))

    // --- Kafka consumer configuration ---
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", GlobalConfigUtil.bootstrapServers)
    properties.setProperty("zookeeper.connect", GlobalConfigUtil.zookeeperConnect)
    properties.setProperty("group.id", GlobalConfigUtil.groupId)
    properties.setProperty("enable.auto.commit", GlobalConfigUtil.enableAutoCommit)
    properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.autoCommitIntervalMs)
    properties.setProperty("auto.offset.reset", GlobalConfigUtil.autoOffsetReset)
    // Consumer-side key deserializer. (The original also set "key.serializer",
    // which is a producer-only property and is ignored by a consumer — removed.)
    properties.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer: FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String](
      GlobalConfigUtil.inputTopic,
      new SimpleStringSchema(),
      properties
    )

    // Source stream of raw JSON strings from Kafka.
    val kafkaDataStream: DataStream[String] = env.addSource(consumer)

    // Parse each JSON record into Message(count, timestamp, ClickLog).
    val messageDataStream = kafkaDataStream.map { json =>
      val jsonObject = JSON.parseObject(json)
      val count = jsonObject.getLong("count")
      val timestamp = jsonObject.getLong("timeStamp")
      val message = jsonObject.getString("message")
      // Wrap count, event timestamp and the parsed click log in one record.
      Message(count, timestamp, ClickLog(message))
    }

    // Assign event-time watermarks, tolerating up to 2s of out-of-orderness.
    val watermarkData: DataStream[Message] = messageDataStream.assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[Message] {
        var currentTimestamp: Long = 0L
        val maxDelayTime = 2000L
        var watermark: Watermark = null

        // The watermark lags the highest timestamp seen so far by maxDelayTime.
        override def getCurrentWatermark = {
          watermark = new Watermark(currentTimestamp - maxDelayTime)
          watermark
        }

        // BUGFIX: the original returned `currentTimestamp` (the running max),
        // stamping out-of-order records with a later event's time. Return the
        // record's own timestamp; only the watermark tracks the max.
        override def extractTimestamp(t: Message, l: Long) = {
          val timeStamp = t.timestamp
          currentTimestamp = Math.max(timeStamp, currentTimestamp)
          timeStamp
        }
      })

    // Pre-process the raw stream into the wide click-log used by the analyses.
    val clickLogWideDataSream = PreprocessTask.process(watermarkData)

    // Hot-channel analysis
    //ChannelRealHotTask.process(clickLogWideDataSream)
    // Channel PV/UV analysis
    ChannelPvUvTask.process(clickLogWideDataSream)
    // Channel freshness analysis (time dimension)
    //ChannelFreshnessTask.process(clickLogWideDataSream)
    // Regional analysis
    //ChannelAreaTask.process(clickLogWideDataSream)
    // Network-type analysis
    //ChannelNetWorkTask.proocess(clickLogWideDataSream)

    env.execute("App")
  }
}
========
/**
 * Channel PV/UV analysis: expands each wide click log into hour/day/month
 * buckets, sums PV/UV per (channel, bucket) in 3-second windows, and
 * accumulates running totals into HBase.
 */
object ChannelPvUvTask {

  /** PV/UV aggregate for one (channel, time-bucket) pair. */
  case class ChannelPvUv(channelID: String, date: String, pv: Long, uv: Long)

  def process(clickLogWideDataSream: DataStream[ClickLogWide]) = {
    // Expand each record into three time dimensions: hour, day, month.
    val clickDataStream = clickLogWideDataSream.flatMap { clickLog =>
      List(
        ChannelPvUv(clickLog.channelID, clickLog.yearMonnthDayHour, clickLog.count, clickLog.isHourNew), // hour
        ChannelPvUv(clickLog.channelID, clickLog.yearMonthDay, clickLog.count, clickLog.isDayNew),       // day
        ChannelPvUv(clickLog.channelID, clickLog.yearMonth, clickLog.count, clickLog.isMonthNew)         // month
      )
    }

    // Key the stream by channel + time bucket.
    val groupedDataStream = clickDataStream.keyBy(pvuv => pvuv.channelID + pvuv.date)

    // 3-second tumbling time windows.
    val windowDataStream = groupedDataStream.timeWindow(Time.seconds(3))

    // Sum PV and UV within each window.
    val resultDataStream = windowDataStream.reduce((t1, t2) =>
      ChannelPvUv(t2.channelID, t2.date, t1.pv + t2.pv, t1.uv + t2.uv))

    // Print for debugging / verification.
    resultDataStream.print()

    // Sink: add this window's counts to the totals already stored in HBase.
    // NOTE(review): this get-then-put is not atomic; parallel sink instances
    // hitting the same rowkey can lose updates — consider HBase Increment.
    resultDataStream.addSink { pvuv =>
      val tableName = "channel_pvuv"
      val rowkey = pvuv.channelID + ":" + pvuv.date
      val cfName = "info"
      val channelColName = "channel"
      val dataColName = "data"
      val pvColName = "pv"
      val uvColName = "uv"

      val pvInHBase = HBaseUtil.getData(tableName, rowkey, cfName, pvColName)
      val uvInHBase = HBaseUtil.getData(tableName, rowkey, cfName, uvColName)

      // Blank cell means no prior total; otherwise add the stored value.
      val totalPv =
        if (StringUtils.isNotBlank(pvInHBase)) pvuv.pv + pvInHBase.toLong else pvuv.pv
      val totalUv =
        if (StringUtils.isNotBlank(uvInHBase)) pvuv.uv + uvInHBase.toLong else pvuv.uv

      HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
        channelColName -> pvuv.channelID,
        dataColName -> pvuv.date,
        pvColName -> totalPv,
        uvColName -> totalUv
      ))
    }
  }
}
====
/**
 * One click-stream log record parsed from a JSON string.
 *
 * NOTE(review): the original per-field comments were shifted by one field
 * (e.g. browserType was labelled "channel ID"); corrected below to match
 * each field's actual name and the JSON key it is read from.
 */
case class ClickLog(
  browserType: String, // browser type
  categoryID: String,  // product category ID
  channelID: String,   // channel ID
  city: String,        // city
  country: String,     // country
  entryTime: String,   // time the user entered the site
  leaveTime: String,   // time the user left the site
  network: String,     // network access type
  produceID: String,   // product ID
  province: String,    // province
  source: String,      // traffic source
  userID: String       // user ID
)

object ClickLog {

  /**
   * Parses a JSON string into a [[ClickLog]]. Fields are read by key name,
   * so the JSON key order does not matter; missing keys yield null values.
   */
  def apply(json: String): ClickLog = {
    val jsonObject = JSON.parseObject(json)
    ClickLog(
      jsonObject.getString("browserType"),
      jsonObject.getString("categoryID"),
      jsonObject.getString("channelID"),
      jsonObject.getString("city"),
      jsonObject.getString("country"),
      jsonObject.getString("entryTime"),
      jsonObject.getString("leaveTime"),
      jsonObject.getString("network"),
      jsonObject.getString("produceID"),
      jsonObject.getString("province"),
      jsonObject.getString("source"),
      jsonObject.getString("userID")
    )
  }
}
/**
 * Envelope for one Kafka record: the click count, the event-time timestamp,
 * and the parsed click-stream log.
 */
case class Message(
  count: Long,        // number of clicks
  timestamp: Long,    // event timestamp of the click
  clickLog: ClickLog  // parsed click-stream log record
)
更多推荐
已为社区贡献1条内容
所有评论(0)