使用 log4j 输出日志到 Kafka
一、背景介绍使用 log4j 自带的 appender,我们可以将日志打印到控制台,也可以将日志打印到本地文件。更深入使用的话,我们可以将日志进行隔离,根据日志级别输出到不同的文件,或者,我们还可以根据日期对日志进行滚动存储。这些原生功能看起来都很不错,但总感觉缺了点什么。假设我们部署了具有 10 个节点的集群,使用 log4j 记录日志到节点的本地文件。某一天,运维反馈集群有异常,需要找出故障点
一、背景介绍
使用 log4j 自带的 appender,我们可以将日志打印到控制台,也可以将日志打印到本地文件。更深入使用的话,我们可以将日志进行隔离,根据日志级别输出到不同的文件,或者,我们还可以根据日期对日志进行滚动存储。
这些原生功能看起来都很不错,但总感觉缺了点什么。
假设我们部署了具有 10 个节点的集群,使用 log4j 记录日志到节点的本地文件。某一天,运维反馈集群有异常,需要找出故障点。此时,我们只能对 10 个节点的日志文件进行逐一排查,找出其中的 ERROR 日志。这看起来十分僵硬。若我们的集群节点数达百个甚至千个,怎么办?维护人员怕是要对人生产生质疑。
所以,我们需要更优雅的方式,来管理我们的日志。而这,也正是本文的主要内容,使用 log4j 输出日志到 Kafka。
二、优劣分析
将日志发往 Kafka,我们可以得到以下好处:
- 集群日志拥有统一的数据源
- 集群日志可以快捷应用到各种查询引擎,例如,使用 Flume 将 Kafka 中的日志发往 ElasticSearch
但是,引入 Kafka Appeder,也不是一点风险都没有的,至少我们需要了解:
- 若 Kafka 集群异常,将导致日志发送失败,进而导致集群服务异常
当然,我们也可以不选择使用 Kafka Appender 将日志直接发往 Kafka,而是基于 Flume 读取每个节点的日志文件,再发往 Kafka 集群。这个方案会比较稳定,但是每个节点都需要部署 Flume 的成本恐怕也不是我们能接受的。
综合以上,我们最终选择 log4j + kafka-log4j-appender + Kafka 组合,实现统一输出日志到 Kafka 集群。
注:log4j 2 自带 Kafka Appender,请勿参考本文。
三、具体步骤
实现输出日志到 Kafka,我们需要完成 依赖添加 和 配置编写 两个步骤。
3.1 依赖添加
第一步,在项目 pom.xml
文件中添加依赖,与本文目标相关的依赖有:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>net.logstash.log4j</groupId>
<artifactId>jsonevent-layout</artifactId>
<version>1.7</version>
</dependency>
各个依赖包的说明如下:
- kafka-clients:Kafka 官方客户端工具
- kafka-log4j-appender:提供了 Kafka Appender 实现
- jsonevent-layout:提供日志 JSON 格式化功能
注:上述版本号搭配确认可用,若与自身项目存在版本冲突,需自身摸索解决。
3.2 配置编写
第二步,编写 log4j 配置文件,具体示例为:
log4j.rootLogger = INFO,kafka
log4j.logger.org.apache.kafka.clients.Metadata = ERROR
log4j.appender.kafka = org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic = default-flume-topic
log4j.appender.kafka.brokerList = master:9092,slave1:9092,slave2:9092
log4j.appender.kafka.compressionType = none
log4j.appender.kafka.Threshold = ERROR
log4j.appender.kafka.requiredNumAcks = -1
log4j.appender.kafka.syncSend = false
log4j.appender.kafka.maxBlockMs = 5000
log4j.appender.kafka.layout = net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields = app_name:xxx
其中,主要配置的简要说明如下:
配置项 | 说明 |
---|---|
log4j.appender.kafka.topic | log4j 日志发往的 topic 名称 |
log4j.appender.kafka.brokerList | Kafka Broker 地址列表 |
log4j.appender.kafka.compressionType | 生产者压缩类型,消费者需要与之一致 |
log4j.appender.kafka.Threshold | 输出的日志级别 |
log4j.appender.kafka.requiredNumAcks | 生产者 ack 值,-1 表示所有 Broker 都接收到才是生产成功 |
log4j.appender.kafka.syncSend | 输出方式,有同步输出和异步输出两种方式 |
注:除表格所述内容外,还有少数配置未予解释,这些内容将在下文作进一步讲解。
3.3 代码测试
最后,我们可以直接在代码中打印日志,并在对应的 Kafka 集群观察是否有日志生成:
public static void main(String[] args) throws Exception {
LOGGER.error("Hello World!");
}
四、常见问题
在参考网上各类教程时,发现总有些配置缺漏,引发非常棘手的问题。
(我很好奇这些博主,写文章时自己验证过吗?)
针对这些问题,进行了一些整理,并在本节中单独强调说明。
4.1 死锁
死锁恐怕是使用 Kafka Appender 时最容易碰到的问题,因为绝大部分教程都在 log4j 里面缺少了这行配置:
log4j.logger.org.apache.kafka.clients.Metadata = ERROR
如果缺少这行配置,将出现主线程和 Kafka 生产线程,因分别抢占 RootLogger 和 Metadata 对象锁,而陷入死锁的状态。此时,除非关闭进程,否则这个状态将一直僵持下去。
具体的原因,可以参考这位老哥的博文:https://blog.csdn.net/xiaolongge904913/article/details/104180991
4.2 JSON 格式化
如果我们希望集群日志可以通过 Kafka Appender -> Kafka -> ElasticSearch 的链路直接落地到搜索层,那么将日志转成 JSON 字符串是有必要的。
我们都知道,log4j 的输出格式通常是由 Pattern 指定的,大多数情况下是由 类名 + 时间 + 日志 等要素组成的字符串。此时,便需要将日志信息格式化为 JSON,而 JSONEventLayoutV1
已经替我们实现了这个需求,具体配置方式为:
log4j.appender.kafka.layout = net.logstash.log4j.JSONEventLayoutV1
通过该配置,我们输出到 Kafka 集群的日志将具备如下格式:
{
"source_host": "hostname",
"method": "intercept",
"level": "ERROR",
"message": "your log",
"mdc": {},
"@timestamp": "2021-04-19T05:33:30.245Z",
"file": "xxx.java",
"line_number": "64",
"thread_name": "pool-19-thread-1",
"@version": 1,
"logger_name": "com.xxx.xxx",
"class": "com.xxx.xxx"
}
4.3 自定义 KV
日志统一后,我们可能还会有一些自定义需求。比如说,我们需要在日志体中新增一个字段,以区分一下正式日志和测试日志。
这一点,可以通过 JSONEventLayoutV1
中的用户字段实现,具体使用方式如下:
log4j.appender.kafka.layout.UserFields = app_name:xxx
其中,app_name
为 K,xxx
为 V,多个 KV 键值对以逗号隔开。
更多推荐
所有评论(0)