Flink 消费kafka 自定义KafkaDeserializationSchema
kafka中的数据通常是键值对,所以我们自定义反序列化从kafka中消费键值对的消息实现需求: 读取多个topic数据进行不同处理输出,这里就需要自定义反序列化从kafka消费数据,然后分别处理。效果数据格式转换后效果:代码自定义反序列化类class LogDeserializationSchemaextendsKafkaDeserializationSchema[TopicBean] {// 是
·
kafka中的数据通常是键值对,所以我们自定义反序列化从kafka中消费键值对的消息
实现需求: 读取多个topic数据进行不同处理输出,这里就需要自定义反序列化从kafka消费数据,然后分别处理。
效果
数据格式
转换后效果:
代码
自定义反序列化类
class LogDeserializationSchema extends KafkaDeserializationSchema[TopicBean] {
// 是否最后一条数据,流是无界的
override def isEndOfStream(t: TopicBean): Boolean = false
// 反序列化
// value 是bytes
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): TopicBean = {
try {
TopicBean(consumerRecord.topic(),new String(consumerRecord.value(), StandardCharsets.UTF_8))
}catch {
case e:Exception =>
e.printStackTrace()
null
}
}
//数据类型 用于获取反序列化对象的类型
override def getProducedType: TypeInformation[TopicBean] = TypeInformation.of(new TypeHint[TopicBean] {})
}
TopicBean
case class TopicBean(topic:String,data:String)
KafkaUtils
object KafkaUtils {
/**
* multiple topics consumer
*
* @param prop properties
* @param topic topics name
* @return
*/
def getTopicsConsumer(prop: Properties, topic: String): FlinkKafkaConsumer[TopicBean] = {
val topics: util.List[String] = topic.split(",").toList.asJava
new FlinkKafkaConsumer[TopicBean](topics, new LogDeserializationSchema(), prop)
}
}
主函数
object SdkDataOdsToDwd {
def main(args: Array[String]): Unit = {
// val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI() //本地环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val topics = "login,pay" // 读取多个topic 数据
val kafkaProps = new Properties()
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, LocalConfig.KAFKA_BOOTSTRAP_SERVERS) //添加配置
val kafkaConsumer = KafkaUtils.getTopicsConsumer(kafkaProps, topics).setStartFromGroupOffsets()
// source
val dataKafkaStream = env.addSource(kafkaConsumer)
// sink 输出
dataKafkaStream.print()
env.execute()
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)