import java.util.{Properties, Random}
import com.kaishu.tools.{KafkaSink, SparkManager}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
/**
 * Author: wxt
 * Created on 2022/07/05
 * Job description:
 * For users active from 2020-01-01 to 2022-06-29, excluding users active from
 * 2022-06-01 to 2022-06-29, sync their historical play data up to 2022-06-29.
 * Guest accounts are removed; deduplicated master-account data is synced to Kafka.
 * Topic: TOPIC_PROD_MEDIA_HISTORY_DATA_SYN_KAFKA
 * Row counts per user_part bucket:
 *   0 1278603
 *   1 1278092
 *   2 1278497
 *   3 1278032
 *   4 1276398
 *   5 1275332
 *   6 1278197
 *   7 1276094
 *   8 1276802
 *   9 1278275
 */
object Xunzhan2ToKafka {
def main(args: Array[String]): Unit = {
// Server-side Kafka, production environment (broker address omitted here)
val BOOTSTRAP_SERVERS: String = ""
val ssc = SparkManager.getInstance_hive(
new SparkConf().setAppName("XunzhanToKafka")
.set("hive.new.job.grouping.set.cardinality", "128")
.set("hive.exec.dynamic.partition", "true")
.set("hive.exec.max.dynamic.partitions", "10000")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("isOnline", "true"))
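// SparkManager.getInstance_hive is an in-house helper not shown in this file; presumably
// it builds a Hive-enabled Spark session/context from this SparkConf (hence the hive.* settings)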
//TODO defaults to writing to the server-side production environment
val outTopics = "TOPIC_PROD_MEDIA_HISTORY_DATA_SYN_KAFKA"
val bootstrapServers = BOOTSTRAP_SERVERS
// user_part bucket (0-9) to export, taken from the first program argument
val i = args(0)
val querySql =
s"""
|select master_user_id as userId -- user id (master account id)
|from tmp.tmp_xunzhan_history_play_2year_active_sum_a
|where user_part = '$i'
|group by master_user_id
|
|""".stripMargin
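// Each result row serializes via toJSON below to a small payload, e.g. {"userId":123456}
// (the exact form depends on the column's type)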
// Broadcast the KafkaSink wrapper so each executor reuses a single Kafka producer
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", bootstrapServers)
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
// acks=-1 (all): wait for all in-sync replicas to acknowledge each write
p.setProperty("acks", "-1")
// To use a custom partitioner, set "partitioner.class" to its fully qualified class name;
// if unset, Kafka's default partitioner is used (see the BananaPartitioner sketch after this object)
// p.setProperty("partitioner.class", "com.kaishu.tools.BananaPartitioner")
p
}
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
println("Reading data from Hive")
val dataFrame = ssc.sql(querySql)
// count() triggers a separate job; the query is re-executed for the write below
println("Partition " + i + ", rows to write: " + dataFrame.count())
//dataFrame.show()
println("foreachPartition")
dataFrame.repartition(5).toJSON.foreachPartition(
jsonItr => {
val random = new Random()
jsonItr.foreach(
jsonStr => {
// val jsonObject: JSONObject = JSON.parseObject(jsonStr)
// a random key spreads the records evenly across the topic's partitions
val randomKey = random.nextInt().toString
kafkaProducer.value.send(outTopics, randomKey, jsonStr)
}
)
})
}
}
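/*
 * com.kaishu.tools.KafkaSink is not shown in this file. Below is a minimal sketch of the
 * usual broadcast-friendly producer wrapper, assuming that is what the class does (the
 * names here are illustrative, not the actual implementation): KafkaProducer itself is
 * not serializable, so the wrapper carries only a serializable factory function and
 * creates the producer lazily, once per executor JVM.
 */
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSinkSketch[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  // Created on first send() in each executor; never shipped over the network
  lazy val producer: KafkaProducer[K, V] = createProducer()

  def send(topic: String, key: K, value: V): java.util.concurrent.Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
}

object KafkaSinkSketch {
  def apply[K, V](config: java.util.Properties): KafkaSinkSketch[K, V] =
    new KafkaSinkSketch[K, V]({ () =>
      val producer = new KafkaProducer[K, V](config)
      // Flush buffered records when the executor JVM shuts down
      sys.addShutdownHook { producer.close() }
      producer
    })
}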
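/*
 * The commented-out "partitioner.class" above refers to com.kaishu.tools.BananaPartitioner,
 * which is not shown. A minimal sketch of a custom partitioner under Kafka's standard
 * Partitioner interface (the real BananaPartitioner may behave differently): hash the key
 * bytes and map the result onto the topic's partitions.
 */
import java.util.{Map => JMap}
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
import org.apache.kafka.common.utils.Utils

class BananaPartitionerSketch extends Partitioner {
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size
    // Keyless records all land in partition 0 in this sketch; keyed records are hashed
    if (keyBytes == null) 0
    else Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
  }

  override def close(): Unit = ()

  override def configure(configs: JMap[String, _]): Unit = ()
}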