结合项目中使用ELK做日志管理工具,前面已整合介绍了Elasticsearch的相关内容,项目中部署两个Logstash节点从Kafka集群消费消息并输出到Elasticsearch集群进行日志数据存储,本文结合项目实践以及官网等网络资料,继续整合做ELK技术栈中的Logstash相关简介、工作过程、安装运行以及相关配置等内容。文中不免疏漏之处,望请读者予以评论探讨不足之处,共同学习、不断进步,不胜感激!

1. Logstash简介

Logstash 是一个开源数据收集引擎,具备实时管道处理能力。在 ELK Stack 中,它的主要作用是收集日志内文件数据,在内部将日志数据进行格式化和规范化,然后发送到指定的接收者,通常是 Elasticsearch。Logstash 主要依靠丰富的内部插件来对搜集的数据进行处理。

Logstash优点

1.可伸缩性
Beats应该在一组Logstash节点之间进行负载平衡,建议至少使用两个Logstash节点以实现高可用性。
每个Logstash节点只部署一个Beats输入是很常见的,但每个Logstash节点也可以部署多个Beats输入,以便为不同的数据源公开独立的端点。
2.弹性
Logstash持久队列提供跨节点故障的保护。对于Logstash中的磁盘级弹性,确保磁盘冗余非常重要。
对于内部部署,建议配置RAID。在云或容器化环境中运行时,建议使用具有反映数据SLA的复制策略的永久磁盘。
3.可过滤
对事件字段执行常规转换。可以重命名,删除,替换和修改事件中的字段。
可扩展插件生态系统,提供超过200个插件,以及创建和贡献自己的灵活性。

Logstash缺点

Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。

2. Logstash工作过程

在这里插入图片描述

Logstash事件处理管道有三个阶段:输入、过滤器、输出。 输入生成事件,过滤器修改它们,输出将它们运送到别处。 输入和输出支持codecs(编解码器),使您可以在数据进入或退出流水线时进行编码或解码,而无需使用单独的过滤器。这四个部分均以插件形式存在,用户通过定义pipeline配置文件,设置需要使用的input,filter,output,codec插件,以实现特定的数据采集,数据处理,数据输出等功能 。

Inputs:用于从数据源获取数据,常见的插件如file, syslog, redis, beats 等
Filters:用于处理数据如格式转换,数据派生等,常见的插件如grok, mutate, drop, clone, geoip等
Outputs:用于数据输出,常见的插件如elastcisearch,file, graphite, statsd等
Codecs:Codecs(编码插件)不是一个单独的流程,而是在输入和输出等插件中用于数据转换的模块,
				用于对数据进行编码处理,常见的插件如json,multiline。
Logstash不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!
			codec 就是用来 decode、encode 事件的。

3. Logstash安装运行

  1. 下载安装包

     https://artifacts.elastic.co/downloads/logstash/logstash-6.2.2.tar.gz
    
  2. 解压

     tar -zvxf logstash-6.2.2.tar.gz
    
  3. 启动运行

     ./logstash -f  input_out.conf
      input_out.conf为配置文件,其中可指定数据源input、filter过滤以及数据导出方向output等;
    

4. Logstash配置

配置文件示例:标准输入输出,文件输入到标准控制台输出,标准控制台输入Elasticsearch输出,kafka输入Elasticsearch输出等

标准控制台输入输出配置

input {
    stdin{
    }
}
output {
    stdout{
    }
}

文件输入到标准控制台输出

input{
    file{
        path =>"/home/u-0/logstash/logstash-6.2.2/logs/logstash-plain.log"
        start_position=>"beginning"
    }
}

filter{
    grok{
        match=>{
            "message"=>"%{DATA:clientIp} "
        }

        remove_field=>"message"
    }
    date{
        match=>["accessTime","dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

output{
    stdout{
        codec=>rubydebug
    }
}

Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。

input/file/path:指定扫描的文件。扫描多个文件可以使用*路径通配符;
				或者以数组形式提供(path=>[“outer-access.log”,”access.log”]);
				或者直接给定目录,logstash会扫描路径所有的文件,并监听是否有新文件。

filter/grok/match/message:里面的DATA是grok语法内置的正则表达式,DATA匹配任意字符.

filter/grok/match/date:是对HTTPDATE日期格式的解释,joda可以支持很多复杂的日期格式,
						需要在这里指明才能正确匹配。

remove_field=>”message”:用处是去掉原有的整个日志字符串,仅保留filter解析后的信息。
						可以试着去掉这一句就可明白用处。

标准控制台输入Elasticsearch输出

input{
        stdin {}
}
output {
        elasticsearch {
                hosts => ["192.168.65.146:9201","192.168.65.148:9202","192.168.65.149:9203"]
                index => "test_index"
                user => "elastic"
    			password => "pwd123"
        }
        stdout { codec => rubydebug}
}

kafka输入Elasticsearch输出

input{
        kafka {
        		type => "log"
        		bootstrap_servers => ["http://192.168.65.146:9092","http://192.168.65.148:9092","http://192.168.65.149:9092"]
        		group_id => "log_consumer_group"
        		auto_offset_reset => "earliest"
        		consumer_threads => 1
        		decorate_events => true
        		topics => ["log1","log2"]
        		codec => "json"
        }
}
filter {
	mutate {
			lowercase => ["service"]
	}
}

output {
        elasticsearch {
                hosts => ["192.168.65.146:9201","192.168.65.148:9202","192.168.65.149:9203"]
                index => "test_index"
                user => "elastic"
    			password => "pwd123"
        }
        stdout { codec => rubydebug}
}

5. 参考资料

[1]. Logstash介绍.
[2]. logstash简介及架构.
[3]. Logstash详细记录.
[4]. Logstash 最佳实践.
[4]. ELK下之Logstash性能调优.

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐