Kafka producer写入优化
目录提出需求环境信息测试系统,寻找问题Kafka producer 写入原理Producer 参数ACKS 参数Producer send 方法调用结果Note参考文献提出需求由于线上的接口是暴露给外部用户使用,外部用户付费接入,产品经理提出了需求,需要在公司内部做重新数据清洗的时候尽量减少对外部用户数据更新的影响,当前更新数据需要耗费24个小时左右,产品经理提出12小时是可接受的,于是针对当前的
提出需求
由于线上的接口是暴露给外部用户使用,外部用户付费接入,产品经理提出了需求,需要在公司内部做重新数据清洗的时候尽量减少对外部用户数据更新的影响,当前更新数据需要耗费24个小时左右,产品经理提出12小时是可接受的,于是针对当前的调用链路逐一排查,寻找问题.
环境信息
所有服务基于AWS公有云部署,
producer部署在ECS上 1C2G 由于之前的架构设计,只能部署单节点,从HTTP API拉取数据.
Kafka server 使用AWS MSK, 3台broker, 2C8G
Kafka topic 12 partition, 3 replication
kafka version: 2.2.1
测试系统,寻找问题
为了判断系统的瓶颈在什么地方,我们首先review代码,挑选出当前系统可能存在性能瓶颈的地方,使用kafka server cloudwatch中的messageInPerSecond作为我们观测的性能指标,不做任何修改的时候进行测试,得到baseline的结果为单台broker 120records/s
之后进行了不同类型的测试,如去掉日志,去掉发送cloudwatch metric等,基本上只能达到140records/s的速率就提升不上去了.
我们尝试将写入kafka的操作去掉,从日志输出情况可以看到,处理速度提升了4倍左右.因此可以断定,系统瓶颈在kafka写入.
Kafka producer 写入原理
Producer 参数
如图所示,当调用producer.send()方法之后,Kafka会将record进行serializaer
及partitationer
操作,将record根据不同的partition放入不同的buffer中,每个partition对应了一个buffer,写入的message需要放在buffer的batch中,由另外的一个i/o线程取出后发送到Kafka server,对于如何生成这个batch有两个关键的参数,分别是batch.size
和linger.ms
具体含义是:
batch.size
: 当发送过来的kafka records数据量大小小于设置的值时,将会放在同一个batch中进行发送,否则会新生成一个batch存放当前的这些消息,默认大小为16kb
linger.ms
: 发送的kafka records在一段时间内将进入到同一个batch中,超过时段的话会新创建一个batch,默认大小为0,含义为每来一条数据生成一个batch.
两个参数组合起来先达到哪个条件就优先生成batch,并且被io线程读到并发送.
ACKS 参数
acks参数是producer另一个非常重要的参数,在kafka 2.x版本中默认参数为ALL, 3.x版本中默认参数为1, 另外也可以取值为0.
不同取值的含义:
ALL
: 当producer的send请求到了kafka对应的topic的partition leader节点,leader节点在本地将数据写入到磁盘,并且请求其他的replication从节点,等到所有的从节点成功响应之后,向kafka producer返回写入成功的信息.
1
: 当producer的send请求到了kafka对应的topic的partition leader节点,leader节点写入本地磁盘成功后立即返回成功的response到producer.
0
: 当producer的send请求到了kafka对应的topic的partition leader节点,不管是否写入成功,立即返回成功的response.
从实际的测试结果看,当acks=1时,反而写入的速率相对于acks=all有了一些下降.所以最终并没有对acks的值进行修改.
Producer send 方法调用
需要注意的是,producer.send方法之后会返回一个Future对象,在我们之前的代码中,获取到了Future对象之后直接调用的get方法, Future调用了get方法之后会将异步调用转换为同步等待,producer为我们提供了另外一个可以异步回调的方法public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
,可以在callback中定义后续对于返回结果中需要进行的操作.
结果
在修改了Producer调用方法从阻塞到非阻塞之后,单个broker的写入速率达到了260records/s
修改batch.size=100 kb linger.ms=50ms 之后,速率达到了400records/s
比之前的生产速率提升了3倍多,从结果来看已经能够达到预期了.
从最终的结果可以看出,从producer的调用方法及修改了producer的参数之后,写入效率有了极大的提升.
Note
在查阅资料的过程中,对于kafka producer参数的设置,topic partition的设置,并没有一个标准的最佳实践,文章中给出的建议基本都是需要根据当前环境,使用不同的参数在环境中进行测试,选择最优的测试结果对应的参数.
参考文献
Benchmarking Kafka producer throughput
Creating Advanced Kafka Producers
Kafka Performance Tuning
Optimizing for Throughput
Optimizing Kafka producers
Difference between callback
and returned Future
Kafka Producer’s Architecture and Internal Workings
更多推荐
所有评论(0)