kafka消息丢失情况与解决方案
参考连接:kafka的ACK参数详解 - 岚樱 - 博客园kafka消息丢失情况与解决方案 - 绝症色盲 - 博客园需要结合ack 参数的理解,isr 来学习,总结:不同的上下游节点之间能否做到exactlyonce要看组件功能是否直接,比较典型的应用就是flink对kafka的操作;通常在对kafka的上下游节点做相应配置即可。1、Kafka消息丢失的情况:(1)auto.commit.enab
参考连接:
kafka的ACK参数详解 - 岚樱 - 博客园
kafka消息丢失情况与解决方案 - 绝症色盲 - 博客园
需要结合ack 参数的理解,isr 来学习,总结:
不同的上下游节点之间能否做到exactlyonce要看组件功能是否直接,比较典型的应用就是flink
对kafka的操作;通常在对kafka的上下游节点做相应配置即可,如写入时指定ack 或 读取时 设置
auto.commit.enable 或在flink中设置 checkpointingMode为exactlyonce
producer = KafkaProducer(bootstrap_servers='xxx1:9092,xxx2:9092',acks=1,retries =3,batch_size=524288,reconnect_backoff_max_ms=3000,buffer_memory=536870912)
producer参数参考连接:kafka producer参数详解_weixin_33725272的博客-CSDN博客
self.consumer = KafkaConsumer(self.source_kafka_config.get("topic"), group_id=self.source_kafka_config.get("source_group_id"), bootstrap_servers=self.source_kafka_config.get("bootstrap_servers"), value_deserializer=lambda m: json.loads(m.decode('utf-8')), enable_auto_commit=False, auto_offset_reset="latest")
#bug 数据丢失 s_env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
1、Kafka消息丢失的情况:
(1)auto.commit.enable=true,消费端自动提交offersets设置为true,当消费者拉到消息之后,还没有处理完 commit interval 提交间隔就到了,提交了offersets。这时consummer又挂了,重启后,从下一个offersets开始消费,之前的消息丢失了。
(2)网络负载高、磁盘很忙,写入失败,又没有设置消息重试,导致数据丢失。
(3)磁盘坏了已落盘数据丢失。
(4)单 批 数 据 的 长 度 超 过 限 制 会 丢 失 数 据 , 报kafka.common.Mess3.ageSizeTooLargeException异常
2、Kafka避免消息丢失的解决方案:
(1)设置auto.commit.enable=false,每次处理完手动提交。确保消息真的被消费并处理完成。
(2)kafka 一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认 1 秒钟不符合生产环境(网络中断时间有可能超过 1秒)。
(3)配置多个副本,保证数据的完整性。
(4)合理设置flush间隔。kafka 的数据一开始就是存储在 PageCache 上的,定期 flush 到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache 上的数据就丢。可以通过 log.flush.interval.messages 和 log.flush.interval.ms 来 4.配置 flush 间隔,interval大丢的数据多些,小会影响性能但在 0.本,可以通过 replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,kafka 当前支持 GZip 和 Snappy压缩,来缓解这个问题 是否使用 replica 取决于在可靠性和资源代价之间的 balance。
更多推荐
所有评论(0)