1.问题问题简介及背景

在使用Flink自带的Kafka消费API时,我们可以像单纯的使用Kafka消费对象API对其进行相应的属性设置,例如,读取offset的方式、设置offset的方式等。但是,Flink具有checkpoint功能,保存各运算算子的状态,也包括消费kafka时的offset,那这两种情况分别在什么时候起作用呢?

2. Flink checkpoint设置

flink并不依赖kafka或zookeeper保证容错,其保存offset只是为了外部来查询监视kafka数据的消费情况。但其提供了提交消费kafka数据的offset给Kafka或者zookeeper(kafka0.8之前)的配置,因此最终是由kafka自动设置offset,还是由flink的checkpoint机制进行最终的offset设置,取决于开发过程中的相关设置。

配置offset的提交方式取决于是否为job设置开启checkpoint,可以使用env.enableCheckpointing(milliseconds)来设置开启checkpoint。

如果禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。

如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。

    // checkpoint设置
    // 每隔1s进行启动一个检查点【设置checkpoint的周期】
    env.enableCheckpointing(1000);
    // 设置模式为:exactly_one,仅一次语义
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 确保检查点之间有1s的时间间隔【checkpoint最小间隔】
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
    // 检查点必须在1s之内完成,或者被丢弃【checkpoint超时时间】
    env.getCheckpointConfig().setCheckpointTimeout(1000);
    // 同一时间只允许进行一次检查点
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    //表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地
    // env.setStateBackend(new FsStateBackend("file:///F:/kafkaTool/aaa"));

    // Kafka相关设置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop:9091,hadoop:9092,hadoop:9093")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // properties.setProperty("auto.offset.reset", "latest") // 默认是latest,当第一次运行,会读取最近提交的offset
    // properties.setProperty("enable.auto.commit", "true") // 默认是true

    val flinkKafkaConsumer011 = new FlinkKafkaConsumer011[String]("GMALL_EVENT_0105", new SimpleStringSchema(), properties)
    // 设置根据程序checkpoint进行offset提交
    flinkKafkaConsumer011.setCommitOffsetsOnCheckpoints(true)
    flinkKafkaConsumer011.setStartFromGroupOffsets()

无论是否设置checkpoint,auto.offset.reset都默认为latest,enable.auto.commit都默认为true,第一次启动程序,会根据FlinkKafkaConsumer011对象的设置读取相应的offsets。

  1. setStartFromEarliest():从消息最开始消费;
  2. setStartFromLatest():从消息的最后开始消费;
  3. setStartFromTimestamp(long startupOffsetsTimestamp):从设定的时间戳消费;
  4. setStartFromGroupOffsets():从kafka保存的消费者组的offsets消费,如果没有,会根据auto.offset.reset设定进行消费(auto.offset.reset默认为latest),也就是说我第一次启动程序的时候从哪里消费也需要考虑;
    setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets):从特定offset消费;
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐