logstash配置kafka
文章目录logstash配置kafkalogstash配置kafka根据不同topic输出到不同的ES索引# input插件需要监听Logstash进程所在节点的端口,请使用8000~9000范围内的端口。input {kafka {bootstrap_servers => '10.0.24.111:9092'group_id => 'elk-dev'# 正则匹配topictopics
·
文章目录
logstash配置kafka
根据不同topic输出到不同的ES索引
# input插件需要监听Logstash进程所在节点的端口,请使用8000~9000范围内的端口。
input {
kafka {
bootstrap_servers => '10.0.24.111:9092'
# 设置分组
group_id => 'elk-dev'
# 多个客户端同时消费需要设置不同的client_id,注意同一分组的客户端数量≤kafka分区数量
client_id => 'elk-dev'
# 正则匹配topic
topics_pattern => "elk-.*"
codec => "json"
#默认为false,只有为true的时候才会获取到元数据
decorate_events => true
}
}
filter {
mutate {
#从kafka的key中获取数据并按照"-"切割
split => ["[@metadata][kafka][topic]", "-"]
add_field => {
#将切割后的第一位数据放入自定义的“index”字段中
"index" => "%{[@metadata][kafka][topic][1]}"
}
}
}
output {
# 支持output中添加file_extend output配置,即可在管道部署完成后直接查看输出结果,进行结果验证与调试
# 请勿修改系统指定路径,注释或删除file_extend output部分配置,可关闭配置调试。详情见下方提示
#file_extend {
# path => "/ssd/1/ls-cn-st221ygwu002/logstash/logs/debug/dev"
#}
elasticsearch {
hosts => ["es-cn-zxxxxxxbm.elasticsearch.aliyuncs.com"]
user => "elastic"
password => "Super111$"
#使用上面的index用作ES的索引
index => "systemlog-dev-%{index}-%{+YYYY.MM.dd}"
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)