前言

大名鼎鼎的消息中间件Kafka大家一定不陌生吧,使用消息中间件的时候最怕的就是消息丢失了,如何解决这个问题呢?或许大家都知道,消费者端手动提交offset嘛。那么具体代码该怎么写呢?本文就基于springboot来进行消费者手动提交offset的试验。

配置

application.yml

spring:
  kafka:
    # 指定 kafka 地址可以多个
    bootstrap-servers:
      - 192.168.130.128:9092
      - 192.168.130.128:9093
      - 192.168.130.128:9094
    # 指定listener 容器中的线程数,用于提高并发量
    listener:
      concurrency: 3
      ack-mode: manual

    # 消费者的配置
    consumer:
      # 指定默认消费者group id
      group-id: test-group
      #earliest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      #latest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      #none
      #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # 是否开启自动提交
      enable-auto-commit: false
      # key,value的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Consumer

package com.study.springboot.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;


@Component
public class MyConsumer {
    @KafkaListener(topics = {"offsettest"})
    public void processMessage(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        try {
            System.out.printf("topic is %s, offset is %d,partition is %s, value is %s \n", record.topic(), record.offset(),record.partition(), record.value());
            // 手动提交offset
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

重点是ack.acknowledge();这一行,手动提交offset,我们先注释掉该行,启动springboot项目,然后发送消息hello world

在这里插入图片描述
可以看到消费者消费到了该条消息,但是由于我们没有提交offset,此时重启springboot项目。
在这里插入图片描述
发现该条消息仍然被消费到了。

然后我们取消注释,再重启,此时仍然可以消费到该条消息,但是与之前不同的是,此时我们提交了offset,所以再重启的时候就不会消费到该条消息了。
在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐