1. pom file

<!-- Kafka dependency; no version tag is needed when the Spring Boot BOM manages it -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

2. Configuration file

Add the following configuration to application.yml:

spring:
  # Kafka cluster configuration
  kafka:
    bootstrap-servers: 10.0.40.11:9092
    # Producer configuration
    producer:
      # Number of retries on send failure
      retries: 0
      # Acknowledgment level: how many partition replicas must confirm the write
      # before the broker acks the producer (0, 1, or all/-1)
      acks: 1
      # Batch size in bytes
      batch-size: 16384
      properties:
        # The producer sends a batch once it reaches batch-size or once linger.ms elapses.
        # With linger.ms=0 every record is sent immediately, so batch-size is effectively unused.
        linger.ms: 1
      # Producer-side buffer size in bytes
      buffer-memory: 33554432
      # Serializer classes provided by Kafka
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    # Consumer configuration
    consumer:
      # Default consumer group ID
      group-id: iosConsumerGroup
      # Topics to consume (a custom property read via @Value below, not a standard Spring Kafka key)
      topic-name: test,test2
      # Whether to auto-commit offsets
      enable-auto-commit: false
      # Auto-commit interval (only relevant when enable-auto-commit is true)
#      auto-commit-interval: 1000ms
      # What to do when Kafka has no initial offset, or the offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the newest offset (consume only data produced after joining)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      properties:
        # Session timeout: if the consumer sends no heartbeat within this window, a rebalance is triggered
        session.timeout.ms: 15000
        # Consumer request timeout
        request.timeout.ms: 18000
      # Deserializer classes provided by Kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Maximum records returned per poll; 1 here means strictly one-at-a-time consumption,
      # tune this to your actual workload
      max-poll-records: 1
      # Heartbeat interval; tuning it can reduce rebalances (Kafka's default is 3 seconds;
      # it should generally be no more than one third of session.timeout.ms)
      heartbeat-interval: 6000
    # Listener settings
    listener:
      # Do not fail application startup when a listened-to topic does not exist
      missing-topics-fatal: false
      # Consumption type: batch for batch consumption, single for one record at a time
      #type: batch
      # Number of listener container threads, to raise concurrency
      #concurrency: 3
      # Commit offsets manually
      ack-mode: manual
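
The commented-out type: batch and concurrency options switch the listener container to batch consumption. For reference, a minimal sketch of what the matching batch listener would look like (assuming spring-kafka 2.x and the manual ack mode configured above; the class and method names are illustrative):

@Slf4j
@Service
public class KafkaBatchConsumer {

  // Only active if listener.type=batch is enabled in the YAML above
  @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")
  public void processBatch(List<ConsumerRecord<Integer, String>> records, Acknowledgment ack) {
    for (ConsumerRecord<Integer, String> record : records) {
      log.info("batch record: partition = {}, offset = {}", record.partition(), record.offset());
    }
    // commit the whole batch once all records are handled
    ack.acknowledge();
  }
}

With batch consumption, max-poll-records bounds the size of each delivered list, and ack.acknowledge() commits the entire batch at once.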

3. Kafka configuration classes

3.1 Properties class

@Data
@Component // a plain component suffices; @SpringBootConfiguration should only mark the application's primary configuration class
public class KafkaTopicProperties implements Serializable {
  /** Consumer group id */
  @Value("${spring.kafka.consumer.group-id}")
  private String groupId;
  /** Topics to consume; Spring splits "test,test2" into the array */
  @Value("${spring.kafka.consumer.topic-name}")
  private String[] topicName;
}
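
As an aside, the same values can be bound without per-field @Value by using @ConfigurationProperties; an alternative sketch (relaxed binding maps topic-name to topicName, and the comma-separated list binds to the array automatically):

@Data
@Component
@ConfigurationProperties(prefix = "spring.kafka.consumer")
public class KafkaTopicProperties implements Serializable {
  /** Consumer group id */
  private String groupId;
  /** Topics to consume; "test,test2" binds to this array */
  private String[] topicName;
}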

3.2 Topic configuration class

@Configuration
public class KafkaTopicConfiguration {
  private final KafkaTopicProperties properties;

  public KafkaTopicConfiguration(KafkaTopicProperties properties) {
    this.properties = properties;
  }

  /** Referenced as #{kafkaTopicName} in the @KafkaListener annotations below. */
  @Bean
  public String[] kafkaTopicName() {
    return properties.getTopicName();
  }

  /** Referenced as #{topicGroupId} in the @KafkaListener annotations below. */
  @Bean
  public String topicGroupId() {
    return properties.getGroupId();
  }
}
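
Because missing-topics-fatal is false, startup will not fail when these topics are absent, but nothing creates them either. If you want them created automatically, Spring Boot's auto-configured KafkaAdmin registers any NewTopic beans; a minimal sketch that could live in KafkaTopicConfiguration above (the partition and replication values are illustrative):

@Bean
public NewTopic testTopic() {
  // topic name, partition count, replication factor (illustrative values)
  return new NewTopic("test", 3, (short) 1);
}

@Bean
public NewTopic test2Topic() {
  return new NewTopic("test2", 3, (short) 1);
}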

4. Kafka producer and consumer configuration

4.1 Consumer (listener)

@Slf4j
@Service
public class KafkaConsumer {

  @Autowired private KafkaHandleMessageService kafkaHandleMessageService;

  @KafkaProcessLog // custom AOP annotation that records a consumption log
  @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")
  public Result processMessage(ConsumerRecord<Integer, String> consumerRecord, Acknowledgment ack) {
    Result result = new Result();
    try {
      // Handle the message according to business logic
      result = kafkaHandleMessageService.handleMessage(consumerRecord);
      return result;
    } catch (Exception e) {
      log.error("Kafka message consumption failed, cause: {}", e.getMessage());
      result.setStatus(Result.FAILURE);
      result.setMsg("Kafka message consumption failed, cause: " + e.getMessage());
      return result;
    } finally {
      // Acknowledge once, in finally, so the offset is committed whether handling
      // succeeded or failed; a failed record is therefore not redelivered, and the
      // failure is captured in the Result and the log instead.
      // (The original acknowledged in both try and finally, a redundant double commit.)
      ack.acknowledge();
    }
  }
}
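
The Result type is project-specific and not shown in the original; a hypothetical minimal reconstruction, consistent with how it is used in these snippets (the real class may differ):

// Hypothetical reconstruction, not the project's actual class.
@Data
public class Result {
  public static final Boolean FAILURE = Boolean.FALSE;
  /** Boolean (not boolean) so Lombok generates getStatus(), as called in the aspect below */
  private Boolean status = Boolean.TRUE;
  private String msg;
  private Object data;
}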

4.2 Producer

@Slf4j
@Service
public class KafkaProducer {

  @Autowired private KafkaTemplate<Integer, String> kafkaTemplate;

  public void sendMessage(String topic, String data) {
    log.info("kafka sendMessage start");
    // send() is asynchronous; in spring-kafka 2.x it returns a ListenableFuture
    ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, data);
    // callback invoked once the broker acks the record (or the send fails)
    future.addCallback(
        new ListenableFutureCallback<SendResult<Integer, String>>() {
          @Override
          public void onFailure(Throwable ex) {
            // pass the Throwable as the last argument so SLF4J logs the full stack trace
            log.error("kafka sendMessage error, topic = {}, data = {}", topic, data, ex);
          }

          @Override
          public void onSuccess(SendResult<Integer, String> result) {
            log.info("kafka sendMessage success topic = {}, data = {}", topic, data);
          }
        });
    log.info("kafka sendMessage end");
  }
}
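
A simple way to smoke-test the producer is a CommandLineRunner that sends one message at startup; a hedged sketch (the class name, topic, and payload are illustrative):

@Component
public class KafkaSendRunner implements CommandLineRunner {

  @Autowired private KafkaProducer kafkaProducer;

  @Override
  public void run(String... args) {
    // fire one illustrative message at one of the topics configured above
    kafkaProducer.sendMessage("test", "{\"orderId\":\"20240101\"}");
  }
}

Note that ListenableFuture is the spring-kafka 2.x API; from spring-kafka 3.0 (Spring Boot 3), KafkaTemplate.send returns a CompletableFuture, and the callback would be attached with whenComplete instead.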

5. AOP aspect configuration

5.1 Custom aspect annotation

/** Marks Kafka listener methods whose consumption should be logged by KafkaProcessAspect. */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface KafkaProcessLog {

5.2 Aspect handler class

@Aspect
@Component
@Slf4j
public class KafkaProcessAspect {
  @Autowired private KafkamessagelogService kafkamessagelogService;

  @Pointcut("@annotation(cn.com.sinosoft.crs.common.aop.annotation.KafkaProcessLog)")
  public void kafkaPointcut() {
    // Do nothing
  }

  @Around("kafkaPointcut()")
  public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
    log.info("Kafka message processing started...");
    long beginTime = System.currentTimeMillis();
    // ================================ Save the Kafka message ================================
    Object[] args = joinPoint.getArgs();
    @SuppressWarnings("unchecked")
    ConsumerRecord<Integer, String> consumerRecord = (ConsumerRecord<Integer, String>) args[0];
    // 1. Persist a log entry before consuming
    Result result = new Result();
    try {
      result = kafkamessagelogService.saveKafkaMeessageLog(consumerRecord);
    } catch (Exception e) {
      log.error("Failed to save the Kafka message, cause: {}", e.getMessage());
    }
    KafkamessagelogEntity kafkamessagelogEntity = (KafkamessagelogEntity) result.getData();

    // ================================ Log the Kafka message ================================
    log.info(
        "Kafka record consumed: partition: {}, offset: {}, topic: {}, table (key): {}, table content (value): {}",
        consumerRecord.partition(),
        consumerRecord.offset(),
        consumerRecord.topic(),
        consumerRecord.key(),
        // guard against a blank value
        StrUtil.isBlank(consumerRecord.value())
            ? ""
            : JsonUtils.transToLowerObject(consumerRecord.value()).toJSONString());

    // ================================ Update the Kafka message log ================================
    try {
      // 2. Invoke the annotated listener method
      result = (Result) joinPoint.proceed();
      // 3. Update the log entry with the outcome
      result = kafkamessagelogService.updateKafkaMeessageLog(kafkamessagelogEntity, result);
      log.info("Kafka message log update {}", result.getStatus() ? "succeeded!" : "failed!");
    } catch (Exception e) {
      // Update the log entry when consumption throws
      result.setStatus(Result.FAILURE);
      result.setMsg(ExceptionUtils.ex2Str(e));
      kafkamessagelogService.updateKafkaMeessageLog(kafkamessagelogEntity, result);
      log.error("Kafka message consumption failed, cause: {}", e.getMessage());
    }
    // 4. Wrap up: elapsed time in milliseconds
    long time = System.currentTimeMillis() - beginTime;
    log.info("Kafka message processing finished in {} ms!", time);
    // Return the listener's result; a void @Around advice would swallow the return value
    return result;
  }
}
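
StrUtil above comes from Hutool. JsonUtils and ExceptionUtils are project-local helpers the original does not include; hypothetical minimal versions consistent with how they are called (assuming fastjson on the classpath; each class belongs in its own file):

// Hypothetical helper, not the project's actual class.
public final class JsonUtils {
  /** Parse a JSON string and lower-case its top-level keys. */
  public static JSONObject transToLowerObject(String json) {
    JSONObject source = JSON.parseObject(json);
    JSONObject target = new JSONObject();
    source.forEach((key, value) -> target.put(key.toLowerCase(), value));
    return target;
  }
}

// Hypothetical helper, not the project's actual class.
public final class ExceptionUtils {
  /** Render a full stack trace into a String for persisting in the message log. */
  public static String ex2Str(Throwable e) {
    StringWriter writer = new StringWriter();
    e.printStackTrace(new PrintWriter(writer));
    return writer.toString();
  }
}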

That is all; offered for your reference. Thanks for reading!
