Kafka为了增加系统的伸缩性(Scalability),引入了分区(Partitioning)的概念。

        Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。

        通过这个设计,就可以以分区这个粒度进行数据读写操作,每个Broker的各个分区独立处理请求,进而实现负载均衡,提升了整体系统的吞吐量。

        分区策略决定生产者将消息发送到哪个分区的算法

1、默认的分区器

        kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。这个类中就是定义数据分发的策略。

kafka默认的分区器 org.apache.kafka.clients.producer.internals.DefaultPartitioner

使用默认分区器,生产者创建消息时,根据 参数决定发送到哪个分区:

1.1、黏性分区策略(2.4.0之前是轮询)- 未指定分区、key

        既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直 使用该分区,待该分区的batch已满或者已完成,Kafka再随机选一个分区进行使用(和上一次的分区不同)。 

        Sticky Partitioning Strategy会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区

原因:

        kafka 在发送消息的时候 , 采用批处理方案 , 当达到一批后进行分送 , 但是如果一批数据中有不同分区的数据 , 就无法放置到一个批处理中, 而老版本中轮询方案 , 就会导致一批数据被分到多个小的批次中 , 从而影响效率 , 故在新版本中 , 采用这种粘性的划分策略。

例如:

        第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进 行使用(如果还是0会继续随机)。

1.2、hash分区策略

          没有指明partition值,但有key的情况下,将keyhash值与topic的 partition数进行取余得到partition值。 Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

        注意: 如果 key 一直不变,同一个 key 算出来的 hash 值是个固定值。如果是固定值,这种 hash 取模就没有意义。
        例如:

        key1hash=5, key2hash=6 ,topicpartition=2,那 么key1** 对应的value1写入1号分区,key2对应的value2写入0号分区。

1.3、指定partition策略

        以上两种构造都会通过DefaultPartitioner进行数据分发操作。但指定分区后,不会调用DefaultPartitioner.partition() 方法。

        

        指明partition的情况下,直接将指明的值作为partition值; 例如partition=0,所有数据写入分区0。

2、自定义分区策略

         自定义分区策略 跟DefaultPartitioner实现方式一样。

1、创建一个类,实现Partitioner接口。

2、重写 partitioner中的方法,

        partitioner()方法的参数说明:

                参数1:topic       

                参数2:key值

                参数3:key值字节数组

                参数4:value数据

                参数5:value数据的字节数组

                参数6:集群对象

3、在 partitioner() 方法中编写自定义分区逻辑,返回分区编号。

4、在生产者配置信息中进行配置自定义分区:

spring.kafka.producer.properties.partitioner.class=配置类全路径

代码示例:

@Component
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String msgValues = value.toString();
        int partition;
        if (msgValues.contains("test")){
            partition = 0;
        }else {
            partition = 1;
        }
        return partition;
    }
    @Override
    public void close() {
        //Nothing to close
    }
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐