关于flink kafkaconsumer的偏移量存储位置测试
偏移量存储位置:1、如果设置了状态后端和checkpoint机制,同时有下面设置:FlinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true)(上面代码表示每次checkpoint的时候把组消费偏移量提交到kafka,默认为true)那么组消费偏移量会记录到 状态后端 和 kafka2、如果设置了状态后端和checkpoint机制,同时有下面设置:F
·
目录
偏移量存储位置:
1、如果设置了状态后端和checkpoint机制,同时有下面设置:
FlinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true)
(上面代码表示每次checkpoint的时候把组消费偏移量提交到kafka,默认为true)
那么组消费偏移量会记录到 状态后端 和 kafka
2、如果设置了状态后端和checkpoint机制,同时有下面设置:
FlinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false)
那么组消费偏移量只会记录到 状态后端
偏移量读取顺序:
先从状态后端读,如果没有设置状态后端就从kafka读,如果kafka没有记录组消费偏移量, 只能从 earliest或者latest或者timestamp读。
代码测试:
flink版本:1.12.1,idea直接运行
public class KafkaOffsetValidate0 {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend("file:///D:/flink-state/validate1"));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
p.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//从最开始位置消费
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//偏移量交给flink管理
p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
FlinkKafkaConsumer<String> kafkasource = new FlinkKafkaConsumer<>("offset-validate",
new SimpleStringSchema(),p);
//checkpoint时是否提交组消费偏移量默认为true
//表示每次checkpoint的时候都要把组消费偏移量提交到kafka
//当没有状态后端就会读这里偏移量,优先读取状态后端的偏移量
//这里设置为false方便验证问题
//用命令sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test查看组消费偏移量
kafkasource.setCommitOffsetsOnCheckpoints(false);
DataStream<String> datasource = env.addSource(kafkasource, TypeInformation.of(String.class));
datasource.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.println(s);
if("exception".equals(s)){
double i = 1 / 0;
}
return s;
}
},TypeInformation.of(String.class)).print();
env.execute("test");
}
}
kafka脚本说明:
创建主题:sh kafka-topics.sh --zookeeper 127.0.0.1 --create --topic offset-validate --partitions 1 --replication-factor 1 生产数据:sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic offset-validate 查看主题已有消息量:sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic offset-validate --time -1 查看消费组偏移量:sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
生产数据流程:
输入1,重启程序,1还会再被消费和打印输出。 答案:关闭程序时没有savepoint,启动程序时也没有指定从哪个checkpoint点恢复,同时组消费偏移量没有在checkpoint的时候记录到kafka,只能从头消费处理。
当输入exception时,程序抛出异常,但程序没有停止运行,还知道checkpoint点在哪,数据1没有被重复消费,但数据exception会被重复消费,然后再出异常,再重复消费,再出异常,不断循环。。。
运行效果:
更多推荐
已为社区贡献1条内容
所有评论(0)