本案例使用flume监控源头文件夹下的文件,当有新的文件时,自动采集文件数据到kafka。

  • 此目录下,存放源文件 users.csv
[root@cy event_source]# pwd
/root/kb18/event_source

  •   如下命令查看文件前两行内容,可发现存在头文件
[root@cy event_source]#  head -n2 ./users.csv
user_id,locale,birthyear,gender,joinedAt,location,timezone
3197468391,id_ID,1993,male,2012-10-02T06:40:55.524Z,Medan  Indonesia,480
  • 如下命令查看文件行数
[root@cy event_source]# wc -l users.csv
38210 users.csv
  • 配置flume文件
#为source、channel、sink起名
users.sources = usersSource
users.channels = usersChannel
users.sinks = usersSink

#指定我们的source数据收集策略
users.sources.usersSource.type = spooldir
users.sources.usersSource.spoolDir = /opt/kb18tmp/sqooplog/users
users.sources.usersSource.deserializer = LINE
users.sources.usersSource.deserializer.maxLineLength = 320000
users.sources.usersSource.includePattern = users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.usersSource.interceptors = head_filter
users.sources.usersSource.interceptors.head_filter.type = regex_filter
users.sources.usersSource.interceptors.head_filter.regex = ^user_id*
users.sources.usersSource.interceptors.head_filter.excludeEvents = true

users.channels.usersChannel.type = file
users.channels.usersChannel.checkpointDir = /opt/kb18tmp/checkpoint/users/ck
users.channels.usersChannel.dataDirs = /opt/kb18tmp/checkpoint/users/data

#指定我们的sink为 kafka  sink
users.sinks.usersSink.type = org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize = 640    #批量提交数量
users.sinks.usersSink.brokerList = 192.168.89.140:9092
users.sinks.usersSink.topic = users        #kafka主题名字

#指定我们的source收集到的数据发送到哪个管道
users.sources.usersSource.channels = usersChannel
#指定我们的sink从哪个channel当中读取数据
users.sinks.usersSink.channel = usersChannel
  1. fflume 监控 /opt/kb18tmp/sqooplog/users 目录
  2. maxLineLength设置的是 一行读取的最大字符长度(wc -L 可查看文件最长行字符数)
  3. includePattern 模式匹配,正则。匹配监控目录下的文件名格式
  4. interceptors 设置拦截器(正则过滤首行)
  • 在/opt/kb18tmp/目录下,创建spoolDir、checkpointDir(检查点)、dataDirs目录文件夹。 
mkdir -p ./sqooplog/users
mkdir -p ./checkpoint/users/ck
mkdir -p ./checkpoint/users/data
  • 启动kafka服务,建议手动创建topic,开启消费
kafka-console-consumer.sh --bootstrap-server 192.168.89.140:9092 --topic users
  • /opt/software/apache-flume-1.9.0-bin        目录下启动监控
[root@cy apache-flume-1.9.0-bin]# ./bin/flume-ng agent --name users --conf ./conf/ --conf-file ./conf/kb18/users.conf
  • 拷贝 源文件至监控目录下,并修改文件名为能够匹配正则的文件名,否则监控过滤。
cp ./users.csv /opt/kb18tmp/sqooplog/users/users_2022_09_08.csv
  • 拷贝成功,kafka即能消费出数据

 总结

flume配置采集文件是核心,配置好source、channel、sink三个组件。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐