背景

目前我们的日志系统收集流为:  Filbeat-->Logstash-->Python过滤器--->Kafka--->Consumer--->Kibana。

因为FIlebeat不支持http层的output 所以只能依赖Logstash。所以我们决定替换FIlbeat 用Fluentd 的output 到http,减少数据流经过的中间服务。

对比Filebeat

Fluentd 支持多input和多output,并且插件比filebeat更多。功能更加丰富。

beats/filebeat at master · elastic/beats · GitHub     star: 10.4K

GitHub - fluent/fluentd: Fluentd: Unified Logging Layer (project under CNCF)  star: 1.7K

Fluentd官网

https://docs.fluentbit.io/manual/pipeline/outputs/http

https://docs.fluentd.org/

插件中心

List of All Plugins | Fluentd

Fluent & Fluent bit 对比

Fluent bit 优点 详见官网: Fluent Bit v1.8 Documentation - Fluent Bit: Official Manual

fluent和fluent bit 都是由Treasure Data创建的,fluent bit 没有fluent的聚合特性。工作原理如下:

Fluent-bit 是一个简单日志收集工具,上图就是对它工作流程的全局概述,它通过输入、转换、过滤、缓冲、路由到输出而完成日志的收集。

对比官网: Fluentd & Fluent Bit - Fluent Bit: Official Manual

两者对比
fluentdfluent-bit
范围容器/服务器容器/服务器
语言C和RubyC
大小约40MB约450KB
性能高性能高性能
依赖关系作为Ruby Gem构建,主要依赖gems除了一些安装编译插件(GCC、CMAKE)其它零依赖。
插件支持超过1000个可用插件大约70个可用插件
许可Apache许可证2.0版Apache许可证2.0版

所以我们可以用 fluent-bit 做日志收集,如果你不用我们的Python过滤器可以使用fluentd 做聚合操作。

工作流程

  • source : 定义输入源(directives determine the input sources)

  • match :定义匹配标签,输出到哪里。(directives determine the output destinations)

  • filter directives determine the event processing pipelines

  • system directives set system-wide configuration

  • label directives group the output and filter for internal routing

  • @include directives include other files

支持的常见输入何输出数据源

  • tail输入:增量读取日志文件作为数据源,支持日志滚动。
  • exec输入:定时执行命令,获取输出解析后作为数据源。
  • syslog输出:解析标准的syslog日志作为输入。
  • forward输入:接收其他fluentd转发来的数据作为数据源。
  • dummy:虚拟数据源,可以定时产生假数据,用于测试。
  • regexp解析器:使用正则表达式命名分组的方式提取出日志内容为JSON字段。
  • record_transformer过滤器:人为修改record内的字段。
  • file输出:用于将event落地为日志文件。
  • stdout:将event输出到stdout。如果fluentd以daemon方式运行,输出到fluentd的运行日志中。
  • forward:转发event到其他fluentd节点。
  • copy:多路输出,复制event到多个输出端。
  • kafka:输出event到Kafka。
  • elasticsearch:输出event到ES



常用的运维命令

# 修改用户何用户组
vim /usr/lib/systemd/system/td-agent.service

## 检测配置文件是否正确的方法
/opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf

安装Fluent 

官网安装链接:  https://docs.fluentd.org/installation/install-by-rpm

数据流逻辑

fluentd以tag值为基准,决定数据的流经哪些处理器

数据的流向为:Events ->source -> parser -> filter -> Matches and Labels (output)

Event对象Json结构

一个event对象通过source标签的input plugin转换为以下结构

  • tag:事件来源,用于消息路由
  • time:事发时间(通过source标签中的time_key、time_format来解析)
  • record:json格式的日志内容(每个format匹配的正则捕获键名作为一个json字段)

Source流

Source字段必须加Parse :  详情: Config: Parse Section - Fluentd

(1)http输入流

# http 输入
# http://<ip>:9880/myapp.access?json={"event":"data"}
<source>
  @type http
  port 9000
  bind 0.0.0.0
</source>



# 文件流输出
<match file.out>
   @type file
     path /home/data/logs/ok/out.log
</match>

## 标准输出
<match std.logs>
   @type stdout
</match>

发送请求: 

# 匹配的是match的 std.logs项
curl -X POST -d 'json={"name":"knight"}' http://localhost:9000/std.logs

#  匹配match的 file.out项
curl -X POST -d 'json={"city":"sz"}' http://localhost:9000/file.out 

结果如下:

2021-11-15 16:29:08.939551352 +0800 std.logs: {"foo":"bar"}
2021-11-15 16:29:22.665292836 +0800 std.logs: {"name":"knight"}

Match流

match 用来指定动作,通过 tag 匹配 source,然后执行指定的命令来分发日志,最常见的用法就是将 source 收集的日志转存到中间件服务。

  • *:匹配任意一个 tag;
  • **:匹配任意数量个 tag;
  • a b:匹配 a 或 b;
  • {X,Y,Z}:匹配 X, Y, Z 中的一个。

比如我们可以写成这样:

<match a.*>
<match **>
<match a.{b,c}>
<match a.* b.*>

fluentd 按照 match 出现的顺序依次匹配,一旦匹配成功就不会再往下匹配,所以如果你先写了一个 match **,然后后面的所有的 match 都会被忽略。

然后我们使用了 @type file 插件来处理事件,这个插件有一个 path 属性,用来指定输出文件。

用法和 source 几乎一模一样,不过 source 是抛出事件,match 是接收并处理事件。你同样可以找到大量的各式各样的输出插件

filter 流

filter 和 match 的语法几乎完全一样,但是 filter 可以串联成 pipeline,对数据进行串行处理,最终再交给 match 输出。比如往message字段里加新的东西等等。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐