import java.util.Properties

import com.alibaba.fastjson.JSON
import com.itheima.realprocess.bean.{ClickLog, Message}
import com.itheima.realprocess.task._
import com.itheima.realprocess.util.GlobalConfigUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

  // 1. App singleton object: initializes the Flink runtime and wires the whole
  //    job together (env config -> Kafka source -> JSON parsing -> watermarks
  //    -> analysis tasks).
object App {
  def main(args: Array[String]): Unit = {
    // 2. Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 3. Process by event time (the time the data was produced), not ingestion/processing time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 4. Default job parallelism.
    //    NOTE: the original comment claimed this was set to 1, but the code sets 3.
    env.setParallelism(3)

    // Checkpointing keeps a long-running job safe against failures.
    // Trigger a checkpoint every 5 seconds.
    env.enableCheckpointing(5000)
    // Exactly-once checkpoint semantics.
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Minimum pause between the end of one checkpoint and the start of the next.
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    // Abort a checkpoint that takes longer than 60s.
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // At most one checkpoint in flight at a time.
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Keep externalized checkpoints around when the job is cancelled.
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Checkpoint storage location on HDFS.
    env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/"))

    // Kafka consumer configuration (values come from the application config).
    val properties = new Properties()
    properties.setProperty("bootstrap.servers" , GlobalConfigUtil.bootstrapServers)
    properties.setProperty("zookeeper.connect" , GlobalConfigUtil.zookeeperConnect)
    properties.setProperty("group.id" , GlobalConfigUtil.groupId)
    properties.setProperty("enable.auto.commit" , GlobalConfigUtil.enableAutoCommit)
    properties.setProperty("auto.commit.interval.ms" , GlobalConfigUtil.autoCommitIntervalMs)
    properties.setProperty("auto.offset.reset" , GlobalConfigUtil.autoOffsetReset)
    // NOTE(review): these (de)serializer properties are no-ops here — the Flink
    // consumer uses SimpleStringSchema below, and a consumer never serializes.
    // Kept to preserve the original configuration exactly.
    properties.setProperty("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer: FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String]( GlobalConfigUtil.inputTopic,
      new SimpleStringSchema(),
      properties
    )

    // Source: raw JSON strings from Kafka.
    val kafkaDataStream: DataStream[String] = env.addSource(consumer)

    // Parse each Kafka record into a Message sample class.
    val messageDataStream = kafkaDataStream.map(
      mapJson=> {
        // Parse the payload with FastJSON.
        val jSONObject = JSON.parseObject(mapJson)
        // Pull out the envelope fields.
        val count = jSONObject.getLong("count")
        val timestamp = jSONObject.getLong("timeStamp")
        val message = jSONObject.getString("message")
        // 1. Message wraps the ClickLog, the event timestamp, and the count.
        // 2. The "message" field is itself a JSON string parsed by ClickLog.apply.
        Message(count,timestamp,ClickLog(message))
      }
    )

    // Assign event-time timestamps and periodic watermarks;
    // tolerate at most 2 seconds of out-of-order events.
    val watermarkData: DataStream[Message] = messageDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] {
      var currentTimestamp: Long = 0L // maximum event timestamp seen so far
      val maxDelayTime = 2000L        // allowed out-of-orderness in milliseconds
      var watermark: Watermark = null

      // The watermark trails the maximum observed event time by the allowed delay.
      override def getCurrentWatermark = {
        watermark = new Watermark(currentTimestamp - maxDelayTime)
        watermark
      }

      // Extract the record's own event timestamp and track the running maximum.
      // BUGFIX: the original returned `currentTimestamp` (the running max),
      // which rewrote every element's timestamp to be non-decreasing and
      // distorted event-time window assignment. The standard
      // bounded-out-of-orderness pattern returns the element's own timestamp.
      override def extractTimestamp(t: Message, l: Long) = {
        currentTimestamp = Math.max(t.timestamp, currentTimestamp)
        t.timestamp
      }
    })
    // Run the preprocessing task over the watermarked stream.
    val clickLogWideDataSream = PreprocessTask.process(watermarkData)
    // Hot-channel analysis
    //ChannelRealHotTask.process(clickLogWideDataSream)
    // PV/UV analysis
    ChannelPvUvTask.process(clickLogWideDataSream)
    // Freshness analysis by time dimension
    //ChannelFreshnessTask.process(clickLogWideDataSream)
    // Regional analysis
    //ChannelAreaTask.process(clickLogWideDataSream)
    // Network-type analysis
    //ChannelNetWorkTask.proocess(clickLogWideDataSream)
    env.execute("App")
  }
}

========

// Computes page views (PV) and unique visitors (UV) per channel across three
// time granularities (hour, day, month) and upserts running totals into HBase.
object ChannelPvUvTask {
  // 1. Result record: one row per (channel, time-bucket).
  //    `date` holds the formatted bucket string; pv/uv are partial counts for one record/window.
  case class ChannelPvUv(channelID:String,date:String,pv:Long,uv:Long)
  // Entry point: transforms the preprocessed wide click-log stream and sinks totals to HBase.
  def process(clickLogWideDataSream:DataStream[ClickLogWide])={
    // 2. Fan each wide record out into three ChannelPvUv records, one per time dimension.
    //    The is*New flags (0/1, set upstream) act as the UV increment for that bucket.
    val clickDataStream = clickLogWideDataSream.flatMap {
      clickLog =>
        List(
          ChannelPvUv(clickLog.channelID, clickLog.yearMonnthDayHour, clickLog.count, clickLog.isHourNew), // hour bucket ("yearMonnthDayHour" typo comes from ClickLogWide — defined elsewhere)
          ChannelPvUv(clickLog.channelID,clickLog.yearMonthDay,clickLog.count,clickLog.isDayNew), // day bucket
          ChannelPvUv(clickLog.channelID,clickLog.yearMonth,clickLog.count,clickLog.isMonthNew)// month bucket
        )
    }
    // 3. Key by channel + bucket string so each (channel, time-bucket) aggregates independently.
    val groupedDataStream = clickDataStream.keyBy(
      pvuv=> pvuv.channelID + pvuv.date
    )
    // 4. 3-second tumbling time window per key.
    val windowDataStream = groupedDataStream.timeWindow(Time.seconds(3))
    // 5. Sum PV and UV within the window (channelID/date are identical within a key).
    val resultDataStream = windowDataStream.reduce(
      (t1,t2) =>
        ChannelPvUv(t2.channelID,t2.date,t1.pv+t2.pv,t2.uv+t1.uv)
    )
    // 6. Print for debugging.
    resultDataStream.print()
    // 7. Sink: read the current totals from HBase, add this window's counts, write back.
    //    NOTE(review): this get-then-put is not atomic — concurrent sink subtasks
    //    writing the same rowkey could lose updates; confirm the keying guarantees
    //    single-writer per rowkey, or switch to HBase Increment.
    resultDataStream.addSink{
      pvuv =>
        // HBase coordinates: rowkey = "<channelID>:<bucket>", all columns under family "info".
        val tableName = "channel_pvuv"
        val rowkey = pvuv.channelID+":"+pvuv.date
        val cfName = "info"
        val channelColName = "channel"
        // NOTE(review): column is named "data" but stores the bucket date string —
        // looks like a typo for "date", kept because existing rows use this name.
        val dataColName = "data"
        val pvColName = "pv"
        val uvColName = "uv"
        // Current totals as strings; blank/empty means no prior row.
        val pvInHBase = HBaseUtil.getData(tableName,rowkey,cfName,pvColName)
        val uvInHBase = HBaseUtil.getData(tableName,rowkey,cfName,uvColName)

        var totalPv = 0L
        var totalUv = 0L

        // Accumulate PV on top of the stored value, if any.
        if (StringUtils.isNotBlank(pvInHBase)){
          totalPv = pvuv.pv + pvInHBase.toLong
        }else{
          totalPv = pvuv.pv
        }
        // Accumulate UV on top of the stored value, if any.
        if(StringUtils.isNotBlank(uvInHBase)){
          totalUv = pvuv.uv + uvInHBase.toLong
        }else{
          totalUv = pvuv.uv
        }

        // Upsert the refreshed totals back into HBase.
        HBaseUtil.putMapData(tableName,rowkey,cfName,Map(
          channelColName -> pvuv.channelID,
          dataColName -> pvuv.date,
          pvColName -> totalPv,
          uvColName -> totalUv
        ))
    }
  }
}

====

// One raw click-stream log record parsed from JSON.
// NOTE(review): the original field comments were shifted relative to the fields
// (e.g. browserType was annotated as "channelID"); corrected below to match
// each field's actual name.
case class ClickLog(browserType:String,// browser type
                    categoryID:String,// product category ID
                    channelID:String,// channel ID
                    city:String,// city
                    country:String,// country
                    entryTime:String,// time the user entered the site
                    leaveTime:String,// time the user left the site
                    network:String,// network access type
                    produceID:String,// product ID
                    province:String,// province
                    source:String,// traffic source
                    userID:String)// user ID

object ClickLog{
  /**
   * Parses a JSON string into a [[ClickLog]] instance.
   *
   * Every field is read with FastJSON's `getString`, so a key that is absent
   * from the payload produces a null field value, exactly as before.
   */
  def apply(json:String): ClickLog ={
    val parsed = JSON.parseObject(json)
    // Local reader so every field is extracted the same way.
    def str(key: String): String = parsed.getString(key)
    ClickLog(
      browserType = str("browserType"),
      categoryID  = str("categoryID"),
      channelID   = str("channelID"),
      city        = str("city"),
      country     = str("country"),
      entryTime   = str("entryTime"),
      leaveTime   = str("leaveTime"),
      network     = str("network"),
      produceID   = str("produceID"),
      province    = str("province"),
      source      = str("source"),
      userID      = str("userID")
    )
  }
}

 

// Envelope for one Kafka record: the click count, the event timestamp, and the parsed click log.
case class Message (count:Long,// number of clicks
                    timestamp:Long,// event timestamp of the click
                    clickLog: ClickLog// parsed click-stream log record
                   )
// --- Web-page residue from the original source (site logo/footer text), not code ---
// Logo
// 为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。
// 更多推荐