Flink程序连接Kafka没输出也不报错

本人最近在使用Kafka作为数据源输出数据到Flink时遇到一个问题,那就是既没有结果输出,也没有报错

代码如下

package Source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import java.util.Properties

object SourceFromKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //配置项
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    //这边是要读取Kafka的数据,所以算是一个消费者
    /**
     * 第一个泛型:读取过来的数据类型
     * 第一个参数:Kafka名
     * 第二个参数:序列化
     * 第三个参数:配置项
     */
    val stream =
      env.addSource(new FlinkKafkaConsumer011[String]
      ("first", new SimpleStringSchema(), properties))

    stream.print()
    env.execute()
  }
}

结果就和下图相似,不报错,也没有输出
在这里插入图片描述


这样的结果让人很困惑,于是我们准备看看更加详细的信息,我们在pom文件中添加下面的依赖

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

src/main/resources文件夹下创建一个名为log4j.properties的文件,在该文件里添加如下内容

log4j.rootLogger=info,console  

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

这个时候在运行,我们就可以看到下面几行信息

[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:607): Discovered coordinator slave2:9092 (id: 2147483645 rack: null) for group consumer-group.  
[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:652): Marking the coordinator slave2:9092 (id: 2147483645 rack: null) dead for group consumer-group 

我们看到信息中出现了slave2这个正是我们安装Kafka集群的虚拟机的主机名,然后我们想到是否是windows上运行的kafka拿到的host是机器名而不是IP地址,因此我们赶紧去修改host文件,文件地址:C:\Windows\System32\drivers\etc\hosts

在这里插入图片描述
此时再运行程序,问题解决在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐