spring-kafka并行度concurrency在应用集群部署(多个节点)正确设置,附上Kafka Manager监控效果
我们都知道kafka有topic的概念,为了能够更好的支持水平扩容,topic又分了很多的Partition . 消费者负责消费Partition中的消息,一个Partition只能被一个消费者消费,但是一个消费者可以消费多个partition .所以提升消费能力可以开多几个消费者来消费partition,从而提高系统性能。spring.kafka.listener.concurrency就是sp
我们都知道kafka有topic的概念,为了能够更好的支持水平扩容,topic又分了很多的Partition . 消费者负责消费Partition中的消息,一个Partition只能被一个消费者消费,但是一个消费者可以消费多个partition .
所以提升消费能力可以开多几个消费者来消费partition,从而提高系统性能。
spring.kafka.listener.concurrency就是spring-kafka组件用来开启消费者线程数的参数。应用在单机部署环境下,这个参数很好理解,你想要开几个相应设置几个就行,concurrency数不能大于partition数量,因为partition会尽量平均分配给消费者,多出的会再重新分配给某些消费者,即消费者消费的partition数量会不等。
以下为根据concurrency开启线程数的代码:
org.springframework.kafka.listener.ConcurrentMessageListenerContainer#doStart
最终容器container是KafkaMessageListenerContainer类,因为它实现了SmartLifecycle接口,所以会自动执行到上面的doStart方法,接着调用KafkaMessageListenerContainer.start方法,最终会调用这个类的doStart方法以下部分开启消费者线程。
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
下面介绍在应用集群部署环境下的concurrency正确配置方式。
案例:
某天晚上,有一个topic消息数猛增。两个小时内涨了500w+,导致kafka消息堆积。为了临时解决消费慢问题,想到加大concurrency并行度来提升。之前设置的是8,这个topic有24个partition,想让一个消费者处理一个,所以concurrency临时调整为24。接着让运维重启应用,查看消费情况,一顿操作猛如虎,最后我勒个去,还是堆积,并且堆积时间更长了。。。
后面想想,我们应用是集群部署的,不是只有一个节点,所以消费者线程数量= 8 * 节点数,远远高于partition=24,所以加大线程数没意义。反而需要更多CPU消耗,堆积时间变长就足以说明。
其中一个节点的线程情况:调整为24后的
所以这样开出来的线程是完全浪费且消耗资源的,应该按节点数来设置这个参数才是正确的做法。
假如节点=3,应该设置= 24 / 3 = 8 个就行了。
org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1 说明下其中0-3-C-1第一个0是用来区分不同topic的,假如有两个topic,那么就是0-x-C-1和1-x-C-1
线程名称设置代码如下:
org.springframework.core.task.SimpleAsyncTaskExecutor#doExecute
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
为了验证这个观点,我自己本地也搭建了kafka集群。
设置了topic:hd-test-topic, 其中partition10个,我将spring.kafka.listener.concurrency设置为4
模拟应用集群部署,本地起两个端口的应用:
先起第一个,看到partition分配情况如下:
接着再启动另外一个节点:
明显看到之前设置的partition被撤销了,重新进行了分配,两个节点都成为了消费者。
所以证明上面观点是对的,在应用集群环境下concurrency要按应用部署节点数来设置。
以下部分为kafka的监控管理部分Kafka Manager
CMAK (Cluster Manager for Apache Kafka, previously known as Kafka Manager)
项目地址:Kafka Manager
要先下载cmak,我是下载当前最新版本的3.0.0.5 链接:CMAK
另外还要下载jdk11,zookeeper-3.5以上版本
配置好/cmak-3.0.0.5/conf/application.conf
kafka-manager.zkhosts="192.168.153.xxx:2181,192.168.153.xx:2182,192.168.153.xx:2183"
cmak.zkhosts="192.168.153.xx:2181,192.168.153.1xx:2182,192.168.153.1xx:2183"
先启动zk集群,然后启动kafka集群,最后再启动cmak。
kafka的zk集群配置好,在使用cmak过程中发生以下错误:
1.启动cmak后报错如下:
[error] k.m.a.c.BrokerViewCacheActor - Failed to get broker metrics for BrokerIdentity(1,192.168.153.1xx,9999,false,true,Map(PLAINTEXT -> 9092))
因为勾选了以下选项所以出现上面的错误
解决只需要修改kafka-server-start.sh 和 kafka-run-class.sh
kafka-server-start.sh加上export JMX_PORT=“9999”
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi
kafka-run-class.sh
加上:-Djava.rmi.server.hostname=192.168.153.1xx
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=192.168.153.1xx -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
Kafka Manager效果图:
开始我选用CMAK1.3.3.18 版本发现Consumers consuming from this topic这部分显示不出来,后来换了最新版本的,很正常。
更多推荐
所有评论(0)