1. Add the Maven dependency

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. Configure the producer

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Describe: Kafka producer configuration (bootstrap servers and serializers)
 * @Created by 2020-07-27 15:57
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Provides the Kafka bootstrap servers and consumer groupId
     */
    @Autowired
    private KafkaConfig kafkaConfig;

    @Bean
    public ProducerFactory<String, String> dataGroupProducerFactory(){
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getDataGroupServers());
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean(name = "kafkaTemplate4DataGroup")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(dataGroupProducerFactory());
    }
}
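The producer factory above only sets the bootstrap servers and the serializers. If delivery guarantees matter, common tuning options can be added to the same configs map inside dataGroupProducerFactory(); the following lines are a sketch with illustrative values, not part of the original setup:

        // Optional reliability tuning inside dataGroupProducerFactory(); values are illustrative
        configs.put(ProducerConfig.ACKS_CONFIG, "all");       // wait for all in-sync replicas to acknowledge
        configs.put(ProducerConfig.RETRIES_CONFIG, 3);        // retry transient send failures
        configs.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // allow a small batching delay (ms)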

3. Configure the consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Describe: Kafka consumer configuration (default and custom listener container factories)
 * @Created by 2020-07-27 15:56
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private KafkaConfig kafkaConfig;


    /**
     * By default only one consumer group can be configured this way; for multiple
     * consumer groups, the additional factories below are declared with @ConditionalOnMissingBean.
     */
    /*****************************************************************************************************
     * ******** Default consumerFactory and kafkaListenerContainerFactory bean definitions ***************
     * ***************************************************************************************************
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getDataGroupServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_1");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(3);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
    
    
    /**********************************************************************************************
     * ***************** Custom configuration: multiple listeners and multiple groups *************
     * ********************************************************************************************
     */

    @Bean
    @ConditionalOnMissingBean(name = "dataGroupConsumerFactory")
    public ConsumerFactory<String, String> dataGroupConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getDataGroupServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getDataGroupConsumerGroupId());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    @ConditionalOnMissingBean(name = "dataGroupKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> dataGroupKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(3);
        factory.setConsumerFactory(dataGroupConsumerFactory());
        return factory;
    }

    @Bean
    @ConditionalOnMissingBean(name = "dataGroupConsumerFactory2")
    public ConsumerFactory<String, String> dataGroupConsumerFactory2() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getDataGroupServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getDataGroupConsumerGroupId());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    @ConditionalOnMissingBean(name = "dataGroupKafkaListenerContainerFactory2")
    public ConcurrentKafkaListenerContainerFactory<String, String> dataGroupKafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(3);
        factory.setConsumerFactory(dataGroupConsumerFactory2());
        return factory;
    }
}
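dataGroupConsumerFactory() and dataGroupConsumerFactory2() above are identical apart from the bean name (they even share the same groupId). If the second factory is meant to serve a different consumer group, the shared properties could be pulled into a small helper inside the class, roughly as sketched below (the helper and a second group-id property are hypothetical, not part of the original code):

    // Sketch: shared consumer properties, parameterised by group id (not in the original class)
    private Map<String, Object> baseConsumerProps(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getDataGroupServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    // Usage inside a factory bean, e.g.:
    // return new DefaultKafkaConsumerFactory<>(baseConsumerProps(kafkaConfig.getDataGroupConsumerGroupId()));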

4. Basic property configuration of KafkaConfig

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

@Slf4j
@Data
@Component
@RefreshScope
public class KafkaConfig {

    @Value("${kafka.data.group.bootstrapServers}")
    private String dataGroupServers;

    @Value("${kafka.data.group.consumer.groupId}")
    private String dataGroupConsumerGroupId;
}
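The two @Value placeholders above need matching entries in the externalised configuration. Assuming a plain application.properties file (the broker address and group name below are placeholders, not from the original), they might look like:

# Placeholder values; replace with your own brokers and group name
kafka.data.group.bootstrapServers=127.0.0.1:9092
kafka.data.group.consumer.groupId=data_group_consumer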

5. ProducerServiceImpl (the interface is omitted)

import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaProducerDataGroupServiceImpl implements IKafkaProducerService {

    @Autowired
    @Qualifier("kafkaTemplate4DataGroup")
    private KafkaTemplate<String, String> kafkaTemplate4DataGroup;

    @Override
    public void sendMessage(String topic, String message) {
        kafkaTemplate4DataGroup.send(topic, message);
    }

    @Override
    public void sendMessage(String topic, String key, String message) {
        kafkaTemplate4DataGroup.send(topic, key, message);
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic){
        List<PartitionInfo> partitionInfos = kafkaTemplate4DataGroup.partitionsFor(topic);
        return partitionInfos;
    }
}
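Note that send() is asynchronous, so the service above is fire-and-forget. With spring-kafka 2.x (the version range implied by the 2020 date), the returned ListenableFuture can be given a callback if delivery confirmation is needed; the method below is a sketch and is not part of the original service:

    // Sketch: log the outcome of an asynchronous send (spring-kafka 2.x ListenableFuture API)
    private static final org.slf4j.Logger log =
            org.slf4j.LoggerFactory.getLogger(KafkaProducerDataGroupServiceImpl.class);

    public void sendMessageWithCallback(String topic, String message) {
        kafkaTemplate4DataGroup.send(topic, message)
                .addCallback(
                        result -> log.info("sent to {}, partition {}", topic,
                                result.getRecordMetadata().partition()),
                        ex -> log.error("send to {} failed", topic, ex));
    }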

6. Consumer-side service

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumerServiceImpl implements IKafkaConsumerService {

    /**
     * containerFactory can be either the default kafkaListenerContainerFactory or the
     * custom dataGroupKafkaListenerContainerFactory.
     * @see cn.quantgroup.vcc.talos.config.kafka.KafkaConsumerConfig
     */
    @KafkaListener(topics = "topic_test", containerFactory = "dataGroupKafkaListenerContainerFactory")
    public void listener(ConsumerRecord<String, String> record) {
        String value = record.value();
        log.info("【receive】:{}", value);
    }

    @KafkaListener(topics = "topic_test",containerFactory = "kafkaListenerContainerFactory")
    public void listener2(ConsumerRecord<String, String> record) {
        String value = record.value();
        log.info("【receive====2 】:{}", value);
    }
}
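For completeness: @KafkaListener also has a groupId attribute that overrides the group.id supplied by the container factory, which can be a lighter way to run a listener in a separate group without defining a second factory. The listener below is a sketch and not part of the original service:

    // Sketch: override the consumer group on the listener itself
    @KafkaListener(topics = "topic_test", groupId = "test_2",
            containerFactory = "kafkaListenerContainerFactory")
    public void listener3(ConsumerRecord<String, String> record) {
        log.info("【receive====3 】:{}", record.value());
    }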

7. Test and verify

    @Autowired
    private IKafkaProducerService kafkaProducerService;

    @GetMapping("test1")
    public JsonResult<SfqPaymentResp> test1(HttpServletRequest request) {
        String topic = "topic_test";
        String msg = "this is a test";
        kafkaProducerService.sendMessage(topic, msg);
        return null; // the response body is irrelevant to the Kafka test and is omitted here
    }

A brief recap of Kafka group concepts (added as a follow-up):

1. Producer: a message is sent to a topic, and its key (if present) determines which partition it is written to (see the sketch after this list).

2. Consumer: the consumer-group concept. Consumers that share a group.id divide a topic's partitions among themselves, while different groups each receive the complete message stream.
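A minimal illustration of the producer side, using the kafkaTemplate4DataGroup bean from step 2 (topic and keys are made up for the example):

    // Sketch: messages with the same key always go to the same partition, so per-key ordering is preserved
    kafkaTemplate4DataGroup.send("topic_test", "order-1001", "created");
    kafkaTemplate4DataGroup.send("topic_test", "order-1001", "paid");     // same partition as the line above
    kafkaTemplate4DataGroup.send("topic_test", "order-2002", "created");  // may land on a different partition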

Points that need special attention:

1. If you only configure a single listener group, it is enough to adjust the settings inside the default consumerFactory() and kafkaListenerContainerFactory() beans.

2. If you configure multiple listener groups (multiple groupIds), you must still define the default consumerFactory() and kafkaListenerContainerFactory() beans from point 1, and then add the custom consumerFactory and kafkaListenerContainerFactory beans on top, marking the custom ones with @ConditionalOnMissingBean. When Spring Boot integrates Kafka it looks for the default consumerFactory()/kafkaListenerContainerFactory() beans first, so the default pair cannot be omitted; otherwise startup fails with the following error:

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.

 The following candidates were found but could not be injected:
  - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.ConsumerFactory' consumerFactory
  - User-defined bean method 'consumerFactory' in 'KafkaConsumerConfig'


 Action:

 Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

3. If you do not want to define the default consumerFactory() and kafkaListenerContainerFactory() at all, another workaround for the error above (found via Google) is to exclude Kafka auto-configuration in application.properties:

spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

I have not verified this approach myself.
