1、本机的kafka环境配置,不再赘述

2、添加 pom 文件

 <!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.6</version>
        </dependency>

3、配置application.yml

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093
    producer:
      # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      acks: all
    consumer:
      # 默认的消费组ID
      group-id: java-group
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
       # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量一次最大拉取数据量
      max-poll-records: 65535
      #监测消费端心跳时间
      heartbeat-interval: 30000
      # 批量拉取间隔,要大于批量拉取数据的处理时间,时间间隔太小会有重复消费
      max.poll.interval.ms: 50000
      listener:
        #手工ack,调用ack后立刻提交offset
        ack-mode: MANUAL_IMMEDIATE
        #容器运行的线程数
        concurrency: 4

4、复写kafka的相关配置类:生产、消费相关配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
public class KafkaMQConfig {
    // 此处配置代替zk
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    // 消费组标识
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    // 偏移量的起始点
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    // 偏移量的提交方式
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    // 一次从kafka服务拉取的数据量
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    // 监测消费端心跳时间
    @Value("${spring.kafka.consumer.heartbeat-interval}")
    private String heartbeatInterval;
    // 两次拉取数据的最大时间间隔
    @Value("${spring.kafka.consumer.max.poll.interval.ms}")
    private String maxPollIntervalMs;
    //生产者相关配置
    @Bean
    public KafkaProducer kafkaProducer() {
        Properties props = new Properties();
        // 设置接入点,请通过控制台获取对应 Topic 的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Kafka 消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // 构造 Producer 对象,注意,该对象是线程安全的
        // 一般来说,一个进程内一个 Producer 对象即可
        // 如果想提高性能,可构造多个对象,但最好不要超过 5 个
        return new KafkaProducer<String, String>(props);
    }
    // 消费端相关配置
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

5、生产、消费的伪代码

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaUtils {

    private final KafkaTemplate<String, String> kafkaTemplate;


    /**
     * 发送消息
     */
    public void kafkaSendMsg(String topicName, String msg) {
        kafkaTemplate.send(topicName, msg);
        log.info("kafka成功发送消息给:" + topicName + ",内容为:" + msg);
    }

    /**
     * 监听消息
     */
    @KafkaListener(topics = {"test"}, groupId = "java-group",containerFactory="kafkaListenerContainerFactory")
    public void kafkaListener(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        log.info("这是消费者在消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());

        ack.acknowledge();
    }

}

6、测试消息发送

@RestController
@RequestMapping("/v1")
public class TestController {

    @Autowired
    private KafkaUtils kafkaUtils;

    /**
     * 测试卡夫卡消息
     * @return 结果
     */
    @GetMapping("/kafkaTest")
    public JSONObject kafkaTest() {
        kafkaUtils.kafkaSendMsg("test", "2022年11月10日上午发送的消息!!!");
        return null;
    }
}

经过测试!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐