Kafka 自定义多数据源

写在开头

工作需要对接很多第三方数据源,大多是通过Kafka落地的,导致项目中充满了大量雷同的Kafka配置,冗余代码很多,多人开发的情况下时间长了十分难以维护。网上大多的博客源码都是只有两个数据源,配置也都很多不适合我的需求,因此开发了一个简化配置、减少重复代码,可以简单配置即可开箱即用的Kafka数据源模板。

大致的思路是,自定义配置文件,通过spring解析完成之后在初始化期间根据配置文件实例化多个Kafka containerFactory和Kafka Template对象并加入到spring 管理,然后在使用的时候直接通过bean id 引入即可。

1. 自定义配置文件

配置文件的格式可以根据自己的需要定义,只要在解析的时候能处理成你需要的数据就行,下面是我定义的模板。

#多数据源kafka 配置模板说明,不做打包
kafka:
  cust:
    server:
      1:
        consumerConf: #kafka消费端核心配置,可以接收所有kafka消费工厂中的配置  ,其中group_id不要在这里配置,在@KafkaListener 中配置,不然会导致多个消费端公用一个配置,consumerExtra可以有多个,每个代表一个消费端,公用同级的consumerConf配置
          "[bootstrap.servers]": localhost:9092
#          "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
#          "[value.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
#          "[auto.offset.reset]": latest
          "[auto.commit.interval.ms]": 1000
        consumerExtra: #消费端工厂配置,可设置简单的参数,也可以直接设置工厂
          one:  # 实例化到spring中的beanName , 这个很重要,这个就是@KafkaListener 中工厂配置的名称
            ackDiscarded: true
              #          consumerFactory:
            polltimeout: 4000
            factoryFilter: #过滤工厂,如果消息需要开启过滤,首先ackDiscarded这个要设置成true
              serializerClass: com.gtja.bmall.gtkafka.entity.TestDto  #消息序列化成的对象
              filterFiled: name #要对比的字段,即serializerClass定义的属性名
              filterTarget: zhangsan #要对比的值,从filterFiled获取到的值会和filterTarget进行比较,相同则保留消息
          one22:  # 实例化到spring中的beanName
            ackDiscarded: true
            #          consumerFactory:
            polltimeout: 4000
            factoryFilter:
              custStrategy: com.gtja.bmall.gtkafka.config.kafkaconfig.factorystrategy.CustFactoryStrategy  #定义的过滤策略工厂,更自由化

        producerConf: #生产端核心配置
          "[bootstrap.servers]": localhost:9092
#          "[acks]": all
#          "[key.serializer]": org.apache.kafka.common.serialization.StringSerializer
#          "[value.serializer]": org.apache.kafka.common.serialization.StringSerializer
#          "[batch.size]": 16384
          "[retries]": 2
#          "[buffer.memory]": 33554432
        producerBeanName: produ  #生产端需要特别设置beanname , 一个server 中只能有一个producerConf 和producerBeanName
      2:
        consumerConf: #kafka消费端核心配置
          "[bootstrap.servers]": localhost:9092
          #          "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
          #          "[value.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
          #          "[auto.offset.reset]": latest
          "[auto.commit.interval.ms]": 2000
        consumerExtra: #消费端工厂配置,可设置简单的参数,也可以直接设置工厂
          two:  # 实例化到spring中的beanName
            ackDiscarded: true
            #          consumerFactory:
            polltimeout: 4000
        producerConf:
          "[bootstrap.servers]": localhost:9092
          #          "[acks]": all
          #          "[key.serializer]": org.apache.kafka.common.serialization.StringSerializer
          #          "[value.serializer]": org.apache.kafka.common.serialization.StringSerializer
          #          "[batch.size]": 16384
          "[retries]": 3
        #          "[buffer.memory]": 33554432
        producerBeanName: produ22  #生产端需要特别设置beanname


2. 加载配置文件

这块是最麻烦的,因为Kafka需要的参数比较多,并且还要支持自己的业务场景,导致yml中要嵌套很多层,需要细心,不然解析很容易出错。以下是解析的入口。

@Slf4j
@Getter
@Setter
@PropertySource(value = {"classpath:application-kafkaconf.yml"}, factory = YmlPropertySourceFactory.class)
@ConfigurationProperties(prefix = "kafka.cust")
@Configuration
public class KafkaConfigProperty {
    @NestedConfigurationProperty
    private Map<String, KafkaDataSourceConfig> server = new LinkedHashMap<>();
}

因为@PropertySource 默认不支持解析yml,所以需要自定义一个工厂,内容如下。

@Slf4j
public class YmlPropertySourceFactory implements PropertySourceFactory {
    @Override
    public PropertySource<?> createPropertySource(String name, EncodedResource encodedResource) throws IOException {
        PropertySource<?> propertySource = null;
        try {
            List<PropertySource<?>> load = new YamlPropertySourceLoader().load(name, encodedResource.getResource());
            propertySource = new YamlPropertySourceLoader().load(name, encodedResource.getResource()).get(0);
        }catch (Exception e){
            propertySource = new PropertySource<Object>("") {
                @Override
                public Object getProperty(String name) {
                    return null;
                }
            };
            log.error("load resource error "+ e);
        }
        return propertySource;
    }
}

3. 解析配置文件

因为配置文件是多层嵌套,解析的时候也要一层层来,以我的配置如例。

KafkaDataSourceConfig 对应的类型如下

@Data
@Slf4j
@Accessors(chain = true)
public class KafkaDataSourceConfig {
    private Map<String, Object> producerConf = new LinkedHashMap() {
        {
            put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
            put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
            put("acks", "all");
            put("batch.size", "16384");
            put("batch.size", "16384");
            put("buffer.memory", "33554432");
        }
    };//生产者配置,这里是一些默认配置,不能随便改,要改的话初始化判断条件也要改
    private Map<String, Object> consumerConf = new LinkedHashMap() {
        {
            put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
            put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
            put("auto.offset.reset", "latest");
        }
    };//消费者配置,这里是一些默认配置,不能随便改,要改的话初始化判断条件也要改

    private Map<String, KafkaExtraConfig> consumerExtra = new LinkedHashMap();//工厂配置
    private Map<String, KafkaExtraConfig> producerExtra = new LinkedHashMap();//工厂配置

    private String producerBeanName;//生产端需要单独设置beanname


}

注意这里的producerConf 和consumerConf 这里其实可以不用Map接收,可以再创建一个对象,这样默认值的赋值会优雅许多,也方便后续的判断,但是由于Kafka的配置都是xxx.xxx,实体类的字段名不能包含点,所以需要再多个额外的步骤进行转换,我这边就讨巧用Map接收直接和Kafka的配置对应。

这两个Map中的默认值可以根据自己的需要定义,由于我的需求中消费端居多,生产端较少,所以这里实例化的基准不一样,消费端实例化数量以consumerExtra中的key的数量为准,而生产端实例化数量是以KafkaConfigProperty#server 中的key数量为准。

KafkaExtraConfig 对象的内容为下

@Data
@Slf4j
public class KafkaExtraConfig {

    private Boolean ackDiscarded = false;//是否可以丢弃消息,默认false

    private Class<? extends KafkaListenerContainerFactory> consumerFactory;//自定义消费工厂

    private long polltimeout = 3000;//消息拉取超时时间

    private Boolean batchListener = false;//批量监听

    private Integer concurrency = 1;//KafkaMessageListenerContainer 实例数量

    private KafkaConsuFactoryFilterConfig factoryFilter;//消费过滤工厂配置

    private Class<? extends ProducerFactory> producerFactory;//自定义生产工厂
}

这里也配置了一些常用参数,在配置文件中定义,可以根据需要增减,其中 consumerFactory producerFactory 可以自定义实现类,然后在配置文件中指定全限定类名。

factoryFilter 内容如下

@Data
@Slf4j
public class KafkaConsuFactoryFilterConfig {
    private Class<?> serializerClass;//要序列化成的对象
    private Class<? extends RecordFilterStrategy> custStrategy;//指定自定义策略,上面参数就会失效
    private Map<String, String> filtermap = new LinkedHashMap<>();//过滤的字段集合,key为要对比的字段,value为比较的值,支持逗号分隔
}

这里是我工作的特殊场景,custStrategy可以指定过滤工厂,serializerClass filtermap 这两个参数是简约化配置,可以序列化成对象后根据filtermap中 的key和value来进行过滤,满足条件的消息才会保留。

4. 初始化
@Component
@Slf4j
public class KafkaProducerCreator implements InitializingBean {
    @Autowired
    private KafkaConfigProperty properties;//kafka核心配置文件
    @Autowired
    private DefaultListableBeanFactory beanFactory;//springbean 工厂
    private Map<String, KafkaDataSourceConfig> kafkaDataSourceConfig; 

    /**
     * @Description: 初始化kafka多数据源配置
     * @Param: []
     * @return: void
     */
    private void createKafkaDataSourceBean() {
        try {
            createKafkaInstance();
        } catch (Exception e) {
            log.error("初始化kafka config失败 " + e);
        }
    }


    /**
     * @Description: 初始化生产端和消费端
     * @Param: []
     * @return: void
     */
    private void createKafkaInstance() throws IllegalAccessException, InstantiationException, ClassNotFoundException {
        if (!kafkaDataSourceConfig.isEmpty()) {
            for (String server : kafkaDataSourceConfig.keySet()) {
                KafkaDataSourceConfig kafkaDataSourceConfig = this.kafkaDataSourceConfig.get(server);
                Map<String, Object> consumerConf = kafkaDataSourceConfig.getConsumerConf();//核心配置
                Map<String, Object> produceConf = kafkaDataSourceConfig.getProducerConf();
                newKafkaConsumerInstance(kafkaDataSourceConfig, consumerConf);
                newKafkaProducerInstance(kafkaDataSourceConfig, produceConf);
            }
        }
    }

    /**
     * @Description: 实例化生产者,以配置的server下面的produceeConf的数量为基准
     * @Param: [kafkaDataSourceConfig, produceConf]
     * @return: void
     */
    private void newKafkaProducerInstance(KafkaDataSourceConfig kafkaDataSourceConfig, Map<String, Object> produceConf) throws ClassNotFoundException {
        if (!produceConf.isEmpty() && StringUtils.isNoneBlank(kafkaDataSourceConfig.getProducerBeanName()) && produceConf.size() > 6) {
//          暂时不考虑producerExtra中的配置 todo
            custStringToClass(produceConf);
            DefaultKafkaProducerFactory defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(produceConf);
            String producerBeanName = kafkaDataSourceConfig.getProducerBeanName();
            KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<String, String>(defaultKafkaProducerFactory);
            stringStringKafkaTemplate.setBeanName(producerBeanName);
            beanFactory.registerSingleton(producerBeanName, stringStringKafkaTemplate);
            log.info("create kafka kafkatemplate  ,bean id is {} , factory config  is {}",producerBeanName,stringStringKafkaTemplate.getProducerFactory().getConfigurationProperties().toString());
        }

    }

    /**
     * @Description: 实例化消费端, 以配置的consumerExtra为基准
     * @Param: [kafkaDataSourceConfig, consumerConf]
     * @return: void
     */
    private void newKafkaConsumerInstance(KafkaDataSourceConfig kafkaDataSourceConfig, Map<String, Object> consumerConf) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
        if (!consumerConf.isEmpty() && consumerConf.size() > 3) {
            Map<String, KafkaExtraConfig> extra = kafkaDataSourceConfig.getConsumerExtra();
            for (String factory : extra.keySet()) {
//                   代表配置了多个工厂相关
                ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaFac = new ConcurrentKafkaListenerContainerFactory<>();
                KafkaExtraConfig kafkaExtraConfig = extra.get(factory);
                if (kafkaExtraConfig.getConsumerFactory() != null) {
//                       代表使用了自定义工厂
//                    todo 或许可以更细化点,把消费工厂和过滤工厂整合到一起
                    Class<? extends KafkaListenerContainerFactory> consumerFactory = kafkaExtraConfig.getConsumerFactory();
                    kafkaFac = (ConcurrentKafkaListenerContainerFactory<Integer, String>) consumerFactory.newInstance();
                } else {
//                   默认 常用配置
                    kafkaFac.setAckDiscarded(kafkaExtraConfig.getAckDiscarded());//是否开始消息过滤
                    kafkaFac.setConcurrency(kafkaExtraConfig.getConcurrency());
                    kafkaFac.setBatchListener(kafkaExtraConfig.getBatchListener());
                    kafkaFac.getContainerProperties().setPollTimeout(kafkaExtraConfig.getPolltimeout());//拉取超时时间,默认3000
                }
                custStringToClass(consumerConf);
                DefaultKafkaConsumerFactory<Object, Object> defaultKafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerConf);
                kafkaFac.setConsumerFactory(defaultKafkaConsumerFactory);
                KafkaConsuFactoryFilterConfig factoryFilter = kafkaExtraConfig.getFactoryFilter();
                if(kafkaExtraConfig.getAckDiscarded()){
                    setFilterFactory(kafkaFac, factoryFilter);
                }
                beanFactory.registerSingleton(factory, kafkaFac);
                log.info("create kafka consumer ,bean id is {} , properties is {} , factory config is {}",factory,kafkaFac.getContainerProperties().toString(),kafkaFac.getConsumerFactory().getConfigurationProperties().toString());
            }

        }
    }

    /**
     * @Description: 设置工厂过滤策略
     * @Param: [defaultKafkaConsumerFactory]
     * @return: void
     */
    private void setFilterFactory(ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaFac, KafkaConsuFactoryFilterConfig factoryFilter) throws IllegalAccessException, InstantiationException {
        if (factoryFilter != null) {
//           代表配置了过滤策略
            if (factoryFilter.getCustStrategy() != null) {
//               代表使用了自定义策略
                kafkaFac.setRecordFilterStrategy(factoryFilter.getCustStrategy().newInstance());
            } else if(!factoryFilter.getFiltermap().isEmpty()){
                kafkaFac.setRecordFilterStrategy((ConsumerRecord<Integer, String> consumerRecord) -> {
                    try {
                        Object o = JSONObject.parseObject(consumerRecord.value().toString(), factoryFilter.getSerializerClass());
//                        log.info("序列化的对象为:::" + o);
                        return judgeFiledMap(o, factoryFilter.getFiltermap());
                    } catch (Exception e) {
                        log.error("filter error topic is {},value is {}", consumerRecord.topic(), consumerRecord.value());
                        return true;
                    }
                });
            }
        }
    }

    /**
     * @Description: 计算多条件下的过滤规则
     * @Param: [o, filterMap]
     * @return: boolean
    */
    private boolean judgeFiledMap(Object o, Map<String,String> filterMap) {
        Class<?> aClass = o.getClass();
        Field[] declaredFields = aClass.getDeclaredFields();
        List<Boolean> execValue = new ArrayList<>();
        for (String filterKey : filterMap.keySet()) {
            boolean execBoo = false;
            for (Field declaredField : declaredFields) {
                declaredField.setAccessible(true);
                execBoo = getaBoolean(o, filterMap, declaredField, filterKey);
                if (execBoo){
                    break;//代表这个key满足条件了,不需要再循环了
                }
            }
            if(execBoo){
                execValue.add(execBoo);//用来计算所有的key对应的条件是否满足,少一个都代表有问题
            }else {
                return true;//这个true代表丢弃消息
            }
        }
        return ! (execValue.size() == filterMap.size() );//要取反,不然逻辑不对
    }

    /**
     * @Description: 计算每个key是否满足条件,满足就返回true ,否则返回false
     * @Param: [o, filterMap, declaredField, filterKey]
     * @return: boolean
    */
    private boolean getaBoolean(Object o, Map<String, String> filterMap, Field declaredField, String filterKey) {
        if(declaredField.getName().equals(filterKey)){
            try {
                Object o1 = declaredField.get(o); //value值
                String[] filtervalues = StringUtils.split(filterMap.get(filterKey), ",");
                for (String filtervalue : filtervalues) {
                    if (o1 instanceof String) {
                        if (filtervalue.equals(o1.toString())) {//只要配置的值中有一个满足条件就保留
                            return true;
                        }
                    }
                }
            } catch (IllegalAccessException e) {
                log.error("反射获取字段失败, 对象为 " + o + "  字段为" + declaredField.getName() + " 过滤的值为 " + filterKey);
                return false;
            }
        }
        return false;
    }

    /**
     * @Description: 消费过滤工厂 简单化配置,需要反射获取字段值和对应的设置的过滤值进行比较
     * @Param: [o, filterFiled, filterString]
     * @return: boolean
     */
    private boolean judgeFiled(Object o, String filterFiled, String filterString) {
        Class<?> aClass = o.getClass();
        Field[] declaredFields = aClass.getDeclaredFields();
        for (Field declaredField : declaredFields) {
            declaredField.setAccessible(true);
            if (declaredField.getName().equals(filterFiled)) {
                try {
                    Object o1 = declaredField.get(o);
                    if (o1 instanceof String) {
                        if (filterString.equals(o1.toString())) {
                            return false;
                        }
                    }
                } catch (IllegalAccessException e) {
                    log.error("反射获取字段失败, 对象为 " + o + "  字段为" + declaredField.getName() + " 过滤的值为 " + filterString);
                    return true;
                }
            }
        }
        return true;
    }
    private void custStringToClass(Map<String, Object> map) throws ClassNotFoundException {
        for (String key : map.keySet()) {
//            "org.apache.kafka.common"
            if (map.get(key) instanceof String && StringUtil.containsIgnoreCase(map.get(key).toString(), "org.apache.kafka.common")) {
                map.put(key, Class.forName(map.get(key).toString()));
            }
        }
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        kafkaDataSourceConfig = properties.getServer();
        createKafkaDataSourceBean();
    }
}

通过Spring进行初始化,代码比较简单,核心思路就是注入解析好的配置对象,然后一步步实例化成Kafka Consumer 和Kafka Template 对象。这里是最重要的地方,需要花费时间好好调试,按照自己的需求进行实例化。beanFactory.registerSingleton(factory, kafkaFac) beanFactory.registerSingleton(producerBeanName, stringStringKafkaTemplate) 这两步执行完,Spring中就有我们需要的对象了,注意这里的beanName ,很重要,使用的时候需要用到。

5. 使用

消费端使用

@EnableKafka
@Component
public class TestConsu {

    @KafkaListener(topics = {"test"}, containerFactory = "one",groupId = "custgroup")
    public void listenerone (String data) {
        System.out.println("test1=================="+data);
    }
}

这里的containerFactory 就是beanFactory.registerSingleton(factory, kafkaFac) 这里的factory,根据你的配置会实例化多个,也就可以在多个@KafkaListener 中引用。

生产端使用

@RestController
@RequestMapping(value = "/test")
@CrossOrigin
@Slf4j
public class TestController {
    @Autowired
    private TestService testService;
    @Autowired
    private KafkaConfigProperty kafkaConfigProperty;
    @Autowired
    private KafkaProducerCreator kafkaProducerCreator;
    @Autowired(required = false)
    @Qualifier("produ")
    KafkaTemplate kafkaTemplate;

    @Autowired(required = false)
    @Qualifier("produ22")
    KafkaTemplate kafkaTemplate2;
    @GetMapping("/test")
    public Object batchQueryLastAV(){
        TestDto testDto = new TestDto();
        testDto.setMsg("今天");
        testDto.setName("lisi");
        ListenableFuture aaaaaaaaaa = kafkaTemplate.send("test", JSON.toJSONString(testDto));
        final Properties properties = System.getProperties();
        return ResultUtil.success(testService.test());
    }

    @GetMapping("/test2")
    public Object batchQueryLastAV2(){
        TestDto testDto = new TestDto();
        testDto.setMsg("999999999");
        testDto.setName("zhangsan");
        ListenableFuture aaaaaaaaaa = kafkaTemplate2.send("test2",JSON.toJSONString(testDto));
        final Properties properties = System.getProperties();
        return ResultUtil.success(testService.test());
    }
}

这里的kafkaTemplate 需要通过@Qualifier("produ") 指定beanid, id在beanFactory.registerSingleton(producerBeanName, stringStringKafkaTemplate) 这里实例化。

6. 总结

其实还有很多可以完善的地方,比如可以定义additional-spring-configuration-metadata.json 来让yml配置提示,也可以工程化成一个依赖包,后面有时间再弄吧。

源码 https://github.com/wkfyynh/multikafka-datasource.git

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐