fetch.min.bytes
这个参数允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。

fetch.max.wait.ms
上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。

max.partition.fetch.bytes
这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

需要注意的是,max.partition.fetch.bytes必须要比broker能够接收的最大的消息(由max.message.size设置)大,否则会导致消费者消费不了消息。另外,在上面的样例可以看到,我们通常循环调用poll方法来读取消息,如果max.partition.fetch.bytes设置过大,那么消费者需要更长的时间来处理,可能会导致没有及时poll而会话过期。对于这种情况,要么减小max.partition.fetch.bytes,要么加长会话时间。

session.timeout.ms
这个参数设置消费者会话过期时间,默认为3秒。也就是说,如果消费者在这段时间内没有发送心跳,那么broker将会认为会话过期而进行分区重平衡。这个参数与heartbeat.interval.ms有关,heartbeat.interval.ms控制KafkaConsumer的poll()方法多长时间发送一次心跳,这个值需要比session.timeout.ms小,一般为1/3,也就是1秒。更小的session.timeout.ms可以让Kafka快速发现故障进行重平衡,但也加大了误判的概率(比如消费者可能只是处理消息慢了而不是宕机)。

auto.offset.reset
这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)。

enable.auto.commit
这个参数指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false。如果为true,你可能需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。

partition.assignment.strategy
我们已经知道当消费组存在多个消费者时,主题的分区需要按照一定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:

范围(Range):对于每个主题,每个消费者负责一定的连续范围分区。假如消费者C1和消费者C2订阅了两个主题,这两个主题都有3个分区,那么使用这个策略会导致消费者C1负责每个主题的分区0和分区1(下标基于0开始),消费者C2负责分区2。可以看到,如果消费者数量不能整除分区数,那么第一个消费者会多出几个分区(由主题数决定)。
轮询(RoundRobin):对于所有订阅的主题分区,按顺序一一的分配给消费者。用上面的例子来说,消费者C1负责第一个主题的分区0、分区2,以及第二个主题的分区1;其他分区则由消费者C2负责。可以看到,这种策略更加均衡,所有消费者之间的分区数的差值最多为1。
partition.assignment.strategy设置了分配策略,默认为org.apache.kafka.clients.consumer.RangeAssignor(使用范围策略),你可以设置为org.apache.kafka.clients.consumer.RoundRobinAssignor(使用轮询策略),或者自己实现一个分配策略然后将partition.assignment.strategy指向该实现类。

client.id
这个参数可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用。

max.poll.records
这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。

receive.buffer.bytes、send.buffer.bytes
这两个参数控制读写数据时的TCP缓冲区,设置为-1则使用系统的默认值。如果消费者与broker在不同的数据中心,可以一定程度加大缓冲区,因为数据中心间一般的延迟都比较大。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐