1. Background

As the business grew and services were split apart, the original single Kafka cluster was split into several clusters along business lines. The underlying framework does not support multiple Kafka data sources, so we had to build our own wrapper around Kafka.

2. Kafka configuration

The configuration below defines two Kafka clusters: the primary one, kafka_db_data1 (selected by primary:), and a second data source, kafka_db_data2. A third cluster can be added later in exactly the same way.

kafkas:
  kafka:
    enabled: true
    primary: kafka_db_data1
    producer:
      enabled: true
    consumer:
      enabled: true
    source:
      kafka_db_data1:
        bootstrap-servers: 172.16.**.**:9092,172.16.**.**:9092,172.16.**.**:9092
        producer:
          enabled: true
          retries: 0
          acks: 1
          batch-size: 16384
          properties:
            linger: 50
          buffer-memory: 33554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          enabled: true
          group-id: test_group
          max-poll-records: 100
          fetch-max-wait: 100
          fetch-min-size: 100
          enable-auto-commit: true
          heartbeat-interval: 1000
          client-id: ""
          auto-offset-reset: latest
          auto-commit-interval: 1000
          worker-num: 1
          poll-time: 100
      kafka_db_data2:
        bootstrap-servers: 172.16.**.**:9092,172.16.**.**:9092,172.16.**.**:9092
        producer:
          enabled: true
          retries: 0
          acks: 1
          batch-size: 16384
          properties:
            linger: 50
          buffer-memory: 33554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          enabled: true
          group-id: test_group
          max-poll-records: 100
          fetch-max-wait: 100
          fetch-min-size: 100
          enable-auto-commit: true
          heartbeat-interval: 1000
          client-id: ""
          auto-offset-reset: latest
          auto-commit-interval: 1000
          worker-num: 1
          poll-time: 100

3. Loading the Kafka sources

The following classes bind the Kafka configuration (getters and setters are omitted from the property classes for brevity).

3.1 KafkaProperties

@ConditionalOnProperty(prefix = "kafkas.kafka", name = "enabled")
@ConfigurationProperties(prefix = "kafkas.kafka")
public class KafkaProperties {

    private String primary;
    private Map<String, KafkaDetail> source;

}

KafkaDetail

public class KafkaDetail {

    private String bootstrapServers;
    private Producer producer;
    private Consumer consumer;
    private Listener listener;

}

Producer

public class Producer {

    private int retries;
    private int batchSize;
    private int bufferMemory;
    private String acks;
    private Properties properties;
    private Boolean enabled;

}
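
The nested Properties type used by the producer (it carries the linger value read in KafkaAutoConfiguration below) is never shown in the original; a minimal sketch, assuming it holds only that one setting:

public class Properties {

    private Integer linger;

    // getter and setter omitted
}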

Consumer

public class Consumer {

    private String groupId;
    private Integer maxPollRecords = 100;
    private String fetchMaxWait;
    private String fetchMinSize;
    private boolean enableAutoCommit = true;
    private Integer heartbeatInterval = 3000;
    private String clientId;
    private String autoOffsetReset = "latest";
    private Integer autoCommitInterval = 5000;
    private Integer workerNum = 1;
    private Boolean enabled;
    private Integer pollTime = 100;
    private Integer retries = 0;

}

3.2 KafkaAutoConfiguration

This configuration class initializes one producer per configured source; the producers are wrapped in KafkaTemplate instances, collected into a Map, and handed to Spring as a bean.

package com.rongss.boot.middleware.kafka;

import com.google.common.collect.Maps;
import com.rongss.boot.middleware.kafka.producer.KafkaTemplate;
import com.rongss.boot.middleware.kafka.producer.impl.KafkaTemplateImpl;
import com.rongss.boot.middleware.kafka.properties.KafkaDetail;
import com.rongss.boot.middleware.kafka.properties.KafkaProperties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


@Configuration
@ConditionalOnProperty(prefix = "kafkas.kafka", name = "enabled")
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfiguration {

    @Autowired
    private KafkaProperties kafkaProperties;

    public Properties producerConfigs(KafkaDetail kafkaDetail) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaDetail.getBootstrapServers());
        props.put(ProducerConfig.RETRIES_CONFIG, kafkaDetail.getProducer().getRetries());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaDetail.getProducer().getBatchSize());
        props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaDetail.getProducer().getProperties().getLinger());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaDetail.getProducer().getBufferMemory());
        props.put(ProducerConfig.ACKS_CONFIG, kafkaDetail.getProducer().getAcks());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public KafkaProducer<String, String> kafkaProducer(KafkaDetail kafkaDetail) {
        // build a dedicated producer per source instead of reassigning a shared field
        return new KafkaProducer<>(producerConfigs(kafkaDetail));
    }

    @Bean
    @ConditionalOnProperty(prefix = "kafkas.kafka.producer", name = "enabled")
    public Map<String, KafkaTemplate> kafkaTemplate() {
        Map<String, KafkaDetail> sources = kafkaProperties.getSource();
        Map<String, KafkaTemplate> templateMap = new HashMap<>();

        for (Map.Entry<String, KafkaDetail> entry : sources.entrySet()) {
            KafkaDetail kafkaDetail = entry.getValue();
            if (kafkaDetail.getProducer() != null && Boolean.TRUE.equals(kafkaDetail.getProducer().getEnabled())) {
                templateMap.put(entry.getKey(), new KafkaTemplateImpl(kafkaProducer(kafkaDetail)));
            }
        }
        return templateMap;
    }

    public Map<String, Object> consumerConfigs(KafkaDetail kafkaDetail) {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaDetail.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaDetail.getConsumer().getGroupId());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaDetail.getConsumer().getMaxPollRecords());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaDetail.getConsumer().isEnableAutoCommit());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaDetail.getConsumer().getFetchMaxWait());
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaDetail.getConsumer().getHeartbeatInterval());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaDetail.getConsumer().getClientId());
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaDetail.getConsumer().getAutoCommitInterval());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaDetail.getConsumer().getAutoOffsetReset());
        return props;
    }

}
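
Note: nothing above shows how Spring Boot discovers KafkaAutoConfiguration. If the wrapper is packaged as a standalone starter jar, it would typically be registered in META-INF/spring.factories (the Spring Boot 2.x convention; this file is an assumption, not part of the original):

# META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.rongss.boot.middleware.kafka.KafkaAutoConfiguration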

4. Sending messages with the producer

The message entity:

public class MessageEvent implements Serializable {

    private static final long serialVersionUID = -2624253925403159396L;
    /**
     * Event sequence ID
     */
    private String txId;
    /**
     * Topic name
     */
    private String topic;
    /**
     * Tag of the message
     */
    private String tag;
    /**
     * The domain object to deliver
     */
    private Object body;
    /**
     * Unique key of the domain object, used to build the message's unique ID. Duplicates are
     * not checked; it may be empty without affecting sending or receiving.
     */
    private String bodyKey;
    /**
     * Event creation time
     */
    private long createdDate = System.currentTimeMillis();
    /**
     * Message type
     */
    private MessageType messageType;


    /**
     * Convenience method for generating the txId.
     *
     * @return the generated txId
     */
    public String generateTxId() {
        if (null == txId) {
            txId = getTopic() + ":" + getTag() + ":";
            if (StringUtil.isNullOrEmpty(bodyKey)) {
                txId = txId + getCreatedDate() + ":" + UUID.randomUUID().toString();
            } else {
                txId = txId + bodyKey;
            }
        }
        return txId;
    }

}
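
For illustration (the topic, tag, and payload here are made up), a txId generated without a bodyKey combines the topic, tag, creation time, and a random UUID:

MessageEvent event = new MessageEvent();
event.setTopic("demo_topic");
event.setTag("comment_created");
event.setBody("{\"id\": 1}");
// no bodyKey, so generateTxId() yields something like
// "demo_topic:comment_created:1700000000000:550e8400-e29b-41d4-a716-446655440000"
String txId = event.generateTxId();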

The producer interface:

public interface KafkaTemplate {

    Future<RecordMetadata> send(MessageEvent event);

    Future<RecordMetadata> send(MessageEvent event, Callback callback);
}

The implementation:

public class KafkaTemplateImpl implements KafkaTemplate {

    private static final Logger log = LoggerFactory.getLogger(KafkaTemplateImpl.class);

    private final KafkaProducer kafkaProducer;

    public KafkaTemplateImpl(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    private ProducerRecord<String, Object> getRecord(MessageEvent event) {
        if (event == null) {
            throw new RuntimeException("event is null.");
        }
        if (StringUtils.isEmpty(event.getTopic()) || null == event.getBody()) {
            throw new RuntimeException("topic or body is null.");
        }
        // the configured value serializer is StringSerializer, so the body is
        // expected to be a String (e.g. JSON serialized by the caller)
        return new ProducerRecord<>(event.getTopic(), event.getBody());
    }

    @Override
    public Future<RecordMetadata> send(MessageEvent event) {
        return kafkaProducer.send(getRecord(event));
    }

    @Override
    public Future<RecordMetadata> send(MessageEvent event, Callback callback) {
        return kafkaProducer.send(getRecord(event),callback);
    }


}
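
A minimal example of the callback variant, useful for surfacing broker errors; the KafkaTemplate reference, topic name, and logger here are illustrative:

MessageEvent event = new MessageEvent();
event.setTopic("demo_topic");
event.setBody("{\"id\": 1}");
kafkaTemplate.send(event, (metadata, exception) -> {
    if (exception != null) {
        log.error("send failed, topic={}", event.getTopic(), exception);
    } else {
        log.info("sent, partition={}, offset={}", metadata.partition(), metadata.offset());
    }
});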

5. Multiple instances, multiple consumers

First, two custom annotations are prepared:

@ListenerHandler // placed on the class so Spring scans it; marks the class as using Kafka consumers
@ListenerExecute // placed on the concrete method; the listener pushes records for the matching topic to this method

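The ListenerHandler annotation

@ListenerHandler itself is not shown in the original; judging by its use (a class-level marker read by the BeanPostProcessor below), a minimal sketch is a plain marker annotation:

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ListenerHandler {
}
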
The ListenerExecute annotation

@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ListenerExecute {

    String message() default "";

    // selects which configured Kafka data source this listener reads from
    String source() default "";

    String topic() default "";

    String tags() default "";

    // reMsg appears to name the handler's message parameter (see the usage example below)
    String reMsg() default "";

    int retry() default 0;

    MessageType action();

}

ListenerProcessor

Its main job is to tie the @ListenerExecute annotation to the KafkaListener:

@Configuration
public class ListenerProcessor implements BeanPostProcessor {


    @Autowired
    private ApplicationContext applicationContext;


    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> clazz = AopUtils.getTargetClass(bean);
        // the class-level marker is @ListenerHandler; @ListenerExecute is a method-level annotation
        ListenerHandler annotation = clazz.getAnnotation(ListenerHandler.class);
        if (null != annotation) {
            final Method[] methods = clazz.getMethods();
            for (Method method : methods) {
                final ListenerExecute methodAnnotation = method.getAnnotation(ListenerExecute.class);
                if (methodAnnotation != null) {
                    final MessageType action = methodAnnotation.action();
                    addListener(methodAnnotation.source(), bean, method, action, methodAnnotation.topic(), methodAnnotation.tags(), methodAnnotation.reMsg());
                }
            }
        }
        return bean;
    }

    private void addListener(String source, Object bean, Method method, MessageType action, String topic, String tags, String reMsg) {
        // getBeanProvider avoids the exception getBean() would throw when the bean is absent
        KafkaListener kafkaListener = applicationContext.getBeanProvider(KafkaListener.class).getIfAvailable();
        if (null == kafkaListener) {
            throw new NullPointerException("KafkaListener is null! please check kafkas.kafka.consumer.enabled = true");
        }
        kafkaListener.addListener(source, bean, method, topic, tags, reMsg);
    }
}

The listener class

This class starts the listeners: methods registered by the ListenerProcessor above (i.e. classes annotated with @ListenerHandler and methods annotated with @ListenerExecute) each get consumer threads that subscribe to the configured topic and push incoming records to the consuming method.

@Slf4j
@Component
@ConditionalOnProperty(prefix = "kafkas.kafka.consumer", name = "enabled")
public class KafkaListener implements Listener {

    @Autowired
    private Environment env;

    @Autowired
    private KafkaAutoConfiguration kafkaAutoConfiguration;

    @Autowired
    private KafkaProperties kafkaProperties;

    @Override
    public void addListener(String source, Object bean, Method method, String topicKey, String tags, String reMsg) {
        Map<String, KafkaDetail> kafkaConfigs = kafkaProperties.getSource();
        KafkaDetail kafkaDetail = kafkaConfigs.get(source);

        if (kafkaDetail.getConsumer() != null && Boolean.TRUE.equals(kafkaDetail.getConsumer().getEnabled())) {
            Integer workerNum = kafkaDetail.getConsumer().getWorkerNum();
            for (int i = 0; i < workerNum; i++) {
                log.info("init kafka consumer start source:{}", source);
                openListener(kafkaDetail, bean, method, topicKey);
            }
        }
    }

    private void openListener(KafkaDetail kafkaDetail, Object bean, Method method, String topicKey) {
        topicKey = topicKey.replace(" ", "");
        if (StringUtils.isEmpty(topicKey)) {
            throw new RuntimeException("topic is null");
        }
        // strip the "${" prefix and "}" suffix to get the property name the placeholder refers to
        final String topicValue = topicKey.substring(2, topicKey.length() - 1);
        new Thread(){
            @Override
            public void run() {

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaAutoConfiguration.consumerConfigs(kafkaDetail));
                String topic = env.getProperty(topicValue);
                consumer.subscribe(Arrays.asList(topic));
                Parameter[] parameters = method.getParameters();
                while (true){
                    try {
                        ConsumerRecords<String, String> records = consumer.poll(kafkaDetail.getConsumer().getPollTime());
                        if (!records.isEmpty()) {
                            List<String> list = new ArrayList<>(records.count());
                            for (ConsumerRecord<String, String> record : records) {
                                list.add(record.value());
                            }
                            Object[] params = new Object[parameters.length];
                            // by convention, the first parameter of the handler method receives the batch
                            params[0] = list;

                            log.info("kafka received topic={}, message={}", topic, JSON.toJSON(list));
                            Boolean ret = invoke(bean, method, params);

                            if (!kafkaDetail.getConsumer().isEnableAutoCommit()) {
                                if (ret) {
                                    log.info("kafka commit ret={}, topic={}, message={}", ret, topic, JSON.toJSON(list));
                                    consumer.commitAsync();
                                } else if (kafkaDetail.getConsumer().getRetries() > 0) {
                                    for (int i = 1; i <= kafkaDetail.getConsumer().getRetries(); i++) {
                                        ret = invoke(bean, method, params);
                                        if (ret) {
                                            consumer.commitAsync();
                                            break;
                                        }
                                        if (i == kafkaDetail.getConsumer().getRetries()) {
                                            // retries exhausted: commit anyway so the batch is not replayed forever
                                            log.info("kafka commit after retries failed topic={}, message={}", topic, JSON.toJSON(list));
                                            consumer.commitAsync();
                                        }
                                    }
                                }
                            }
                        }
                    } catch (IllegalAccessException e) {
                        log.error("消费器不合法访问!",e);
                    } catch (InvocationTargetException e) {
                        log.error("消费器调用handler失败!",e);
                    } catch (Throwable e){
                        log.error("Throwable!Consumer error",e);
                    }
                }
            }
        }.start();
    }


    public Boolean invoke(Object bean, Method method, Object[] params) throws Exception{
        Object obj = method.invoke(bean,params);
        return obj == null ? false : (Boolean)obj;
    }

    public String[] getMethodParamName(Class<?> clazz, String methodName, Class<?>[] parameterTypes) {
        try {
            Method method = clazz.getDeclaredMethod(methodName, parameterTypes);
            LocalVariableTableParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
            return parameterNameDiscoverer.getParameterNames(method);
        } catch (Exception e) {
            log.error("getMethodParamName error", e);
        }
        return null;
    }
}
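
One caveat: the worker threads above poll in an endless loop and the consumers are never closed. A sketch of shutdown support that could be added to KafkaListener (the field and method names are illustrative, not part of the original):

    // track every consumer created in openListener
    private final List<KafkaConsumer<String, String>> consumers = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    @PreDestroy
    public void shutdown() {
        running = false;
        // wakeup() is the only thread-safe KafkaConsumer method; it makes a blocked
        // poll() throw WakeupException so the loop can exit and close its consumer
        consumers.forEach(KafkaConsumer::wakeup);
    }

The worker loop would then test running instead of true and close the consumer in a finally block.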

6. Usage

Parameter notes:

@Value retrieves the KafkaTemplate whose key is kafka_db_data1 from the Spring-managed Map (SpEL map access on the kafkaTemplate bean).

@ListenerExecute declares the source, which matches one of the configured Kafka data sources; the topic is configurable and resolved from properties.

@Slf4j
@Component
@ListenerHandler
public class KafkaDemo {


    @Value("#{kafkaTemplate.kafka_db_data1}")
    private KafkaTemplate kafkaDbData1;

    public void kafkaProducerDemo(){
        MessageEvent event = new MessageEvent();
        event.setBody("我是具体数据");
        event.setTopic("demo_topic");
        kafkaDbData1.send(event);
    }

    @ListenerExecute(source = "kafka_db_data1", topic = "${kafkas.kafka.source.kafka_db_data1.topic.comments}", action = MessageType.KAFKA, reMsg = "data")
    public Boolean kafkaDemo(List<String> data) {
        log.info(JSON.toJSONString(data));
        // returning true signals a successful consume to the manual-commit logic
        return true;
    }
}
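
The ${...} placeholder in topic is resolved against the Environment when the listener starts, so a matching property must exist, for example (the property path is assumed to mirror the annotation above):

kafkas:
  kafka:
    source:
      kafka_db_data1:
        topic:
          comments: demo_topic

Extra keys like topic.comments under a source do not break the @ConfigurationProperties binding, since unknown fields are ignored by default; they stay readable through the Environment.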
