1.在flume安装目录下的文件夹里面编写agent的配置文件,起名为flumekafka。
2.在flumekafka文件夹下面建立flume1kafka.conf配置文件:

# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 描述和配置source组件:r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
 
# 描述和配置sink组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = jsonTopic
a1.sinks.k1.kafka.bootstrap.servers = hdp001:9092,hdp002:9092,hdp003:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
 
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动zookeeper集群和kafka集群。
4.启动xxl_job,产生日志文件。
5.启动刚才配置好的agent: bin/flume-ng agent -c conf -f 配置文件夹名/配置文件名 -n a1 -Dflume.root.logger=INFO,console
6.启动kafka消费者,消费数据:kafka-console-consumer.sh --bootstrap-server kafka1:9092 --from-beginning --topic jsonTopic

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐