目录

嵌套Json格式数据

JSON格式一

JSON格式二

 JSON格式三

组件解析:

时间相关操作

调整logstash @timestamp

时间戳转换


嵌套Json格式数据

JSON格式一

 有如下JSON日志(position下是一个JSON)

{
    "RequestTime":1637737587605,
    "timestamp":"2021-11-24T15:06:42.681Z",
    "position":{
        "LogType":"请求日志",
        "TopDirectory":"stream_ad_v1",
        "RequestIp":"127.0.0.1"
    }
}

将其解析为:

{
       "RequestIp" => "127.0.0.1",
    "TopDirectory" => "stream_ad_v1",
       "timestamp" => "2021-11-24T15:06:42.681Z",
     "RequestTime" => 1637737587605,
         "LogType" => "请求日志"
}

思路:

1、新建一个新的字段(position_su),并将position赋值给新字段

2、对新字段进行JSON解析

3、移除多余字段(可选

 详细logstash.conf:

input{
        stdin {
               codec => json {
                        charset => ["UTF-8"]
               }
             }
}
filter{
 mutate {
                add_field => {"position_su" => "%{position}"} #先新建一个新的字段,并将position赋值给它
        }
        json {
                source => "position_su" # 再对该字段进行解析
                remove_field => ["position_su","position"] # 最后移出多余字段(可选)
        }
}
output{
    stdout { codec => rubydebug }
}

JSON格式二

有如以下JSON格式(position下是一个JSON数组,数组元素为1)

{
    "RequestTime":1637737587605,
    "timestamp":"2021-11-24T15:06:42.681Z",
    "position":[
        {
            "LogType":"请求日志",
            "TopDirectory":"stream_ad_v1",
            "RequestIp":"127.0.0.1"
        }
    ]
}

如果不做改变,沿用上述方法,将解析失败,如图:

 此时可以先不进行JSON转换,将“[]”替换之后,在进行JSON转换,

logstash.conf:

input{
        stdin {}
}
filter{
        mutate {
                gsub => [ "message","\[","" ]
                gsub => [ "message","\]","" ]
        }
        json { source => "message" }
        mutate {
                add_field => {"position_su" => "%{position}"} #先新建一个新的字段,并将position赋值给它
        }
        json {
                source => "position_su" # 再对该字段进行解析
                remove_field => ["position_su","position"] # 最后移出多余字段(可选)
        }
}
output{
    stdout { codec => rubydebug }
}

 JSON格式三

有如下JSON数据格式(position下是一个JSON数组,数组元素为n)

{
    "RequestTime":1637737587605,
    "timestamp":"2021-11-24T15:06:42.681Z",
    "position":[
        {
            "LogType":"请求日志",
            "TopDirectory":"stream_ad_v1",
            "RequestIp":"127.0.0.1"
        },
        {
            "LogType":"请求日志",
            "TopDirectory":"stream_ad_v2",
            "RequestIp":"127.0.0.1"
        },
        {
            "LogType":"请求日志",
            "TopDirectory":"stream_ad_v3",
            "RequestIp":"127.0.0.1"
        }
    ]
}

此时将“[]”替换就不能解决问题,观察此JSON经过logstash处理后的输出格式,如图:

我们发现,logstash支持数组元素按索引位置获取元素,因此,logstash.conf可以如下改造:

input{
        stdin {
                codec =>json {
                charset => ["UTF-8"]
                }
            }
}
filter{ 
        mutate {
                add_field => {"position_json" => "%{[position][0]}"}
        }
        mutate {
                add_field => {"position_su" => "%{position_json}"} #先新建一个新的字段,并将position赋值给它
        }
        json {
                source => "position_su" # 再对该字段进行解析
                remove_field => ["position_su","position"] # 最后移出多余字段(可选)
        }
} 
output{
    stdout { codec => rubydebug }
}

执行结果如图:

备注: 如果遇到以下复杂的版本,LogType下嵌套一个或多个JSON数组。

{
    "RequestTime":1637737587605,
    "timestamp":"2021-11-24T15:06:42.681Z",
    "position":[
        {
            "LogType":[
                {
                    "a":"请求日志a"
                },
                {
                    "b":"请求日志b"
                },
                {
                    "c":"请求日志c"
                }
            ],
            "TopDirectory":"stream_ad_v1",
            "RequestIp":"127.0.0.1"
        },
        {
            "LogType":"请求日志",
            "TopDirectory":"stream_ad_v2",
            "RequestIp":"127.0.0.1"
        },
        {
            "LogType":"请求日志",
            "TopDirectory":"stream_ad_v3",
            "RequestIp":"127.0.0.1"
        }
    ]
}

        目前我能想到的方法是抽丝剥茧,依次向下取数据。但实际工作中,这样的需求是不合理的,类似的操作应该放到ElasticSearch中来完成。

组件解析:

codec

        编码插件(codec)可以在logstash输入或输出时处理不同类型的数据,同时,还可以更好更方便的与其他自定义格式的数据产品共存,比如:fluent、netflow、collectd等通用数据格式的其他产品。

        因此,logstash不只是一个input→filter→output的数据流,而是一个input→decode→filter→encode→output的数据流。

        codec支持的编码格式常见有plain、json、json_lines等。

  • plain 是最简单的编码插件,你输入什么信息,就返回什么信息。使用样例:
    • output { stdout { codec => plain } }
  • json 有时候logstash采集的日志是JSON格式,那我们可以在input字段加入codec => json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。如果想让logstash输出为json格式,可以在output字段加入codec=>json。
  • json_lines(即JSON的格式化) 使用json插件时,每个字段都是key:value格式,多个字段之间通过逗号分隔。这种输出比较长,因此我们可以采用json_lines编码格式稍微好一点。

mutate (过滤器)

mutate过滤器能够帮助你修改指定字段的内容。

  • 转变参数类型 convert

mutate {
    convert => {
          "TopDirectory" => "string"
         "RequestIp" => "string"
    }
}

  • 复制字段 copy

复制一个已存在的字段到另外一个字段,已存在的字段会被重新写到一个新的字段,新的字段不需要单独添加。
mutate {
    copy => {"TopDirectory" => "TopDirectory2"}
}

  • 正则表达式替换 gsub

这里只针对string类型字段,如下把name字段中的“o”替换为“p”
mutate {
    gsub => ["name","o","p"]
}

  • 大小写转换 lowercase&uppercase

mutate {

    #lowercase => [ "TopDirectory" ]    #小写

    uppercase => [ "TopDirectory" ]        #大写

}

  • 字段重命名 rename

mutate {

     rename => {"TopDirectory" => "TopDirectory3"}

}

  • 除去字段值前后空格 strip

mutate {

    strip => ["TopDirectory"]

}

  • 更新字段值 update

mutate {

    update => {"TopDirectory" => "li"}

}

  • 修改字段 replace

作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。

  • 移除字段 remove_field

mutate {

    remove_field => ["TopDirectory"]

}

  • 增加字段 add_field

mutate {

    add_field => {"testField1" => "0"}

    add_field => {"testField2" => "%{TopDirectory}"}   #引用TopDirectory中的值

}

时间相关操作

我们先来看一下logstash的标准输出:

需要解决的问题:

  •  目前系统时间为2021-11-26T11:31:16, 而logstash显示时间明显与服务器相差8个小时
  •  使用日志时间替换logstash中的时间

调整logstash @timestamp

问题:logstash时间与服务器时间相差8小时

解决思路:

将“@timestamp”时间加8个小时(8 * 60 * 60)

在filter中加入:

filter{
        ruby {
                code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
        }
        ruby {   code => "event.set('@timestamp',event.get('timestamp'))" }
        mutate {
                remove_field => ["timestamp"]

        }
}

时间戳转换

需求:用日志时间替换logstash时间

解决方法:

调用 date { } 方法

 在filter中加入:

filter {
       date {
                match => ["RequestTime", "UNIX_MS", "yyyy-MM-dd HH:mm:ss,SSS"] 
                target => "@timestamp"
                locale => "cn"
       }
}

备注:

  • match中参数:字段名,格式化模式,要转换的时间格式
  • target:值类型是字符串,默认值是“@timestamp”。将匹配的时间戳存储到给定的目标字段中。
  • locale:值类型是字符串,这个设置没有默认值。使用IETF-BCP47或POSIX语言标记指定用于日期分析的区域设置。 简单的例子是:
    • en,美国的BCP47
    • en_US的POSIX。

logstash中支持的时间格式如下:

  • ISO8601 - 将解析任何有效的ISO8601时间戳,如2021-11-30T17:44:01.103Z
  • UNIX - 将解析float或int值,表示自1346149001.132以及1326149001.132以来的秒数(以秒为单位)
  • UNIX_MS - 将分析int值表示unix时间(以毫秒为单位),如1366125117000TAI64N - 将解析tai64n时间值

路漫漫其修远兮,吾将上下而求索。                ---屈原 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐