代码如下

 val df: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
      .option("subscribe", "cat")
      // 从头消费
      .option("staringOffsets", "earliest")
      // 设置消费者组
      .option("kafka.consumer.commit.groupid", "test1")
      .load()

执行时报错,说是consumer.commit.groupid不是一个已知的配置,就是没有这个配置项,删掉即可,有哪些配置项要参考官方文档

org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
‘consumer.commit.groupid’ was supplied but isn’t a known config.

参考

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.0.1 Documentation
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐