import java.util.{Properties, Random}
import com.kaishu.tools.{DateUtils, KafkaSink, SparkManager}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast

/**
 * 创建人:  wxt
 * 创建日期: Created on  2022/7/05
 * 数据开发功能描述:
 * 2020-0101 - 2022-0629活跃的用户 排除2022-0601-0629活跃的用户历史到20220629的播放数据
 * *      去除游客的 去重主账号数据同步到kafka
 * TOPIC_PROD_MEDIA_HISTORY_DATA_SYN_KAFKA
 * * 0       1278603
 * * 1       1278092
 * * 2       1278497
 * * 3       1278032
 * * 4       1276398
 * * 5       1275332
 * * 6       1278197
 * * 7       1276094
 * * 8       1276802
 * * 9       1278275
 */
object Xunzhan2ToKafka {

  def main(args: Array[String]): Unit = {

    //服务端Kafka线上环境
    val BOOTSTRAP_SERVERS: String   = ""

    val ssc = SparkManager.getInstance_hive(
      new SparkConf().setAppName("XunzhanToKafka")
        .set("hive.new.job.grouping.set.cardinality", "128")
        .set("hive.exec.dynamic.partition", "true")
        .set("hive.exec.max.dynamic.partitions", "10000")
        .set("hive.exec.dynamic.partition.mode", "nonstrict")
        .set("isOnline", "true"))

    //TODO 默认写到服务端生产环境
    var outTopics = "TOPIC_PROD_MEDIA_HISTORY_DATA_SYN_KAFKA"
    var bootstrapServers = BOOTSTRAP_SERVERS

    var querySql = ""
    val i = args(0)
        querySql =
          s"""
             |select master_user_id as userId --用户id(主账号id)
             |from tmp.tmp_xunzhan_history_play_2year_active_sum_a
             |where user_part = '$i'
             |group by master_user_id
             |
             |""".stripMargin

    }

    // 广播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", bootstrapServers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p.setProperty("ack", "-1")
        //指定分区器为自定义分区器,值是自定义分区器的全路径,如果不指定就是默认的分区器
        //        p.setProperty("partitioner.class","com.kaishu.tools.BananaPartitioner");
        p
      }
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))

    }
    println("开始读取hive数据")
    val dataFrame = ssc.sql(querySql)
    println("分区"+i+",要写入的数据量"+dataFrame.count())
    //dataFrame.show()
    println("foreachPartition")
    dataFrame.repartition(5).toJSON.foreachPartition(

      jsonItr => {
        val random = new Random()

        jsonItr.foreach(
          jsonStr => {
            //            val jsonObject: JSONObject = JSON.parseObject(jsonStr)

            var userId = random.nextInt().toString
            println("send数据")
            kafkaProducer.value.send(outTopics, userId, jsonStr)
          }

        )
      })

  }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐