Elasticsearch 常用操作、对接Logstash以及处理本地IP、ECS规范(干货满满)
经过前面两篇博文之后,现在已经有了article数据、可以查询相关数据。那接下来就是在这个基础上继续完善
文章目录
Elasticsearch 常用操作
Document常用操作
更新
一般更新我们可以选择部分更新
PUT /my-articles/_update/id
{
"doc":{
"title":"这里是修改修改之后的文章标题"
}
}
而对于根据查询更新,需要用到scripts,目前还没有场景用到scripts,所以这一部分暂时跳过
删除
DELETE /my-articles/_doc/id
Index常用操作
那么按照现有的场景,来完善一下,我们的article的基本数据结构是有了,那么能不能再添加一些场景呢。我们希望在查看文章数据时候,能够显示这篇文章来自于哪个国家,哪个城市,以便我们在Kibana可以在地图中方便的看到一个数据概览。那么我们需要修改现在my-articles这个index的mapping,需要为这个mapping添加一个表示地理位置的field。
前提知识
对于能够在Kibana进行绘图作用我们一般定义这么几个字段
mapping添加field
为my-article添加client.ip和client.geo.location两个filed,注意这里的点代表为嵌套结构,而不是field名为client.ip.
PUT /my-articles/_mapping
{
"properties":{
"client":{
"properties":{
"ip":{
"type":"ip"
},
"geo":{
"properties":{
"location":{
"type":"geo_point"
}
}
}
}
}
}
}
修改完成之后,可以看下mapping结构是
GET /my-articles/_mapping
mapping修改field
同时我们觉得,article中的reading_count值为integer太小了,我们需要将它改为long。也就是针对已存在的field修改其数据类型
对于已经有数据的field,我们不能直接修改其index,需要新创建一个index,然后将reindex过去,类似于复制
PUT /articles
{
"mappings": {
"properties": {
"reading_count":{
"type": "long"
}
}
}
}
reindex
使用reindex将my-articles重新复制到articles中,包括原来的数据
POST _reindx
{
"source":{
"index":"my-articles"
},
"dest":{
"index":"articles"
}
}
删除index
DELETE /my-articles
使用index template
为什么要使用index template呢?
- 一种情况是,在接入Logstash之后,如果我们想要按天生成index,并且生成index的时候我们希望有一些定制的field;
- 或者公司的Elasticsearch有一些共同规范,需要有统一的前缀,并且有一些规定的field必须有
那么这个时候index template就派上用场了.
这里我们始终使用一个index,不会按天生成,那么可以对当前这个index进行一些规范。当还有类似于文章的数据时,我们要求也基本和文章数据的index保持一致
添加index template
以上面的场景继续,既然有了地理位置信息filed,那么将来需要从Logstash的filter中解析IP,再输出到Elasticsearch中.
问题
如果在Elasticsearch中我们没有提前定义包含geo的field,那么默认是text类型的,不是IP和geo_point类型的,所以为了防止这种问题出现,我们先定义一个index template
所以我们的template应该包含如下几个方面
- IP信息字段
- location字段
- 符合ECS规范的@timestamp字段,type为date。ECS规范下面会说到
- 对于json数据中的created、updated,数据格式为yyyy-MM-dd HH:mm:ss,为了能够正确映射为date字段(如果不处理,默认为text),需要定义动态的field映射。所谓动态field映射,第一篇博文曾经提到过,在我们没有预定义index的时候,Elasticsearch会根据数据格式动态定义index中的field,那么这时可能有的field处理的不是特别正确,所以就要我们提前声明,这样Elasticsearch在动态映射时,碰到我们声明的格式,就会直接使用我们预先声明好的,更多说明详见dynamic mapping
PUT /_index_template/article_template
{
"priority":1,
"index_patterns": ["articles*"],
"template":{
"mappings" : {
"properties" : {
"@version" : {
"type" : "keyword"
},
"@timestamp": {
"type": "date"
},
"client" : {
"properties" : {
"geo" : {
"properties" : {
"location" : {
"type" : "geo_point"
}
}
},
"ip" : {
"type" : "ip"
}
}
}
},
"dynamic_date_formats": ["yyyy-MM-dd HH:mm:ss"]
},
"aliases" : { }
}
}
查看index template
查看我们刚才创建的index template
GET /_index_template/article*
修改index template
删除index template
收集日志使用内置的index template
2022年9月2日更新
如果是仅仅是直接收集来自Logstash的日志,那么我非常推荐使用内置的index template,名字是ecs-logstash。当然如果想个性化定制,依旧可以使用上面的步骤。
不过如果想在ECS的基础上做定制化,那么建议在 ecs-logstash这个index template的基础上进行改进
接入Logstash
到现在为止,我们安装了ELK,并对Elasticsearch进行了基础的学习,同时也配置好了我们需要的index、index template。那么接下来就是对接Logstash,比较贴近于企业场景了
一句话理解Logstash
Logstash 完成的工作就是接收数据、处理数据、输出数据
场景设计
和最初一样,在实际企业开发中,如何让article数据进入Elasticsearch中?场景一般为
- 程序里输出log
- Logstash拿到log
- 进行处理
- 输出到Elasticsearch中
我们这里直接在程序中将日志发送到Logstash中,也就是Logstash接收使用tcp input plugin接收
准备示例程序
示例程序的log4j2.xml配置
<Configuration>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<!-- 直接使用%C %M 非常降低log4j2的性能-->
<!-- <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %C{36}#%M (%X{clientIp}, %X{id}) %m%n"/>-->
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %X{scn} %X{smn} (%X{clientIp}, %X{userId}) %m%n"/>
</Console>
<!--通过TCP直接发送到指定端口上-->
<Socket name="tcp" protocol="tcp" host="192.168.0.104" port="12201" immediateFail="true" immediateFlush="false"
bufferedIO="false">
<!--输出信息为ECS JSON格式-->
<JsonTemplateLayout eventTemplateUri="classpath:EcsLayout.json"/>
</Socket>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="tcp"/>
</Root>
</Loggers>
</Configuration>
这里将日志所在的Java类,方法名以及clientIp,userId这些共同的属性,通过MDC进行输出
ECS规范
在示例程序中,在Socket appender中我们配置了EcsLayout.json,为什么选用它呢?ECS是一个数据规范,因为处理数据如果符合ECS规范的话,拓展性较好。ECS规范定义了一些核心field供我们使用,平时在定义ES数据结构时,应尽量按照此规范指导进行定义。
查看ECS的field以及其他规范,参阅ECS文档
接收程序日志
创建一个配置文件
touch artilce.conf
书写input部分
input {
tcp {
host => "localhost"
port => 12201
codec => json
}
}
处理IP信息(及处理本地IP)
我们需要将JSON数据中clientIp的转为上面我们需要的地理信息
这里要提醒读者,对于本地IP geoip是无法解析的,那么会造成IP解析失败,为了处理情况,我们可以将本地IP进行过滤,如果IP不是本地IP再进行进入geoip的filter
如何判断IP是不是本地呢?使用正则即可,对于Logstash中条件的判断可参考Condition部分
fileter {
if [clientIp] =~ "^10.0.*" or [clientIp] =~ "^127.0.*" or [clientIp] == "0.0.0.0" or [clientIp] =~ "^0:0:0:0*" {
mutate {
add_tag => ["internal"]
}
}
if [clientIp] and "internal" not in [tags] {
geoip {
source => "clientIp"
target => "[client][geo]"
}
}
}
当IP为本地IP时,添加一个tag-internal表示IP为本地IP。
注意
这里经过geoip处理之后,target为[client][geo],这样才和上面定义的嵌套结构保持一致,千万不要写成target => client.geo,这样就是一个filed,而不是嵌套结构了。
关于Logstash中field的详细写法,可参考Field Reference和Field syntax
处理JSON
我们将完整的article的数据以json格式打印到日志message中,如果想要符合我们定义的article的数据结构,还需要将json数据解析,使用json filter,当不是json数据时,我们不处理;同时
filter {
json {
source => "message"
skip_on_invalid_json => true
}
}
重命名field名字
2022年9月2日更新
由于Java中Log4j2输出的ECS格式的日志都是以点分隔的,例如log.logger
我们必须通过mutate的rename将其转化为嵌套格式,例如[log][logger]
那有没有更方便的方式呢?
有,官方提供了de-dot插件,可以完成上面的转换,但是此操作极为昂贵,官方建议,不是走投无路的情况下别用这个插件…所以还是老老实实的用rename吧
由于我们采用了ECS规范,所以要将JSON数据中的一些field重命名为符合ECS规范的字段,将多余的field删除
使用mutate filter,这里我们使用了rename和remove_field 这两个process
filter {
mutate {
rename => {
"clientIp" => "[client][ip]"
"userId" => "[client][user][id]"
"smn" => "[log][origin][function]"
"scn" => "[log][origin][file][name]"
"readingCount" => "reading_count"
"articleContent" => "article_content"
"articleGenre" => "article_genre"
}
remove_field => ["[client][geo][ip]","[client][geo][latitude]","[client][geo][longitude]","[message]"]
}
}
说明:client.geo.ip并不是原始log中带的,是由于经过geoip处理之后,IP默认的位置为client.geo.ip,但是ECS规范是把IP放到了其他field下面的,所以这里经过rename之后,删除掉多余的field
最终的filter配置
完整的filter的部分如下
fileter {
json {
source => "message"
skip_on_invalid_json =>true
}
if [clientIp] =~ "^10.0.*" or [clientIp] =~ "^127.0.*" or [clientIp] == "0.0.0.0" or [clientIp] =~ "^0:0:0:0*" {
mutate {
add_tag => ["internal"]
}
}
if [clientIp] and "internal" not in [tags] {
geoip {
source => "clientIp"
target => "[client][geo]"
}
}
mutate {
rename => {
"clientIp" => "[client][ip]"
"userId" => "[client][user][id]"
"smn" => "[log][origin][function]"
"scn" => "[log][origin][file][name]"
"readingCount" => "reading_count"
"articleContent" => "article_content"
"articleGenre" => "article_genre"
}
remove_field => ["[client][geo][ip]","[client][geo][latitude]","[client][geo][longitude]","[message]"]
}
}
输出到Elasticsearch中去
在测试环境,可以使用stdout plugin进行调试,看看最终的数据是什么样的
对于输出可以选择file和elasticseach
output {
stdout {}
file {
# 生成年月日的格式
path => "D:\ElkLogs\testEcs-%{+YYYY-MM-dd}.log"
}
elasticsearch {
hosts => ["localhost:9200"]
user => "elastic"
password => "password"
index => "articles"
}
}
拓展:Elasticsearch Pipeline
其实对于filter的处理部分,Elasticsearch本身也有支持,就是Pipeline
基于这篇博文内容有点多,所以这里不做介绍,只是引出,后续博文再做介绍有兴趣可以了解下。
验证流程
此时所有准备都已经完成,现在所有数据都由我们请求web应用的接口,动态向Elasticsearch中输送。
为了验证整体流程的准确性,即
- web应用输出日志
- Logstash处理日志并输出到Elasticsearch中
- Elasticsearch根据index template创建index
- 展示数据
我们将前两篇所用到的index和其中的数据全部删掉
DELETE /articles
启动刚才准备的示例程序-正常启动无报错即可,
-
请求接口
-
在Logstash控制台中查看
-
在Elasticsearch中进行查看,看最终生成的数据是否符合我们的预期
可以看到,对于嵌套的格式数据也是正确的
-
在Kibana中查看,在Kibana之前,不要忘了创建index pattern
可以看到,数据格式和我们的期望是一致的,地理位置信息解析出来了,然后符合ECS规范的字段也是有的,其余的自定义的字段就是我们的article的数据结构了
思维导图
有了完整的流程,以及带有地理位置信息的数据。下一篇将在kibana中绘制地图
更多推荐
所有评论(0)