参考连接:

kafka的ACK参数详解 - 岚樱 - 博客园


kafka消息丢失情况与解决方案 - 绝症色盲 - 博客园

 

需要结合ack 参数的理解,isr 来学习,总结:

不同的上下游节点之间能否做到exactlyonce要看组件功能是否直接,比较典型的应用就是flink

对kafka的操作;通常在对kafka的上下游节点做相应配置即可,如写入时指定ack 或 读取时 设置

auto.commit.enable 或在flink中设置 checkpointingMode为exactlyonce

producer = KafkaProducer(bootstrap_servers='xxx1:9092,xxx2:9092',acks=1,retries =3,batch_size=524288,reconnect_backoff_max_ms=3000,buffer_memory=536870912)

producer参数参考连接:kafka producer参数详解_weixin_33725272的博客-CSDN博客

self.consumer = KafkaConsumer(self.source_kafka_config.get("topic"),
                     group_id=self.source_kafka_config.get("source_group_id"),
                     bootstrap_servers=self.source_kafka_config.get("bootstrap_servers"),
                     value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                     enable_auto_commit=False,
                     auto_offset_reset="latest")
#bug 数据丢失
s_env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

1、Kafka消息丢失的情况:

(1)auto.commit.enable=true,消费端自动提交offersets设置为true,当消费者拉到消息之后,还没有处理完 commit interval 提交间隔就到了,提交了offersets。这时consummer又挂了,重启后,从下一个offersets开始消费,之前的消息丢失了。

(2)网络负载高、磁盘很忙,写入失败,又没有设置消息重试,导致数据丢失。

(3)磁盘坏了已落盘数据丢失。

(4)单 批 数 据 的 长 度 超 过 限 制 会 丢 失 数 据 , 报kafka.common.Mess3.ageSizeTooLargeException异常

2、Kafka避免消息丢失的解决方案:

(1)设置auto.commit.enable=false,每次处理完手动提交。确保消息真的被消费并处理完成。

(2)kafka 一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认 1 秒钟不符合生产环境(网络中断时间有可能超过 1秒)。

(3)配置多个副本,保证数据的完整性。

(4)合理设置flush间隔。kafka 的数据一开始就是存储在 PageCache 上的,定期 flush 到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache 上的数据就丢。可以通过 log.flush.interval.messages 和 log.flush.interval.ms 来 4.配置 flush 间隔,interval大丢的数据多些,小会影响性能但在 0.本,可以通过 replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,kafka 当前支持 GZip 和 Snappy压缩,来缓解这个问题 是否使用 replica 取决于在可靠性和资源代价之间的 balance。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐