Flink消费kafka的offset设置
1.问题问题简介及背景在使用Flink自带的Kafka消费API时,我们可以像单纯的使用Kafka消费对象API对其进行相应的属性设置,例如,读取offset的方式、设置offset的方式等。但是,Flink具有checkpoint功能,保存各运算算子的状态,也包括消费kafka时的offset,那这两种情况分别在什么时候起作用呢?2. Flink checkpoint设置flink并不依赖kaf
1.问题问题简介及背景
在使用Flink自带的Kafka消费API时,我们可以像单纯的使用Kafka消费对象API对其进行相应的属性设置,例如,读取offset的方式、设置offset的方式等。但是,Flink具有checkpoint功能,保存各运算算子的状态,也包括消费kafka时的offset,那这两种情况分别在什么时候起作用呢?
2. Flink checkpoint设置
flink并不依赖kafka或zookeeper保证容错,其保存offset只是为了外部来查询监视kafka数据的消费情况。但其提供了提交消费kafka数据的offset给Kafka或者zookeeper(kafka0.8之前)的配置,因此最终是由kafka自动设置offset,还是由flink的checkpoint机制进行最终的offset设置,取决于开发过程中的相关设置。
配置offset的提交方式取决于是否为job设置开启checkpoint,可以使用env.enableCheckpointing(milliseconds)来设置开启checkpoint。
如果禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
// checkpoint设置
// 每隔1s进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 设置模式为:exactly_one,仅一次语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有1s的时间间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// 检查点必须在1s之内完成,或者被丢弃【checkpoint超时时间】
env.getCheckpointConfig().setCheckpointTimeout(1000);
// 同一时间只允许进行一次检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地
// env.setStateBackend(new FsStateBackend("file:///F:/kafkaTool/aaa"));
// Kafka相关设置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop:9091,hadoop:9092,hadoop:9093")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// properties.setProperty("auto.offset.reset", "latest") // 默认是latest,当第一次运行,会读取最近提交的offset
// properties.setProperty("enable.auto.commit", "true") // 默认是true
val flinkKafkaConsumer011 = new FlinkKafkaConsumer011[String]("GMALL_EVENT_0105", new SimpleStringSchema(), properties)
// 设置根据程序checkpoint进行offset提交
flinkKafkaConsumer011.setCommitOffsetsOnCheckpoints(true)
flinkKafkaConsumer011.setStartFromGroupOffsets()
无论是否设置checkpoint,auto.offset.reset都默认为latest,enable.auto.commit都默认为true,第一次启动程序,会根据FlinkKafkaConsumer011对象的设置读取相应的offsets。
- setStartFromEarliest():从消息最开始消费;
- setStartFromLatest():从消息的最后开始消费;
- setStartFromTimestamp(long startupOffsetsTimestamp):从设定的时间戳消费;
- setStartFromGroupOffsets():从kafka保存的消费者组的offsets消费,如果没有,会根据auto.offset.reset设定进行消费(auto.offset.reset默认为latest),也就是说我第一次启动程序的时候从哪里消费也需要考虑;
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets):从特定offset消费;
更多推荐
所有评论(0)