一、背景介绍

使用 log4j 自带的 appender,我们可以将日志打印到控制台,也可以将日志打印到本地文件。更深入使用的话,我们可以将日志进行隔离,根据日志级别输出到不同的文件,或者,我们还可以根据日期对日志进行滚动存储。

这些原生功能看起来都很不错,但总感觉缺了点什么。

假设我们部署了具有 10 个节点的集群,使用 log4j 记录日志到节点的本地文件。某一天,运维反馈集群有异常,需要找出故障点。此时,我们只能对 10 个节点的日志文件进行逐一排查,找出其中的 ERROR 日志。这看起来十分僵硬。若我们的集群节点数达百个甚至千个,怎么办?维护人员怕是要对人生产生质疑。

所以,我们需要更优雅的方式,来管理我们的日志。而这,也正是本文的主要内容,使用 log4j 输出日志到 Kafka。

二、优劣分析

将日志发往 Kafka,我们可以得到以下好处:

  • 集群日志拥有统一的数据源
  • 集群日志可以快捷应用到各种查询引擎,例如,使用 Flume 将 Kafka 中的日志发往 ElasticSearch

但是,引入 Kafka Appeder,也不是一点风险都没有的,至少我们需要了解:

  • 若 Kafka 集群异常,将导致日志发送失败,进而导致集群服务异常

当然,我们也可以不选择使用 Kafka Appender 将日志直接发往 Kafka,而是基于 Flume 读取每个节点的日志文件,再发往 Kafka 集群。这个方案会比较稳定,但是每个节点都需要部署 Flume 的成本恐怕也不是我们能接受的。

综合以上,我们最终选择 log4j + kafka-log4j-appender + Kafka 组合,实现统一输出日志到 Kafka 集群。

注:log4j 2 自带 Kafka Appender,请勿参考本文。

三、具体步骤

实现输出日志到 Kafka,我们需要完成 依赖添加配置编写 两个步骤。

3.1 依赖添加

第一步,在项目 pom.xml 文件中添加依赖,与本文目标相关的依赖有:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>net.logstash.log4j</groupId>
    <artifactId>jsonevent-layout</artifactId>
    <version>1.7</version>
</dependency>

各个依赖包的说明如下:

  • kafka-clients:Kafka 官方客户端工具
  • kafka-log4j-appender:提供了 Kafka Appender 实现
  • jsonevent-layout:提供日志 JSON 格式化功能

注:上述版本号搭配确认可用,若与自身项目存在版本冲突,需自身摸索解决。

3.2 配置编写

第二步,编写 log4j 配置文件,具体示例为:

log4j.rootLogger = INFO,kafka

log4j.logger.org.apache.kafka.clients.Metadata = ERROR

log4j.appender.kafka = org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic = default-flume-topic
log4j.appender.kafka.brokerList = master:9092,slave1:9092,slave2:9092
log4j.appender.kafka.compressionType = none
log4j.appender.kafka.Threshold = ERROR
log4j.appender.kafka.requiredNumAcks = -1
log4j.appender.kafka.syncSend = false
log4j.appender.kafka.maxBlockMs = 5000
log4j.appender.kafka.layout = net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields = app_name:xxx

其中,主要配置的简要说明如下:

配置项说明
log4j.appender.kafka.topiclog4j 日志发往的 topic 名称
log4j.appender.kafka.brokerListKafka Broker 地址列表
log4j.appender.kafka.compressionType生产者压缩类型,消费者需要与之一致
log4j.appender.kafka.Threshold输出的日志级别
log4j.appender.kafka.requiredNumAcks生产者 ack 值,-1 表示所有 Broker 都接收到才是生产成功
log4j.appender.kafka.syncSend输出方式,有同步输出和异步输出两种方式

注:除表格所述内容外,还有少数配置未予解释,这些内容将在下文作进一步讲解。

3.3 代码测试

最后,我们可以直接在代码中打印日志,并在对应的 Kafka 集群观察是否有日志生成:

public static void main(String[] args) throws Exception {
    LOGGER.error("Hello World!");
}

四、常见问题

在参考网上各类教程时,发现总有些配置缺漏,引发非常棘手的问题。

(我很好奇这些博主,写文章时自己验证过吗?)

针对这些问题,进行了一些整理,并在本节中单独强调说明。

4.1 死锁

死锁恐怕是使用 Kafka Appender 时最容易碰到的问题,因为绝大部分教程都在 log4j 里面缺少了这行配置:

log4j.logger.org.apache.kafka.clients.Metadata = ERROR

如果缺少这行配置,将出现主线程和 Kafka 生产线程,因分别抢占 RootLogger 和 Metadata 对象锁,而陷入死锁的状态。此时,除非关闭进程,否则这个状态将一直僵持下去。

具体的原因,可以参考这位老哥的博文:https://blog.csdn.net/xiaolongge904913/article/details/104180991

4.2 JSON 格式化

如果我们希望集群日志可以通过 Kafka Appender -> Kafka -> ElasticSearch 的链路直接落地到搜索层,那么将日志转成 JSON 字符串是有必要的。

我们都知道,log4j 的输出格式通常是由 Pattern 指定的,大多数情况下是由 类名 + 时间 + 日志 等要素组成的字符串。此时,便需要将日志信息格式化为 JSON,而 JSONEventLayoutV1 已经替我们实现了这个需求,具体配置方式为:

log4j.appender.kafka.layout = net.logstash.log4j.JSONEventLayoutV1

通过该配置,我们输出到 Kafka 集群的日志将具备如下格式:

{
  "source_host": "hostname",
  "method": "intercept",
  "level": "ERROR",
  "message": "your log",
  "mdc": {},
  "@timestamp": "2021-04-19T05:33:30.245Z",
  "file": "xxx.java",
  "line_number": "64",
  "thread_name": "pool-19-thread-1",
  "@version": 1,
  "logger_name": "com.xxx.xxx",
  "class": "com.xxx.xxx"
}

4.3 自定义 KV

日志统一后,我们可能还会有一些自定义需求。比如说,我们需要在日志体中新增一个字段,以区分一下正式日志和测试日志。

这一点,可以通过 JSONEventLayoutV1 中的用户字段实现,具体使用方式如下:

log4j.appender.kafka.layout.UserFields = app_name:xxx

其中,app_name 为 K,xxx 为 V,多个 KV 键值对以逗号隔开。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐