1.背景

Logstash 是一个开源数据收集引擎,具有实时流水线功能。通过输入、过滤、输出三个步骤Logstash 可以将不同来源的数据加工后同时输出至多个数据源。
logStash接收到的数据一般都是各个业务系统的日志,需要使用filter进行过滤加工之后进行输出,以下介绍过滤和输出过程中存在的坑(数据源为filebeat)。

2.grok/date插件介绍

2.1.grok插件

grok支持120种模式的内置变量,可以访问此处查看和调试所有的内置变量:点击查看
除了支持内置的匹配模式外还支持通过正则表达式自定义匹配规则:

(?< field_name >正则表达式模式)

field_name即为匹配到的变量的name,如:
(?\<num>\d+)匹配的结果是:定义了一个变量num,匹配规则是至少为一位数字

2.2.date插件

date插件有5个属性,如下:

属性类型说明
match数组格式如下:[ field,formats… ],field是要解析的字段的name,可以是内置属性也可以是自定义属性,从数组第二元素及以后的位置都是匹配的日期格式规则,如果解析的属性有多个日期格式,可以在这使用多个元素全部包含进去,日期格式可以参考:点此查看
locate字符串用于日期解析的区域设置,默认使用平台默认值
tag_on_failure数组默认值为 ["_dateparsefailure"],在没有成功匹配时将值附加到字段
target字符串将匹配的时间戳存储到给定的目标字段中。如果未提供,则默认替换@timestamp字段
timezone字符串用于日期解析的时区规范 ID。有效 ID 点此查看。这在无法从值中提取时区的情况下很有用,并且不是平台默认值。如果未指定,将使用平台默认值

3.埋坑

3.1.grok获取输入源文件名称

每个数据源都会携带自己的文件名称,filebeat作为数据源时,源文件路径所在字段为source,值如下:

/export/Logs/serverLog1.log

如果文件名的取值为字母数字下划线,正则匹配规则为\w+,则filter如下:

#过滤规则
filter{
    grok {
        match => {
            "source" => "/export/Logs/(?<logName>\w+).log"
        }
        overwrite => [ "source"]
    }
}
#输出规则
output {
#输出至控制台
    stdout {
        codec => rubydebug
    }
}

过滤之后通过控制台可以看到输出的结果中会多一个属性logName

3.2.grok解析多个日志文本字段输出自定义字段

解析多个字段,只需要在filter中写多个match即可,而且每个字段文本中可以解析多个自定义属性。注:如果一个文本字段中需要解析多个自定义属性,注意必须加上break_on_match属性,切值设置为false,这个属性默认为true即匹配到一个就返回。
如果日志如下:

sum= 2124 avg= 14.3 size=312343kB deal success…

filter{
    grok {
        match => {
            "message" => "sum=\s*(?<total>\d+)\s*avg=\s*(?<num>\d+[\.\d+]*)\s*.*size=\s*(?<fileSize>\d+[\.\d+]*)kB\s*.*"
        }
        match => {
            "source" => "/export/Logs/(?<logName>\w+).log"
        }
        overwrite => [ "message"]
        overwrite => [ "source"]
        break_on_match => false
    }
}

过滤之后通过控制台能看到新增属性total,num,fileSize,logName

3.3.grok、data插件搭配解析日志日期替换@timestamp

logstash解析之后会默认有个@timestamp属性,这个属性是当前时间戳,有些场景会需要替换这个为日志中的时间,但是日志中的时间格式显示的又是自定义格式,没法通过内置模式直接取到,那就需要使用grok和date插件配合使用。注:如果是输出至es,也可以再加上ruby插件将时间戳再加8小时,避免es中时间相差8小时问题。
如果日志格式如下:

11/24/2021, 15:22:13 0x2e1a230] UDP timeout, retrying with TCP…

filter{
    grok {
        match => {
            "message" => "(?<logTime>%{MONTHNUM}/%{MONTHDAY}/%{YEAR}[,]?\s+%{TIME}\s*.*"
        }
        match => {
            "source" => "/export/Logs/(?<logName>\w+).log"
        }
        overwrite => [ "message"]
        overwrite => [ "source"]
        break_on_match => false
    }
    date {
        match => ["logTime","MM/dd/yyyy, hh:mm:ss a","M/dd/yyyy, hh:mm:ss a"]
#加上target替换logTime的值,不加target替换@timestamp属性
#        target => "logTime"
    }
# 设置logTime属性增加8小时,解决es使用UTC时间相差8小时问题
#    ruby {
#        code => "event.set('logTime', event.get('logTime').time.localtime + 8*60*60)"
#    }
# 使用logTime属性替换@timestamp属性值
#    ruby {
#       code => "event.set('@timestamp',event.get('logTime'))"
#    }
    mutate {
      remove_field => ["logTime"]
    }
}

2.4.输出ES使用索引模板

如果业务数据量过大,es索引设计使用的是模板定时生成默认,那么logstash存储es时候就要使用动态索引支持动态的存储数据进去。


output {
# 输出至控制台
#    stdout {
#        codec => rubydebug
#    }
# 输出至es
    elasticsearch {
        hosts => [ "es1.demo.com:40000", "es2.demo.com:40000" ]
        user => "es-username"
        password => "1qaz2wsx3dc"
        index => "log_monitor_%{+yyyy_MM}"
        document_type => "log_monitor_type"
    }
}

如上配置中,es的连接地址有两个,es按照模板按月生成以log_monitor为前缀的索引,索引类型为log_monitor_type(es6.x之后一个索引只能有一个type,es7.x之后已经取消了type),其中%{+yyyy_MM}为动态获取内置模式的值来达到动态输出至不同索引。

4.总结

结尾附上一个完整版配置

input {
    beats {
        port => "5000"
    }
}

filter{
    grok {
        match => {
            "message" => "(?<logTime>%{MONTHNUM}/%{MONTHDAY}/%{YEAR}[,]?\s+%{TIME}\s+sum=\s*(?<total>\d+)\s*avg=\s*(?<num>\d+[\.\d+]*)\s*.*size=\s*(?<fileSize>\d+[\.\d+]*)kB\s*.*"
        }
        match => {
            "source" => "/export/Logs/(?<logName>\w+).log"
        }
        overwrite => [ "message"]
        overwrite => [ "source"]
        break_on_match => false
    }
    date {
        match => ["logTime","MM/dd/yyyy, hh:mm:ss a","M/dd/yyyy, hh:mm:ss a"]
#加上target替换logTime的值,不加target替换@timestamp属性
#        target => "logTime"
    }
# 设置logTime属性增加8小时,解决es使用UTC时间相差8小时问题
#    ruby {
#        code => "event.set('logTime', event.get('logTime').time.localtime + 8*60*60)"
#    }
# 使用logTime属性替换@timestamp属性值
#    ruby {
#       code => "event.set('@timestamp',event.get('logTime'))"
#    }
    mutate {
      remove_field => ["logTime"]
    }
}


output {
#    stdout {
#        codec => rubydebug
#    }
    elasticsearch {
        hosts => [ "es1.demo.com:40000", "es2.demo.com:40000" ]
        user => "es-username"
        password => "1qaz2wsx3dc"
        index => "log_monitor_%{+yyyy_MM}"
        document_type => "log_monitor_type"
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐