总叙

通过kafka生产消费配额管理可以消峰,减少瞬时kafka的压力。生产配额可以配置每秒写入kafka的字节数,消费配额可以配置每秒消费字节数。
配置粒度可以是全局的,也可以是用户级或者clientId级别。这里以clientID级别管理做测试。

一、测试环境

192.168.1.115机器

二、测试项目

1、生产者限流10M/S测试

1.1 测试步骤

1.1.1 配置clientId为gh_pro_1的生产者限流10M/S

进入Kafka目录,输入如下命令

bin/kafka-configs.sh  \
--zookeeper 192.168.1.115:2181 \
--alter --add-config 'producer_byte_rate=10485760,consumer_byte_rate=10485760' \
--entity-type clients --entity-name gh_pro_1

到zk节点下查看配置是否写入成功

bin/zkCli.sh 
get /config/clients/gh_pro_1

在这里插入图片描述
看到已经写入到zk,该配置实时生效,不需要重起Kafka,如果是写到配置文件的则要重起。

1.1.2 命令行监控该生产者ID生产情况

进入Kafka目录输入如下命令:

bin/kafka-producer-perf-test.sh \
--topic test --num-records 100 \
--record-size 1024 --throughput -1 \
--producer-props bootstrap.servers=192.168.1.115:9092 acks=-1 client.id=gh_pro_1
1.1.3 java程序开始生产数据

创建生产者,需要保持与配置的ID一致,然后循环写入数据,每条数据大小为1kb。
在这里插入图片描述

1.1.4 运行到命令行查看结果

在这里插入图片描述
当限流设为10M时,生产速率在10M之间徘徊,生产被限流。

2、生产者限流50M/S测试

上述步骤只需要修改配额参数即可

bin/kafka-configs.sh  \
--zookeeper 192.168.1.115:2181 \
--alter --add-config 'producer_byte_rate=52428800,consumer_byte_rate=52428800' \
--entity-type clients --entity-name gh_pro_1

然后运行java生产者结果如下,可以看到写入速率在50M/S左右,写入被限流:
在这里插入图片描述

3、生产者限速积压测试

3.1 配置生产者限速为1kb每秒

bin/kafka-producer-perf-test.sh \
--topic test2 --num-records 10000 \
--record-size 1024 --throughput -1 \
--producer-props bootstrap.servers=192.168.1.115:9092 acks=-1 client.id=gh_pro_1

3.3 启动java程序消费test2

public static void main(String[] args){
		KafkaConsumer<String, String> consumer = KafkaUtils.genConsumer("192.168.1.115:9092", "gh_pro_1", "groupid_01",new String[]{"test2"});
		long count=0;
		long startTime=System.currentTimeMillis();
		while(true){
			ConsumerRecords<String, String> records = consumer.poll(200);
			for(ConsumerRecord<String,String> record:records){
				System.out.println(count+"->"+record.value());
				if(record.value().contains("end")){
					System.out.println("共消费"+count+"条数据,耗时:"+(System.currentTimeMillis()-startTime));
				}
				count++;
			}
		}
	}

3.3 运行java程序写入Kafka

每次写100条,连续写100次,之后保持producer与Kafka的心跳每10秒写一次
创建生产者需要制定client ID

	public static Producer<String, String> genProducer(String kafkaServer,String cliId) {
		Properties kafkaProps = new Properties();
		kafkaProps.put("client.id", cliId);
		kafkaProps.put("acks", "all");
		kafkaProps.put("retries", 0);
		// kafkaProps.put("compression.type", "snappy");
		kafkaProps.put("batch.size", 16384); // 100
		kafkaProps.put("linger.ms", 1);
		kafkaProps.put("buffer.memory", 1073741824); // 33554432 943718400
		// kafka 会接收单个消息size的最大限制 默认1m
		kafkaProps.put("message.max.bytes", 1073741824);
		kafkaProps.put("max.request.size", 1073741824);// 10485760 生产者能请求的最大消息
		kafkaProps.put("max.in.flight.requests.per.connection", 1);
		// 主机信息(broker)
		kafkaProps.put("bootstrap.servers", kafkaServer);
		// 键为字符串类型
		kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 值为字符串类型
		kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(kafkaProps);
		return producer;
	}

在这里插入图片描述

3.4 观察Kafka终端命令行消费

观察结果得知积压超过5分钟的数据将被丢失,Kafka会自动从最新位置开始消费数据。
在这里插入图片描述

4、消费者限流测试

4.1 限流100KB/S测试

4.1.1 配置消费者限流为100kb/s
bin/kafka-configs.sh  \
--zookeeper 192.168.1.115:2181 \
--alter --add-config 'producer_byte_rate=1024000,consumer_byte_rate=102400' \
--entity-type clients --entity-name gh_pro_1
4.1.2 生产数据

生产者生产1万条数据,每条数据1kb 。

4.1.3 启动消费者

消费者创建需要制定clientID

public static KafkaConsumer<String,String> genConsumer(String kafkaServer, String clientId,String groupId, String... topics) {
		Properties properties = new Properties();
		properties.put("bootstrap.servers", kafkaServer);// xxx是服务器集群的ip
		properties.put("client.id", clientId);
		properties.put("group.id", groupId);
		properties.put("enable.auto.commit", "true");
		properties.put("auto.commit.interval.ms", "1000");
		properties.put("fetch.max.bytes", 1073741824); // 这是消费者能读取的最大消息
		properties.put("auto.offset.reset", "latest");// latest earliest
		properties.put("session.timeout.ms", "6000");
		properties.put("max.partition.fetch.bytes", 1073741824);
		properties.put("receive.message.max.bytes", 1073742824);
		properties.put("heartbeat.interval.ms", "2000");
		properties.put("max.poll.records", 1);
		properties.put("max.poll.interval.ms", "1000");
		properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
		kafkaConsumer.subscribe(Arrays.asList(topics));
		return kafkaConsumer;
	}
public static void main(String[] args){
		KafkaConsumer<String, String> consumer = KafkaUtils.genConsumer("192.168.1.115:9092", "gh_pro_1", "groupid_01",new String[]{"test2"});
		long count=0;
		long startTime=System.currentTimeMillis();
		while(true){
			ConsumerRecords<String, String> records = consumer.poll(200);
			for(ConsumerRecord<String,String> record:records){
				System.out.println(count+"->"+record.value());
				if(record.value().contains("end")){
					System.out.println("共消费"+count+"条数据,耗时:"+(System.currentTimeMillis()-startTime));
				}
				count++;
			}
		}
	}

按每秒100kb的速率消费,10000kb预计100秒消费完成。
在这里插入图片描述

测试结果与预期结果相符。

4.2 限流1M/S测试

配置消费者限流每秒1M:

bin/kafka-configs.sh  \
--zookeeper 192.168.1.115:2181 \
--alter --add-config 'producer_byte_rate=104857600,consumer_byte_rate=1048576' \
--entity-type clients \
--entity-name gh_pro_1

生产102400条数据,每条1kb,总共约100M数据,每秒消费1M,预期100秒完成消费
在这里插入图片描述
实际结果与预期结果相符。

三、总结

1、配置限流需要指定clientID
2、限流配置成功后超过限流会以断续的方式写入或者消费,类似于java的sleep,延长整体处理时间,从而达到限流的目的
3、生产者限流积压超时未写入Kafka的数据会丢失,超时时间约为5分钟

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐