文章目录

logstash配置kafka

根据不同topic输出到不同的ES索引

# input插件需要监听Logstash进程所在节点的端口,请使用8000~9000范围内的端口。
input {
  kafka { 
    bootstrap_servers => '10.0.24.111:9092'
    # 设置分组
    group_id => 'elk-dev'
    # 多个客户端同时消费需要设置不同的client_id,注意同一分组的客户端数量≤kafka分区数量
    client_id => 'elk-dev'
    # 正则匹配topic
    topics_pattern  => "elk-.*"
    codec => "json"
    #默认为false,只有为true的时候才会获取到元数据
    decorate_events => true
  }
}
filter {
  mutate {
    #从kafka的key中获取数据并按照"-"切割
    split => ["[@metadata][kafka][topic]", "-"]
    add_field => {
        #将切割后的第一位数据放入自定义的“index”字段中
        "index" => "%{[@metadata][kafka][topic][1]}"
    }
  }
}
output {
  # 支持output中添加file_extend output配置,即可在管道部署完成后直接查看输出结果,进行结果验证与调试
  # 请勿修改系统指定路径,注释或删除file_extend output部分配置,可关闭配置调试。详情见下方提示
  #file_extend {
  #  path => "/ssd/1/ls-cn-st221ygwu002/logstash/logs/debug/dev"
  #}
  elasticsearch {
    hosts => ["es-cn-zxxxxxxbm.elasticsearch.aliyuncs.com"]
    user => "elastic"
    password => "Super111$"
    #使用上面的index用作ES的索引
    index => "systemlog-dev-%{index}-%{+YYYY.MM.dd}"
  }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐