springboot中kafka监听不同topic,走不同代码逻辑的模式实现思路(便于扩展维护,解耦合)
kafka消息监听
·
一、建一个工厂类实现如下,springboot启动时将不同的监听实现类,存于定义的map中,便于监听后调用。
@Component
@RequiredArgsConstructor
public class KafkaFactory implements InitializingBean, ApplicationContextAware {
private ApplicationContext appContext;
private static final Map<String, KafkaDeal> KAFKA_MAP = new HashMap<>();
public static KafkaFactory getHandler(String topic) {
return KAFKA_MAP.get(topic);
}
@Override
public void afterPropertiesSet() throws Exception {
appContext.getBeansOfType(KafkaFactory.class).values().forEach(handler -> {
KAFKA_MAP.put(handler.getTopic(), handler);
});
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
appContext = applicationContext;
}
}
二、接口 KafkaDeal,定义kafka监听实现类的动作,例如当前实现类 监听的topic名称、执行代码等。以后若新增监听,只需增加此接口的实现类.
public interface KafkaDeal {
String getTopic();
void excute(String msg);
}
三、 KafkaDeal的监听topic实现类(举例,多监听多实现类)
@Service
@RequiredArgsConstructor
@Slf4j
public class ThirdDocKafkaDealBankGua implements ThirdDocKafkaDeal {
@Override
public String getTopic() {
return "kafka_topicName1";
}
@Override
public void excute(String msg) {
//监听执行的逻辑代码.....
}
}
四、kafka监听代码,调用KafkaFactory
/*
* kafka消息监听
*/
@Component
@Slf4j
public class ExternalPaymentListener {
/**
* @param message
*/
@KafkaListener(groupId = "kafka_group1", topics = "kafka_topicName1")
public void TopicMessage1(String message) {
KafkaDeal deal = KafkaFactory.getHandler("kafka_topicName1");
deal.excute(message);
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)