1.yml配置文件(简单配置)

spring:
  kafka:
    bootstrap-servers: ip:端口
    consumer:
      group-id: group-test
      enable-auto-commit: true
      auto-commit-interval: 1000ms
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.类配置方式(使用了SASL/PLAINTEXT安全认证协议)

出现[Consumer clientId=consumer-1, groupId=group1] Bootstrap broker xxx (id: -1 rack: null) disconnected报错信息就需要用这种配置方式,加入安全认证

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 向振华
 * @date 2021/05/10 15:58
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        // 服务地址
        propsMap.put("bootstrap.servers", "ip:端口");
        // 安全认证协议
        propsMap.put("security.protocol", "SASL_PLAINTEXT");
        propsMap.put("sasl.mechanism", "PLAIN");
        // 填充安全认证用户名和密码
        propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usnm\" password=\"pwd\";");
        propsMap.put("group.id", "group-test");
        propsMap.put("enable.auto.commit", true);
        propsMap.put("auto.commit.interval.ms", 1000);
        // latest: 从最新的偏移量开始消费
        propsMap.put("auto.offset.reset", "latest");
        // 反序列化方式
        propsMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        propsMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return propsMap;
    }
}

再加一个配置文件sasl.jaas.config到resource

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafkaadmin"
    password="kafkaadminpwd"
    user_kafkaadmin="kafkaadminpwd"
    user_kafkaclient1="kafkaclient1pwd"
    user_kafkaclient2="kafkaclient2pwd";
}; 

使用:

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"xxx"})
    public void listener(ConsumerRecord<?, ?> record) {
        log.info("收到消息 ---> " + record.value().toString());
    }
}

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐