1.Kafka数据重复的解决方案:

对每个生产者生成的每条数据,都添加由生产者id,分区号,随机的序列号组成的标识符: (producerid,partition,SequenceId),通过标识符对数据进行去重。

2.Kafka数据丢失的解决方案:

  1. 设置enable.auto.commit=false,每次处理完手动提交,确保消息真的被消费并处理完成。Kafka的消息自动提交和手动提交_刘Java的博客-CSDN博客_kafka 自动提交
  2. 配置上消息重试的机制。Spring-Kafka —— 消费重试机制实现 - 曹伟雄 - 博客园

Kafka自动提交和手动提交说明:

使用原始apache-kafka依赖的API来消费数据:

  1. 如果enable.auto.committrue,则表示自动提交,但不会在拉取数据之后立即提交。在一次poll的数据处理完毕之后,将会在下一次poll数据的时候,首先检查是否到达了auto.commit.interval.ms自动提交间隔的时间,如果到达了(默认5s),那么会提交此前拉取的消息的最大偏移量,否则不会提交。
  2. 如果enable.auto.commitfalse,则表示手动提交,那么需要通过consumer.commitAsync()或者commitSync()手动提交偏移量,这两个方法将会提交目前最大的offset,否则重启之后将会消费此前的数据。

使用spring-kafka@Listener注解来消费数据:

  1. 如果enable.auto.committrue,则表示自动提交,但不会在拉取数据之后立即提交。在一次poll的数据处理完毕之后,将会在下一次poll数据的时候,首先检查是否到达了auto.commit.interval.ms自动提交间隔的时间,如果到达了(默认5s),那么会提交此前拉去的消息的最大偏移量,否则不会提交。
  2. 如果enable.auto.commitfalse,则表示手动提交,此时需要注意选择提交的模式AckMode。

AckMode模式可以在工厂类配置:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(
	ConsumerFactory<String, String> consumerFactory) {

	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory);
	factory.getContainerProperties().setPollTimeout(1500);
	factory.setBatchListener(true);
	//配置手动提交offset,默认BATCH
	factory.getContainerProperties().setAckMode(AckMode.BATCH);
	return factory;
}
AckMode模式作用
BATCH默认的提交模式。当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交,由Spring帮我们提交。
RECORD当每一条记录被消费者监听器(ListenerConsumer)处理之后提交,由Spring帮我们提交。
TIME当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交,由Spring帮我们提交。
COUNT当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交,由Spring帮我们提交。
COUNT_TIMETIME和COUNT有一个条件满足时提交,由Spring帮我们提交。
MANUAL需要对监听消息的方法中引入 Acknowledgment参数,并在代码中调用acknowledge()方法进行手动提交。实际上,对于每一批poll()的数据,每次调用acknowledge()方法之后仅仅是将offset存放到本地map缓存,在下一次poll的时候,在poll新数据之前从缓存中拿出来批量提交,也就是说与BATCH有相同的语义。
MANUAL_IMMEDIATE需要对监听消息的方法中引入 Acknowledgment参数,并在代码中调用acknowledge()方法进行手动提交。实际上,对于每一批poll()的数据,每次调用acknowledge()方法之后立即进行偏移量的提交。

  • 由于默认的提交模式是BATCH,因此在使用@Listener注解来消费数据时,即使enable.auto.commitfalse,偏移量也会在每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交,这里的提交实际上是手动提交,但是这个“手动提交”操作由Spring帮我们做了,因此如果不设置AckModeMANUAL或者MANUAL_IMMEDIATE,我们仍然会觉得这些数据被“自动提交”了,实际上是由Spring帮我们执行了手动提交的代码,造成误解。
  • MANUALMANUAL_IMMEDIATE的区别是:MANUAL_IMMEDIATE是消费完一个消息就提交,MANUAL是处理完一批消息(默认500)之后,在下一次拉取消息之前批量提交。
  • 如果中间有一批数据没有提交,那么在一次消费过程中,这些没有提交的数据不会重复消费,而是会一直向后消费,除非重启消费者,会被再次消费。如果后面有消息的offset被提交,那么该offset之前的所有消息都算作已提交,重启之后也不会被再次消费。

​​​​​​​Spring Boot集成kafka的相关配置-CSDN博客

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐