Kafka 自定义多数据源
Kafka 自定义多数据源
Kafka 自定义多数据源
写在开头
工作需要对接很多第三方数据源,大多是通过Kafka落地的,导致项目中充满了大量雷同的Kafka配置,冗余代码很多,多人开发的情况下时间长了十分难以维护。网上大多的博客源码都是只有两个数据源,配置也都很多不适合我的需求,因此开发了一个简化配置、减少重复代码,可以简单配置即可开箱即用的Kafka数据源模板。
大致的思路是,自定义配置文件,通过spring解析完成之后在初始化期间根据配置文件实例化多个Kafka containerFactory和Kafka Template对象并加入到spring 管理,然后在使用的时候直接通过bean id 引入即可。
1. 自定义配置文件
配置文件的格式可以根据自己的需要定义,只要在解析的时候能处理成你需要的数据就行,下面是我定义的模板。
#多数据源kafka 配置模板说明,不做打包
kafka:
cust:
server:
1:
consumerConf: #kafka消费端核心配置,可以接收所有kafka消费工厂中的配置 ,其中group_id不要在这里配置,在@KafkaListener 中配置,不然会导致多个消费端公用一个配置,consumerExtra可以有多个,每个代表一个消费端,公用同级的consumerConf配置
"[bootstrap.servers]": localhost:9092
# "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
# "[value.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
# "[auto.offset.reset]": latest
"[auto.commit.interval.ms]": 1000
consumerExtra: #消费端工厂配置,可设置简单的参数,也可以直接设置工厂
one: # 实例化到spring中的beanName , 这个很重要,这个就是@KafkaListener 中工厂配置的名称
ackDiscarded: true
# consumerFactory:
polltimeout: 4000
factoryFilter: #过滤工厂,如果消息需要开启过滤,首先ackDiscarded这个要设置成true
serializerClass: com.gtja.bmall.gtkafka.entity.TestDto #消息序列化成的对象
filterFiled: name #要对比的字段,即serializerClass定义的属性名
filterTarget: zhangsan #要对比的值,从filterFiled获取到的值会和filterTarget进行比较,相同则保留消息
one22: # 实例化到spring中的beanName
ackDiscarded: true
# consumerFactory:
polltimeout: 4000
factoryFilter:
custStrategy: com.gtja.bmall.gtkafka.config.kafkaconfig.factorystrategy.CustFactoryStrategy #定义的过滤策略工厂,更自由化
producerConf: #生产端核心配置
"[bootstrap.servers]": localhost:9092
# "[acks]": all
# "[key.serializer]": org.apache.kafka.common.serialization.StringSerializer
# "[value.serializer]": org.apache.kafka.common.serialization.StringSerializer
# "[batch.size]": 16384
"[retries]": 2
# "[buffer.memory]": 33554432
producerBeanName: produ #生产端需要特别设置beanname , 一个server 中只能有一个producerConf 和producerBeanName
2:
consumerConf: #kafka消费端核心配置
"[bootstrap.servers]": localhost:9092
# "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
# "[value.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
# "[auto.offset.reset]": latest
"[auto.commit.interval.ms]": 2000
consumerExtra: #消费端工厂配置,可设置简单的参数,也可以直接设置工厂
two: # 实例化到spring中的beanName
ackDiscarded: true
# consumerFactory:
polltimeout: 4000
producerConf:
"[bootstrap.servers]": localhost:9092
# "[acks]": all
# "[key.serializer]": org.apache.kafka.common.serialization.StringSerializer
# "[value.serializer]": org.apache.kafka.common.serialization.StringSerializer
# "[batch.size]": 16384
"[retries]": 3
# "[buffer.memory]": 33554432
producerBeanName: produ22 #生产端需要特别设置beanname
2. 加载配置文件
这块是最麻烦的,因为Kafka需要的参数比较多,并且还要支持自己的业务场景,导致yml中要嵌套很多层,需要细心,不然解析很容易出错。以下是解析的入口。
@Slf4j
@Getter
@Setter
@PropertySource(value = {"classpath:application-kafkaconf.yml"}, factory = YmlPropertySourceFactory.class)
@ConfigurationProperties(prefix = "kafka.cust")
@Configuration
public class KafkaConfigProperty {
@NestedConfigurationProperty
private Map<String, KafkaDataSourceConfig> server = new LinkedHashMap<>();
}
因为@PropertySource
默认不支持解析yml,所以需要自定义一个工厂,内容如下。
@Slf4j
public class YmlPropertySourceFactory implements PropertySourceFactory {
@Override
public PropertySource<?> createPropertySource(String name, EncodedResource encodedResource) throws IOException {
PropertySource<?> propertySource = null;
try {
List<PropertySource<?>> load = new YamlPropertySourceLoader().load(name, encodedResource.getResource());
propertySource = new YamlPropertySourceLoader().load(name, encodedResource.getResource()).get(0);
}catch (Exception e){
propertySource = new PropertySource<Object>("") {
@Override
public Object getProperty(String name) {
return null;
}
};
log.error("load resource error "+ e);
}
return propertySource;
}
}
3. 解析配置文件
因为配置文件是多层嵌套,解析的时候也要一层层来,以我的配置如例。
KafkaDataSourceConfig
对应的类型如下
@Data
@Slf4j
@Accessors(chain = true)
public class KafkaDataSourceConfig {
private Map<String, Object> producerConf = new LinkedHashMap() {
{
put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
put("acks", "all");
put("batch.size", "16384");
put("batch.size", "16384");
put("buffer.memory", "33554432");
}
};//生产者配置,这里是一些默认配置,不能随便改,要改的话初始化判断条件也要改
private Map<String, Object> consumerConf = new LinkedHashMap() {
{
put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
put("auto.offset.reset", "latest");
}
};//消费者配置,这里是一些默认配置,不能随便改,要改的话初始化判断条件也要改
private Map<String, KafkaExtraConfig> consumerExtra = new LinkedHashMap();//工厂配置
private Map<String, KafkaExtraConfig> producerExtra = new LinkedHashMap();//工厂配置
private String producerBeanName;//生产端需要单独设置beanname
}
注意这里的producerConf 和consumerConf 这里其实可以不用Map接收,可以再创建一个对象,这样默认值的赋值会优雅许多,也方便后续的判断,但是由于Kafka的配置都是xxx.xxx
,实体类的字段名不能包含点,所以需要再多个额外的步骤进行转换,我这边就讨巧用Map接收直接和Kafka的配置对应。
这两个Map中的默认值可以根据自己的需要定义,由于我的需求中消费端居多,生产端较少,所以这里实例化的基准不一样,消费端实例化数量以consumerExtra
中的key的数量为准,而生产端实例化数量是以KafkaConfigProperty#server
中的key数量为准。
KafkaExtraConfig
对象的内容为下
@Data
@Slf4j
public class KafkaExtraConfig {
private Boolean ackDiscarded = false;//是否可以丢弃消息,默认false
private Class<? extends KafkaListenerContainerFactory> consumerFactory;//自定义消费工厂
private long polltimeout = 3000;//消息拉取超时时间
private Boolean batchListener = false;//批量监听
private Integer concurrency = 1;//KafkaMessageListenerContainer 实例数量
private KafkaConsuFactoryFilterConfig factoryFilter;//消费过滤工厂配置
private Class<? extends ProducerFactory> producerFactory;//自定义生产工厂
}
这里也配置了一些常用参数,在配置文件中定义,可以根据需要增减,其中 consumerFactory producerFactory
可以自定义实现类,然后在配置文件中指定全限定类名。
factoryFilter
内容如下
@Data
@Slf4j
public class KafkaConsuFactoryFilterConfig {
private Class<?> serializerClass;//要序列化成的对象
private Class<? extends RecordFilterStrategy> custStrategy;//指定自定义策略,上面参数就会失效
private Map<String, String> filtermap = new LinkedHashMap<>();//过滤的字段集合,key为要对比的字段,value为比较的值,支持逗号分隔
}
这里是我工作的特殊场景,custStrategy
可以指定过滤工厂,serializerClass filtermap
这两个参数是简约化配置,可以序列化成对象后根据filtermap
中 的key和value来进行过滤,满足条件的消息才会保留。
4. 初始化
@Component
@Slf4j
public class KafkaProducerCreator implements InitializingBean {
@Autowired
private KafkaConfigProperty properties;//kafka核心配置文件
@Autowired
private DefaultListableBeanFactory beanFactory;//springbean 工厂
private Map<String, KafkaDataSourceConfig> kafkaDataSourceConfig;
/**
* @Description: 初始化kafka多数据源配置
* @Param: []
* @return: void
*/
private void createKafkaDataSourceBean() {
try {
createKafkaInstance();
} catch (Exception e) {
log.error("初始化kafka config失败 " + e);
}
}
/**
* @Description: 初始化生产端和消费端
* @Param: []
* @return: void
*/
private void createKafkaInstance() throws IllegalAccessException, InstantiationException, ClassNotFoundException {
if (!kafkaDataSourceConfig.isEmpty()) {
for (String server : kafkaDataSourceConfig.keySet()) {
KafkaDataSourceConfig kafkaDataSourceConfig = this.kafkaDataSourceConfig.get(server);
Map<String, Object> consumerConf = kafkaDataSourceConfig.getConsumerConf();//核心配置
Map<String, Object> produceConf = kafkaDataSourceConfig.getProducerConf();
newKafkaConsumerInstance(kafkaDataSourceConfig, consumerConf);
newKafkaProducerInstance(kafkaDataSourceConfig, produceConf);
}
}
}
/**
* @Description: 实例化生产者,以配置的server下面的produceeConf的数量为基准
* @Param: [kafkaDataSourceConfig, produceConf]
* @return: void
*/
private void newKafkaProducerInstance(KafkaDataSourceConfig kafkaDataSourceConfig, Map<String, Object> produceConf) throws ClassNotFoundException {
if (!produceConf.isEmpty() && StringUtils.isNoneBlank(kafkaDataSourceConfig.getProducerBeanName()) && produceConf.size() > 6) {
// 暂时不考虑producerExtra中的配置 todo
custStringToClass(produceConf);
DefaultKafkaProducerFactory defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(produceConf);
String producerBeanName = kafkaDataSourceConfig.getProducerBeanName();
KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<String, String>(defaultKafkaProducerFactory);
stringStringKafkaTemplate.setBeanName(producerBeanName);
beanFactory.registerSingleton(producerBeanName, stringStringKafkaTemplate);
log.info("create kafka kafkatemplate ,bean id is {} , factory config is {}",producerBeanName,stringStringKafkaTemplate.getProducerFactory().getConfigurationProperties().toString());
}
}
/**
* @Description: 实例化消费端, 以配置的consumerExtra为基准
* @Param: [kafkaDataSourceConfig, consumerConf]
* @return: void
*/
private void newKafkaConsumerInstance(KafkaDataSourceConfig kafkaDataSourceConfig, Map<String, Object> consumerConf) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
if (!consumerConf.isEmpty() && consumerConf.size() > 3) {
Map<String, KafkaExtraConfig> extra = kafkaDataSourceConfig.getConsumerExtra();
for (String factory : extra.keySet()) {
// 代表配置了多个工厂相关
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaFac = new ConcurrentKafkaListenerContainerFactory<>();
KafkaExtraConfig kafkaExtraConfig = extra.get(factory);
if (kafkaExtraConfig.getConsumerFactory() != null) {
// 代表使用了自定义工厂
// todo 或许可以更细化点,把消费工厂和过滤工厂整合到一起
Class<? extends KafkaListenerContainerFactory> consumerFactory = kafkaExtraConfig.getConsumerFactory();
kafkaFac = (ConcurrentKafkaListenerContainerFactory<Integer, String>) consumerFactory.newInstance();
} else {
// 默认 常用配置
kafkaFac.setAckDiscarded(kafkaExtraConfig.getAckDiscarded());//是否开始消息过滤
kafkaFac.setConcurrency(kafkaExtraConfig.getConcurrency());
kafkaFac.setBatchListener(kafkaExtraConfig.getBatchListener());
kafkaFac.getContainerProperties().setPollTimeout(kafkaExtraConfig.getPolltimeout());//拉取超时时间,默认3000
}
custStringToClass(consumerConf);
DefaultKafkaConsumerFactory<Object, Object> defaultKafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerConf);
kafkaFac.setConsumerFactory(defaultKafkaConsumerFactory);
KafkaConsuFactoryFilterConfig factoryFilter = kafkaExtraConfig.getFactoryFilter();
if(kafkaExtraConfig.getAckDiscarded()){
setFilterFactory(kafkaFac, factoryFilter);
}
beanFactory.registerSingleton(factory, kafkaFac);
log.info("create kafka consumer ,bean id is {} , properties is {} , factory config is {}",factory,kafkaFac.getContainerProperties().toString(),kafkaFac.getConsumerFactory().getConfigurationProperties().toString());
}
}
}
/**
* @Description: 设置工厂过滤策略
* @Param: [defaultKafkaConsumerFactory]
* @return: void
*/
private void setFilterFactory(ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaFac, KafkaConsuFactoryFilterConfig factoryFilter) throws IllegalAccessException, InstantiationException {
if (factoryFilter != null) {
// 代表配置了过滤策略
if (factoryFilter.getCustStrategy() != null) {
// 代表使用了自定义策略
kafkaFac.setRecordFilterStrategy(factoryFilter.getCustStrategy().newInstance());
} else if(!factoryFilter.getFiltermap().isEmpty()){
kafkaFac.setRecordFilterStrategy((ConsumerRecord<Integer, String> consumerRecord) -> {
try {
Object o = JSONObject.parseObject(consumerRecord.value().toString(), factoryFilter.getSerializerClass());
// log.info("序列化的对象为:::" + o);
return judgeFiledMap(o, factoryFilter.getFiltermap());
} catch (Exception e) {
log.error("filter error topic is {},value is {}", consumerRecord.topic(), consumerRecord.value());
return true;
}
});
}
}
}
/**
* @Description: 计算多条件下的过滤规则
* @Param: [o, filterMap]
* @return: boolean
*/
private boolean judgeFiledMap(Object o, Map<String,String> filterMap) {
Class<?> aClass = o.getClass();
Field[] declaredFields = aClass.getDeclaredFields();
List<Boolean> execValue = new ArrayList<>();
for (String filterKey : filterMap.keySet()) {
boolean execBoo = false;
for (Field declaredField : declaredFields) {
declaredField.setAccessible(true);
execBoo = getaBoolean(o, filterMap, declaredField, filterKey);
if (execBoo){
break;//代表这个key满足条件了,不需要再循环了
}
}
if(execBoo){
execValue.add(execBoo);//用来计算所有的key对应的条件是否满足,少一个都代表有问题
}else {
return true;//这个true代表丢弃消息
}
}
return ! (execValue.size() == filterMap.size() );//要取反,不然逻辑不对
}
/**
* @Description: 计算每个key是否满足条件,满足就返回true ,否则返回false
* @Param: [o, filterMap, declaredField, filterKey]
* @return: boolean
*/
private boolean getaBoolean(Object o, Map<String, String> filterMap, Field declaredField, String filterKey) {
if(declaredField.getName().equals(filterKey)){
try {
Object o1 = declaredField.get(o); //value值
String[] filtervalues = StringUtils.split(filterMap.get(filterKey), ",");
for (String filtervalue : filtervalues) {
if (o1 instanceof String) {
if (filtervalue.equals(o1.toString())) {//只要配置的值中有一个满足条件就保留
return true;
}
}
}
} catch (IllegalAccessException e) {
log.error("反射获取字段失败, 对象为 " + o + " 字段为" + declaredField.getName() + " 过滤的值为 " + filterKey);
return false;
}
}
return false;
}
/**
* @Description: 消费过滤工厂 简单化配置,需要反射获取字段值和对应的设置的过滤值进行比较
* @Param: [o, filterFiled, filterString]
* @return: boolean
*/
private boolean judgeFiled(Object o, String filterFiled, String filterString) {
Class<?> aClass = o.getClass();
Field[] declaredFields = aClass.getDeclaredFields();
for (Field declaredField : declaredFields) {
declaredField.setAccessible(true);
if (declaredField.getName().equals(filterFiled)) {
try {
Object o1 = declaredField.get(o);
if (o1 instanceof String) {
if (filterString.equals(o1.toString())) {
return false;
}
}
} catch (IllegalAccessException e) {
log.error("反射获取字段失败, 对象为 " + o + " 字段为" + declaredField.getName() + " 过滤的值为 " + filterString);
return true;
}
}
}
return true;
}
private void custStringToClass(Map<String, Object> map) throws ClassNotFoundException {
for (String key : map.keySet()) {
// "org.apache.kafka.common"
if (map.get(key) instanceof String && StringUtil.containsIgnoreCase(map.get(key).toString(), "org.apache.kafka.common")) {
map.put(key, Class.forName(map.get(key).toString()));
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
kafkaDataSourceConfig = properties.getServer();
createKafkaDataSourceBean();
}
}
通过Spring进行初始化,代码比较简单,核心思路就是注入解析好的配置对象,然后一步步实例化成Kafka Consumer 和Kafka Template 对象。这里是最重要的地方,需要花费时间好好调试,按照自己的需求进行实例化。beanFactory.registerSingleton(factory, kafkaFac) beanFactory.registerSingleton(producerBeanName, stringStringKafkaTemplate)
这两步执行完,Spring中就有我们需要的对象了,注意这里的beanName ,很重要,使用的时候需要用到。
5. 使用
消费端使用
@EnableKafka
@Component
public class TestConsu {
@KafkaListener(topics = {"test"}, containerFactory = "one",groupId = "custgroup")
public void listenerone (String data) {
System.out.println("test1=================="+data);
}
}
这里的containerFactory 就是beanFactory.registerSingleton(factory, kafkaFac)
这里的factory,根据你的配置会实例化多个,也就可以在多个@KafkaListener
中引用。
生产端使用
@RestController
@RequestMapping(value = "/test")
@CrossOrigin
@Slf4j
public class TestController {
@Autowired
private TestService testService;
@Autowired
private KafkaConfigProperty kafkaConfigProperty;
@Autowired
private KafkaProducerCreator kafkaProducerCreator;
@Autowired(required = false)
@Qualifier("produ")
KafkaTemplate kafkaTemplate;
@Autowired(required = false)
@Qualifier("produ22")
KafkaTemplate kafkaTemplate2;
@GetMapping("/test")
public Object batchQueryLastAV(){
TestDto testDto = new TestDto();
testDto.setMsg("今天");
testDto.setName("lisi");
ListenableFuture aaaaaaaaaa = kafkaTemplate.send("test", JSON.toJSONString(testDto));
final Properties properties = System.getProperties();
return ResultUtil.success(testService.test());
}
@GetMapping("/test2")
public Object batchQueryLastAV2(){
TestDto testDto = new TestDto();
testDto.setMsg("999999999");
testDto.setName("zhangsan");
ListenableFuture aaaaaaaaaa = kafkaTemplate2.send("test2",JSON.toJSONString(testDto));
final Properties properties = System.getProperties();
return ResultUtil.success(testService.test());
}
}
这里的kafkaTemplate
需要通过@Qualifier("produ")
指定beanid, id在beanFactory.registerSingleton(producerBeanName, stringStringKafkaTemplate)
这里实例化。
6. 总结
其实还有很多可以完善的地方,比如可以定义additional-spring-configuration-metadata.json
来让yml配置提示,也可以工程化成一个依赖包,后面有时间再弄吧。
源码 https://github.com/wkfyynh/multikafka-datasource.git
更多推荐
所有评论(0)