The exact error is as follows (Flume version 1.6):

2022-11-07 11:23:13,005 (lifecycleSupervisor-1-9) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5efb7d32 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.flume.sink.kafka.KafkaSink.start(KafkaSink.java:306)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
        at org.apache.flume.sink.kafka.KafkaSink.start(KafkaSink.java:291)
        ... 10 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
        at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
        at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
        ... 12 more

        After trying many fixes, and after confirming that ZooKeeper and Kafka themselves were running normally, I concluded the problem was on the Flume side. Since this was a standalone (single-node) Flume install, I simply uploaded a newer release, Flume 1.9.0, which resolved the error.
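For reference, the root-cause message ("Could not find a 'KafkaClient' entry in the JAAS configuration") means the Kafka client believed it had to open a SASL-secured channel but found no JAAS login context. If your brokers genuinely require SASL/Kerberos, the client side needs a JAAS file with a `KafkaClient` entry, passed via the `java.security.auth.login.config` system property. A sketch, with placeholder keytab path and principal (adjust to your cluster):

```
# conf/jaas.conf -- keytab path and principal below are placeholders
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/flume.keytab"
  principal="flume@EXAMPLE.COM";
};
```

The property can be set in `conf/flume-env.sh`, for example `export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/opt/flume/conf/jaas.conf"`. In this case, however, no SASL was intended, and upgrading Flume itself made the error go away.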

The flume-file-kafka.conf configuration is as follows:

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# For each one of the sources, the type is defined
#agent.sources.r1.type = spooldir
#agent.sources.r1.command = /opt/test/logs/data
#agent.sources.r1.fileHeader = true
#agent.sources.r1.channels = c1
agent.sources = r1
agent.sinks = s1
agent.channels = c1
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /opt/data/test/
agent.sources.r1.fileHeader = true
# Each sink's type must be defined
#agent.sinks.s1.type = logger
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = test
#agent.sinks.s1.brokerList = 192.168.0.82:9092
agent.sinks.s1.kafka.bootstrap.servers = 192.168.0.82:9092
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 2
# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
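A note on the sink properties: since Flume 1.7, the Kafka sink documents `kafka.`-prefixed property names, and the older `topic`, `requiredAcks`, and `batchSize` keys used above are deprecated (they still work but log warnings). Under Flume 1.9 the equivalent sink section would look roughly like this:

```
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.kafka.bootstrap.servers = 192.168.0.82:9092
agent.sinks.s1.kafka.topic = test
agent.sinks.s1.kafka.producer.acks = 1
agent.sinks.s1.flumeBatchSize = 2
```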

Start a Kafka console consumer and producer:

[root@node-str-coredJVL0001 bin]# kafka-console-consumer.sh --topic test --bootstrap-server 192.168.0.82:9092
[root@node-str-coredJVL0001 bin]# kafka-console-producer.sh --broker-list 192.168.0.82:9092 --topic test

Start Flume:

/bin/flume-ng agent --conf conf -f ./conf/flume-file-kafka.conf -n agent -Dflume.root.logger=INFO,console
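To feed test data through the pipeline, drop a completed file into the spooling directory. The spooldir source only picks up finished files, so it is safer to write the file elsewhere and move it in atomically. A sketch, using a scratch directory as a stand-in for the `/opt/data/test/` configured above:

```shell
# Stand-in for the configured spoolDir (/opt/data/test/ in the config above)
SPOOL_DIR=/tmp/flume-spool-demo
mkdir -p "$SPOOL_DIR"

# Write the file outside the spool dir first, then mv it in atomically,
# so Flume never reads a half-written file.
echo "hello from flume" > /tmp/demo.log.tmp
mv /tmp/demo.log.tmp "$SPOOL_DIR/demo.log"
cat "$SPOOL_DIR/demo.log"
```

Once the source has consumed a file it renames it with a `.COMPLETED` suffix, which is a quick way to confirm the source side is working even before checking the Kafka consumer.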

 

Once the console consumer above starts receiving the messages, the pipeline is confirmed to be working.
