配置Kafka output

kafka output 发送事件到apache kafka

https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html

Example configuration

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]

  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

大于max_message_bytes的事件将被丢弃,确保Filebeat发出的事件小于max_message_bytes

兼容性

kafka outpu可以工作与kafka 0.11 和 2.2.2之间的所有版本。旧版本或许也能执行,但不被支持。

配置选项

enable

enable是一个布尔值,可以控制output的启用和停用。

默认值:true

host

列表,获取集群元数据的kafka borker地址。

version

假设filebeat所输出到的对应的kafka的版本

默认:1.0.0

username

连接kafka的用户名,如果设置了用户名就必须设置password

password

连接kafka的密码

sasl.mechanism

连接到 Kafka 时使用的 SASL 机制。处于测试阶段。

topic

用于生产事件的topic。

可以通过使用格式字符串访问任何事件字段来动态设置主题。例如,此配置使用自定义字段 fields.log_topic为每个事件设置主题:

topic: '%{[fields.log_topic]}'

如何添加事件的自定义fields:https://www.elastic.co/guide/en/beats/filebeat/current/configuration-general-options.html#libbeat-configuration-fields

topics

一组topic选择器规则。每个规则指定topic用于匹配规则的events。Filebeat根据数组中的第一个匹配规则为每个事件设置topic 。规则可以包含conditionals、基于字符串格式的fields和name mapping。如果topics字段没有配置或者没有规则匹配上,则会使用topic字段

规则设置:

topic

        要使用的topic的字符串格式,如果字符串包含字段引用。如:%{[fields.name]},那么引用的字段必须存在,否则规则失效。

mappings

        字典,获取topic返回值,将其映射到新的名称

default

        mappings未找到匹配项时,使用默认字符串的值

when

        条件,必须成功才能执行当前规则。when支持的条件:https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#conditions

以下示例根据消息字段是否包含指定字符串来设置topic:

output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs-%{[agent.version]}"
  topics:
    - topic: "critical-%{[agent.version]}"
      when.contains:
        message: "CRITICAL"
    - topic: "error-%{[agent.version]}"
      when.contains:
        message: "ERR"

这个配置规则的 topics 名称将会有 critical-7.13.3, error-7.13.3, 和 logs-7.13.3。

key

partition

client_id

用于日志记录、调试和审计目的的可配置 ClientID。默认为“beats”。

worker

负载平衡的 Kafka output工作进程的并发线程的数量。

codec

输出编解码器配置。如果codec缺少该部分,则事件将被 json 编码。

https://www.elastic.co/guide/en/beats/filebeat/current/configuration-output-codec.html

metadata

Kafka 元数据更新设置。元数据确实包含有关用于发布的brokers, topics, partition和活动领导者的信息。

refresh_frequency

        元数据刷新间隔,默认十分钟。

full

        获取元数据时使用的策略,当此选项为 时true,客户端将维护所有可用topic的完整元数据集,如果此选项设置为false它只会刷新配置topic的元数据。默认值为false。

retry.max

        当集群处于领导者选举中间时,元数据更新重试的总数。默认值为 3。

retry.backoff

        领导选举期间重试之间的等待时间。默认值为 250 毫秒。

max_retries

Filebeat 会忽略该max_retries设置,并无限期重试。

bulk_max_size

在单个 Kafka 请求中批量处理的最大事件数。默认值为 2048。

bulk_flush_frequency

发送批量 Kafka 请求之前等待的时间。0 是没有延迟。默认值为 0。

timeout

超时前等待 Kafka 代理响应的秒数。默认值为 30(秒)。

broket_timeout

代理等待所需 ACK 数量的最长时间。默认为 10 秒。

channel_buffer_size

每个 Kafka 代理在输出管道中缓冲的消息数。默认值为 256。

keep_alive

活动网络连接的保持活动期。如果为 0,则保持连接被禁用。默认值为 0 秒。

compression

设置输出压缩编解码器。必须是none,snappy,lz4和gzip其中一个。默认为gzip。

compression_level

设置 gzip 使用的压缩级别。将此值设置为 0 将禁用压缩。压缩级别必须在 1(最佳速度)到 9(最佳压缩)的范围内。

提高压缩级别会降低网络使用率,但会增加 CPU 使用率。

默认值为 4。

mas_message_bytes

JSON 编码消息的最大允许大小。较大的消息将被丢弃。默认值为 1000000(字节)。该值应等于或小于borker的message.max.bytes。

required_acks

borker要求的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。

注意:如果设置为 0,则 Kafka 不会返回任何 ACK。消息可能会在出错时静默丢失。

enable_krb5_fast

启用 Kerberos FAST 身份验证。这可能与某些 Active Directory 安装冲突。它与标准 Kerberos 设置不同,因为此标志仅适用于 Kafka 输出。默认为false。

Warning:此功能处于测试阶段,可能会发生变化。

ssl

SSL 参数的配置选项。

kerberos

Kerberos 身份验证的配置选项。

Warning:此功能处于测试阶段,可能会发生变化。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐