flume+kafka整合采集数据简单应用案例(详解)
本案例使用flume监控源头文件夹下的文件,当有新的文件时,自动采集文件数据到kafka。.flume配置采集文件是核心,配置好source、channel、sink三个组件。
·
本案例使用flume监控源头文件夹下的文件,当有新的文件时,自动采集文件数据到kafka。
- 此目录下,存放源文件 users.csv
[root@cy event_source]# pwd
/root/kb18/event_source
- 如下命令查看文件前两行内容,可发现存在头文件
[root@cy event_source]# head -n2 ./users.csv
user_id,locale,birthyear,gender,joinedAt,location,timezone
3197468391,id_ID,1993,male,2012-10-02T06:40:55.524Z,Medan Indonesia,480
- 如下命令查看文件行数
[root@cy event_source]# wc -l users.csv
38210 users.csv
- 配置flume文件
#为source、channel、sink起名
users.sources = usersSource
users.channels = usersChannel
users.sinks = usersSink
#指定我们的source数据收集策略
users.sources.usersSource.type = spooldir
users.sources.usersSource.spoolDir = /opt/kb18tmp/sqooplog/users
users.sources.usersSource.deserializer = LINE
users.sources.usersSource.deserializer.maxLineLength = 320000
users.sources.usersSource.includePattern = users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.usersSource.interceptors = head_filter
users.sources.usersSource.interceptors.head_filter.type = regex_filter
users.sources.usersSource.interceptors.head_filter.regex = ^user_id*
users.sources.usersSource.interceptors.head_filter.excludeEvents = true
users.channels.usersChannel.type = file
users.channels.usersChannel.checkpointDir = /opt/kb18tmp/checkpoint/users/ck
users.channels.usersChannel.dataDirs = /opt/kb18tmp/checkpoint/users/data
#指定我们的sink为 kafka sink
users.sinks.usersSink.type = org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize = 640 #批量提交数量
users.sinks.usersSink.brokerList = 192.168.89.140:9092
users.sinks.usersSink.topic = users #kafka主题名字
#指定我们的source收集到的数据发送到哪个管道
users.sources.usersSource.channels = usersChannel
#指定我们的sink从哪个channel当中读取数据
users.sinks.usersSink.channel = usersChannel
- fflume 监控 /opt/kb18tmp/sqooplog/users 目录
- maxLineLength设置的是 一行读取的最大字符长度(wc -L 可查看文件最长行字符数)
- includePattern 模式匹配,正则。匹配监控目录下的文件名格式
- interceptors 设置拦截器(正则过滤首行)
- 在/opt/kb18tmp/目录下,创建spoolDir、checkpointDir(检查点)、dataDirs目录文件夹。
mkdir -p ./sqooplog/users
mkdir -p ./checkpoint/users/ck
mkdir -p ./checkpoint/users/data
- 启动kafka服务,建议手动创建topic,开启消费
kafka-console-consumer.sh --bootstrap-server 192.168.89.140:9092 --topic users
- /opt/software/apache-flume-1.9.0-bin 目录下启动监控
[root@cy apache-flume-1.9.0-bin]# ./bin/flume-ng agent --name users --conf ./conf/ --conf-file ./conf/kb18/users.conf
- 拷贝 源文件至监控目录下,并修改文件名为能够匹配正则的文件名,否则监控过滤。
cp ./users.csv /opt/kb18tmp/sqooplog/users/users_2022_09_08.csv
- 拷贝成功,kafka即能消费出数据
总结
flume配置采集文件是核心,配置好source、channel、sink三个组件。
更多推荐
已为社区贡献1条内容
所有评论(0)