Elasticsearch 常用操作

Document常用操作

更新

一般更新我们可以选择部分更新

PUT /my-articles/_update/id
{
    "doc":{
        "title":"这里是修改修改之后的文章标题"
    }
}

而对于根据查询更新,需要用到scripts,目前还没有场景用到scripts,所以这一部分暂时跳过

删除

DELETE /my-articles/_doc/id

Index常用操作

那么按照现有的场景,来完善一下,我们的article的基本数据结构是有了,那么能不能再添加一些场景呢。我们希望在查看文章数据时候,能够显示这篇文章来自于哪个国家,哪个城市,以便我们在Kibana可以在地图中方便的看到一个数据概览。那么我们需要修改现在my-articles这个index的mapping,需要为这个mapping添加一个表示地理位置的field。

前提知识

对于能够在Kibana进行绘图作用我们一般定义这么几个字段

  • field名字为ip,type为IP
  • field名字为location,type为Geo-point,包含维度-latitude,经度-longitude

mapping添加field

为my-article添加client.ip和client.geo.location两个filed,注意这里的代表为嵌套结构,而不是field名为client.ip.

更新mapping

PUT /my-articles/_mapping
{
  "properties":{
    "client":{
      "properties":{
        "ip":{
          "type":"ip"
        },
        "geo":{
          "properties":{
            "location":{
              "type":"geo_point"
            }
          }
        }
      }
    }
  }
}

修改完成之后,可以看下mapping结构是

GET /my-articles/_mapping

mapping修改field

同时我们觉得,article中的reading_count值为integer太小了,我们需要将它改为long。也就是针对已存在的field修改其数据类型

对于已经有数据的field,我们不能直接修改其index,需要新创建一个index,然后将reindex过去,类似于复制

PUT /articles
{
  "mappings": {
    "properties": {
      "reading_count":{
        "type": "long"
      }
    }
  }
}

reindex

使用reindex将my-articles重新复制到articles中,包括原来的数据

POST _reindx
{
    "source":{
        "index":"my-articles"
    },
    "dest":{
        "index":"articles"
    }
}

删除index

删除已经没用的index

DELETE /my-articles

使用index template

为什么要使用index template呢?

  • 一种情况是,在接入Logstash之后,如果我们想要按天生成index,并且生成index的时候我们希望有一些定制的field;
  • 或者公司的Elasticsearch有一些共同规范,需要有统一的前缀,并且有一些规定的field必须有

那么这个时候index template就派上用场了.

这里我们始终使用一个index,不会按天生成,那么可以对当前这个index进行一些规范。当还有类似于文章的数据时,我们要求也基本和文章数据的index保持一致

添加index template

以上面的场景继续,既然有了地理位置信息filed,那么将来需要从Logstash的filter中解析IP,再输出到Elasticsearch中.

问题

如果在Elasticsearch中我们没有提前定义包含geo的field,那么默认是text类型的,不是IP和geo_point类型的,所以为了防止这种问题出现,我们先定义一个index template

所以我们的template应该包含如下几个方面

  1. IP信息字段
  2. location字段
  3. 符合ECS规范的@timestamp字段,type为date。ECS规范下面会说到
  4. 对于json数据中的created、updated,数据格式为yyyy-MM-dd HH:mm:ss,为了能够正确映射为date字段(如果不处理,默认为text),需要定义动态的field映射。所谓动态field映射,第一篇博文曾经提到过,在我们没有预定义index的时候,Elasticsearch会根据数据格式动态定义index中的field,那么这时可能有的field处理的不是特别正确,所以就要我们提前声明,这样Elasticsearch在动态映射时,碰到我们声明的格式,就会直接使用我们预先声明好的,更多说明详见dynamic mapping
PUT /_index_template/article_template
{
  "priority":1,
  "index_patterns": ["articles*"],
  "template":{
    "mappings" : {
            "properties" : {
              "@version" : {
                "type" : "keyword"
              },
               "@timestamp": {
                  "type": "date"
       			 },
              "client" : {
                "properties" : {
                  "geo" : {
                    "properties" : {
                      "location" : {
                        "type" : "geo_point"
                      }
                    }
                  },
                  "ip" : {
                    "type" : "ip"
                  }
                }
              }
            },
            "dynamic_date_formats": ["yyyy-MM-dd HH:mm:ss"]
          },
          "aliases" : { }
  }
}
查看index template

查看我们刚才创建的index template

查找index template

GET /_index_template/article*
修改index template

修改index template

删除index template

删除index template

收集日志使用内置的index template

2022年9月2日更新

如果是仅仅是直接收集来自Logstash的日志,那么我非常推荐使用内置的index template,名字是ecs-logstash。当然如果想个性化定制,依旧可以使用上面的步骤。

不过如果想在ECS的基础上做定制化,那么建议在 ecs-logstash这个index template的基础上进行改进

接入Logstash

到现在为止,我们安装了ELK,并对Elasticsearch进行了基础的学习,同时也配置好了我们需要的index、index template。那么接下来就是对接Logstash,比较贴近于企业场景了

一句话理解Logstash

Logstash 完成的工作就是接收数据、处理数据、输出数据

场景设计

和最初一样,在实际企业开发中,如何让article数据进入Elasticsearch中?场景一般为

  • 程序里输出log
  • Logstash拿到log
  • 进行处理
  • 输出到Elasticsearch中

我们这里直接在程序中将日志发送到Logstash中,也就是Logstash接收使用tcp input plugin接收

准备示例程序

示例程序完整代码

示例程序的log4j2.xml配置

<Configuration>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
<!--            直接使用%C %M 非常降低log4j2的性能-->
<!--            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %C{36}#%M (%X{clientIp}, %X{id}) %m%n"/>-->
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %X{scn} %X{smn} (%X{clientIp}, %X{userId}) %m%n"/>
        </Console>
        <!--通过TCP直接发送到指定端口上-->
        <Socket name="tcp" protocol="tcp" host="192.168.0.104" port="12201" immediateFail="true" immediateFlush="false"
                bufferedIO="false">
            <!--输出信息为ECS JSON格式-->
            <JsonTemplateLayout eventTemplateUri="classpath:EcsLayout.json"/>
        </Socket>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="tcp"/>
        </Root>
    </Loggers>
</Configuration>

这里将日志所在的Java类,方法名以及clientIp,userId这些共同的属性,通过MDC进行输出

ECS规范

在示例程序中,在Socket appender中我们配置了EcsLayout.json,为什么选用它呢?ECS是一个数据规范,因为处理数据如果符合ECS规范的话,拓展性较好。ECS规范定义了一些核心field供我们使用,平时在定义ES数据结构时,应尽量按照此规范指导进行定义。

查看ECS的field以及其他规范,参阅ECS文档

接收程序日志

创建一个配置文件

touch artilce.conf

书写input部分

input {
   tcp {
      host => "localhost"
      port => 12201
      codec => json
   }
}

处理IP信息(及处理本地IP)

我们需要将JSON数据中clientIp的转为上面我们需要的地理信息

使用geoip filter

这里要提醒读者,对于本地IP geoip是无法解析的,那么会造成IP解析失败,为了处理情况,我们可以将本地IP进行过滤,如果IP不是本地IP再进行进入geoip的filter

如何判断IP是不是本地呢?使用正则即可,对于Logstash中条件的判断可参考Condition部分

fileter {
    if [clientIp] =~ "^10.0.*" or [clientIp] =~ "^127.0.*" or [clientIp] == "0.0.0.0" or [clientIp] =~ "^0:0:0:0*" { 
        mutate {
            add_tag => ["internal"]
        }
     }
     if [clientIp] and "internal" not in [tags] {
        geoip {
          source => "clientIp"
          target => "[client][geo]"
        }
     }
}

当IP为本地IP时,添加一个tag-internal表示IP为本地IP。

注意

这里经过geoip处理之后,target为[client][geo],这样才和上面定义的嵌套结构保持一致,千万不要写成target => client.geo,这样就是一个filed,而不是嵌套结构了。

关于Logstash中field的详细写法,可参考Field ReferenceField syntax

处理JSON

我们将完整的article的数据以json格式打印到日志message中,如果想要符合我们定义的article的数据结构,还需要将json数据解析,使用json filter,当不是json数据时,我们不处理;同时

filter {
  json {
    source => "message" 
    skip_on_invalid_json => true
  }
}

重命名field名字

2022年9月2日更新
由于Java中Log4j2输出的ECS格式的日志都是以点分隔的,例如log.logger
我们必须通过mutate的rename将其转化为嵌套格式,例如[log][logger]

那有没有更方便的方式呢?
有,官方提供了de-dot插件,可以完成上面的转换,但是此操作极为昂贵,官方建议,不是走投无路的情况下别用这个插件…所以还是老老实实的用rename吧

由于我们采用了ECS规范,所以要将JSON数据中的一些field重命名为符合ECS规范的字段,将多余的field删除

使用mutate filter,这里我们使用了rename和remove_field 这两个process

filter {
   mutate {
		rename => { 
		  "clientIp" => "[client][ip]" 
		  "userId" => "[client][user][id]"
		  "smn" => "[log][origin][function]"
		  "scn" => "[log][origin][file][name]"
		  "readingCount" => "reading_count"
      "articleContent" => "article_content"
      "articleGenre" => "article_genre"
		}
		remove_field => ["[client][geo][ip]","[client][geo][latitude]","[client][geo][longitude]","[message]"]
    }
}

说明:client.geo.ip并不是原始log中带的,是由于经过geoip处理之后,IP默认的位置为client.geo.ip,但是ECS规范是把IP放到了其他field下面的,所以这里经过rename之后,删除掉多余的field

最终的filter配置

完整的filter的部分如下

fileter {
     json {
        source => "message"
        skip_on_invalid_json =>true
    }
    if [clientIp] =~ "^10.0.*" or [clientIp] =~ "^127.0.*" or [clientIp] == "0.0.0.0" or [clientIp] =~ "^0:0:0:0*" { 
        mutate {
            add_tag => ["internal"]
        }
     }
     if [clientIp] and "internal" not in [tags] {
        geoip {
          source => "clientIp"
          target => "[client][geo]"
        }
     }
     mutate {
        rename => { 
          "clientIp" => "[client][ip]" 
          "userId" => "[client][user][id]"
          "smn" => "[log][origin][function]"
          "scn" => "[log][origin][file][name]"
          "readingCount" => "reading_count"
          "articleContent" => "article_content"
          "articleGenre" => "article_genre"
        }
			 remove_field => ["[client][geo][ip]","[client][geo][latitude]","[client][geo][longitude]","[message]"]
    }
}

输出到Elasticsearch中去

在测试环境,可以使用stdout plugin进行调试,看看最终的数据是什么样的

对于输出可以选择fileelasticseach

output {
  stdout {}
  file {
  # 生成年月日的格式
      path => "D:\ElkLogs\testEcs-%{+YYYY-MM-dd}.log"
  }
  elasticsearch {
	  hosts => ["localhost:9200"]
	  user => "elastic"
	  password => "password"
	  index => "articles"
  }
}

拓展:Elasticsearch Pipeline

其实对于filter的处理部分,Elasticsearch本身也有支持,就是Pipeline

基于这篇博文内容有点多,所以这里不做介绍,只是引出,后续博文再做介绍有兴趣可以了解下。

验证流程

此时所有准备都已经完成,现在所有数据都由我们请求web应用的接口,动态向Elasticsearch中输送。

为了验证整体流程的准确性,即

  • web应用输出日志
  • Logstash处理日志并输出到Elasticsearch中
  • Elasticsearch根据index template创建index
  • 展示数据

我们将前两篇所用到的index和其中的数据全部删掉

DELETE /articles

启动刚才准备的示例程序-正常启动无报错即可,

  1. 请求接口
    在这里插入图片描述

  2. 在Logstash控制台中查看
    在这里插入图片描述

  3. 在Elasticsearch中进行查看,看最终生成的数据是否符合我们的预期

在这里插入图片描述

可以看到,对于嵌套的格式数据也是正确的

  1. 在Kibana中查看,在Kibana之前,不要忘了创建index pattern
    在这里插入图片描述

    可以看到,数据格式和我们的期望是一致的,地理位置信息解析出来了,然后符合ECS规范的字段也是有的,其余的自定义的字段就是我们的article的数据结构了

思维导图

在这里插入图片描述

有了完整的流程,以及带有地理位置信息的数据。下一篇将在kibana中绘制地图

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐