1. kafka分区数据顺序性

kafka具有分区内数据有序的特点,可以通过将数据指定到特定的分区来实现数据的顺序性。kafka分区逻辑代码如下:如果指定了分区号生产,则发送到指定分区;否则调用分区器计算方法partitioner.partition()

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

一共三个分区器实现了Partitioner类的partition()方法:

  1. DefaultPartitioner
    指定分区则使用;否则如果存在key则根据key去hash;否则batch满了切换分区。
  2. RoundRobinPartitioner
    没指定分区则平均分配循环写入分区。
  3. UniformStickyPartitioner
    和默认相比去除掉key取hash相关的规则。

综上,我们想实现数据顺序入kafka,可以指定分区写或者通过设置key值相同保证数据入同一个分区。但是要注意避免全部数据入同一分区的场景,最好将数据分组即保证组内数据有序而不是全局有序。

如果采用设置key值相同方式进行组内数据入同一分区,则计算分区方式如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //key为null等同于UniformStickyPartitioner分区器
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    } 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // key取hash后再取正值(并非绝对值)再对分区数量取余
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

经测试尽量保证key的前缀多样化来保证数据的均匀分布,可以对自己的数据进行测试来敲定key的定义方式:如下数据返回结果为:1 1 1 0 2 / 0 2 0 1 2 / 0 2 2 1 0

System.out.print(Utils.toPositive(Utils.murmur2("test11".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test12".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test13".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test14".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test15".getBytes()))%3+" ");
System.out.println();
System.out.print(Utils.toPositive(Utils.murmur2("1test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("2test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("3test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("4test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("5test1".getBytes()))%3+" ");
System.out.println();
System.out.print(Utils.toPositive(Utils.murmur2("1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("2".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("3".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("4".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("5".getBytes()))%3+" ");
System.out.println();

2. Flink消费kafka的顺序性

首先构造三个分区的topic,然后写入测试数据:指定了key和每个key的版本号,以版本号升序方式写入kafka。

new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"1\",\"time\":1623588192345}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"1\",\"time\":1623588192342}");
new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"2\",\"time\":1623588192347}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"2\",\"time\":1623588192344}");
new ProducerRecord<>("test1", "c", "{\"key\":\"c\",\"value\":\"1\",\"time\":1623588192345}");
new ProducerRecord<>("test1", "c", "{\"key\":\"c\",\"value\":\"2\",\"time\":1623588192348}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"3\",\"time\":1623588192346}");
new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"3\",\"time\":1623588192349}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"4\",\"time\":1623588192348}");

通过以下命令可以查看kafka topic数据和消费组情况:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test1  --time -1
./kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group test1 --describe

以上数据分布情况如下:key a和key c位于1号分区,key b位于2号分区。

test1:0:0
test1:1:5
test1:2:4

编写flink代码消费kafka观察数据顺序性:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度为3对应分区数,但是只有
env.setParallelism(3);
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String[] fields = new String[]{"key","value","time"};
TypeInformation[] type = new TypeInformation[3];
type[0] = Types.STRING;
type[1] = Types.STRING;
type[2] = Types.LONG;
RowTypeInfo rowTypeInfo = new RowTypeInfo(type, fields);

SingleOutputStreamOperator<Row> dataStreamSource = env.addSource(new FlinkKafkaConsumer(
                "test1",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest())
                .process(new JsonToRow(rowTypeInfo), rowTypeInfo)
                .name("kafkaSource").uid("kafkaSource");
//source数据情况
dataStreamSource.print("source");
//按key对数据进行分区
KeyedStream<Row, String> key = dataStreamSource
        .keyBy((KeySelector<Row, String>) value ->  value.getFieldAs(0));
//keyby后数据情况
key.print("sink");

env.execute("kafkatest");

输出结果如下:

source:3> +I[a, 1, 1623588192345]
source:1> +I[b, 1, 1623588192342]
source:1> +I[b, 2, 1623588192344]
source:3> +I[a, 2, 1623588192347]
source:1> +I[b, 3, 1623588192346]
source:3> +I[c, 1, 1623588192345]
source:1> +I[b, 4, 1623588192348]
source:3> +I[c, 2, 1623588192348]
source:3> +I[a, 3, 1623588192349]
sink:2> +I[a, 1, 1623588192345]
sink:1> +I[b, 1, 1623588192342]
sink:2> +I[a, 2, 1623588192347]
sink:1> +I[b, 2, 1623588192344]
sink:1> +I[b, 3, 1623588192346]
sink:2> +I[c, 1, 1623588192345]
sink:1> +I[b, 4, 1623588192348]
sink:2> +I[c, 2, 1623588192348]
sink:2> +I[a, 3, 1623588192349]

可以看出source和sink每个线程输出的数据中均按key值的版本号升序排布,即flink消费kafka和进行keyby操作(shuffle)均为破坏kafka的分区有序性。

3. Flink消费kafka并进行checkpoint

在上文代码基础上配置checkpoint配置,设置为本地文件存储且任务停止时保留checkpoint文件

String topic = args[0];
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///Users/caster/Desktop/checkpoint");

// 两个checkpoint超时时间之间的间隔和语义
env.enableCheckpointing(5*1000, CheckpointingMode.AT_LEAST_ONCE);
// checkpoint超时时间
env.getCheckpointConfig().setCheckpointTimeout(60*1000);
// 最大同时进行的checkpoint任务,即多个Barrier进入处理流程
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 两次checkpoint之间的空闲时间最小值,设置后上一个配置则为1
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
// 设置为0表示不容忍checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
// 重启策略2:每隔10s尝试重启一次共三次,超时一分钟则失败
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(60),Time.seconds(10)));

// 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.setParallelism(3);

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", topic);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String[] fields = new String[]{"key","value","time"};
TypeInformation[] type = new TypeInformation[3];
type[0] = Types.STRING;
type[1] = Types.STRING;
type[2] = Types.LONG;
RowTypeInfo rowTypeInfo = new RowTypeInfo(type, fields);

SingleOutputStreamOperator<Row> dataStreamSource = env.addSource(new FlinkKafkaConsumer(
                topic,
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest())
                .process(new JsonToRow(rowTypeInfo), rowTypeInfo)
                .name("kafkaSource").uid("kafkaSource");
//source数据情况
dataStreamSource.print("source");
KeyedStream<Row, String> keyedStream = dataStreamSource
        .keyBy((KeySelector<Row, String>) value -> value.getFieldAs(0));
//keyby后数据情况
keyedStream.print("sink");

env.execute("kafkatest");

mvn打包带有依赖的jar包,然后到flink 客户端提交任务:

./flink run flink-compute-1.0-SNAPSHOT.jar test1

提交成功会返回job 的id:

Job has been submitted with JobID e7309690361278a82675c7d981057692

且在配置的checkpoint目录可以看到对应的job的目录,在不断进行新的checkpoint
checkpoint目录
在flink 的 web UI可以看到具体的两个print()输出,直到kafka数据全部消费完毕,如下图:
job 输出情况

关闭任务,并向kafka内新生产部分数据,测试指定checkpoint目录恢复任务,命令如下:

./flink run -s /Users/caster/Desktop/checkpoint/e7309690361278a82675c7d981057692/chk-19  flink-compute-1.0-SNAPSHOT.jar test1

相当于重新启动新的flink job,job id发生改变,checkpoint目录也随之改变。
观察到任务会从上次消费到的位置继续消费,红线上为第一个job输出,红线下为第二个job输出:
job 输出情况

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐