一、建一个工厂类实现如下,springboot启动时将不同的监听实现类,存于定义的map中,便于监听后调用。 

@Component
@RequiredArgsConstructor
public class KafkaFactory implements InitializingBean, ApplicationContextAware {
    private ApplicationContext appContext;

    private static final Map<String, KafkaDeal> KAFKA_MAP = new HashMap<>();

    public static KafkaFactory getHandler(String topic) {

        return KAFKA_MAP.get(topic);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        appContext.getBeansOfType(KafkaFactory.class).values().forEach(handler -> {
            KAFKA_MAP.put(handler.getTopic(), handler);
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        appContext = applicationContext;
    }

}

二、接口 KafkaDeal,定义kafka监听实现类的动作,例如当前实现类 监听的topic名称、执行代码等。以后若新增监听,只需增加此接口的实现类.

public interface KafkaDeal {

    String getTopic();

    void excute(String msg);
}

三、 KafkaDeal的监听topic实现类(举例,多监听多实现类)

@Service
@RequiredArgsConstructor
@Slf4j
public class ThirdDocKafkaDealBankGua implements ThirdDocKafkaDeal {


    @Override
    public String getTopic() {
        return "kafka_topicName1";
    }

    @Override
    public void excute(String msg) {
    //监听执行的逻辑代码.....
    }
}

四、kafka监听代码,调用KafkaFactory 

/*
 * kafka消息监听
 */

@Component
@Slf4j
public class ExternalPaymentListener {
     /**
     * @param message
     */
    @KafkaListener(groupId = "kafka_group1", topics = "kafka_topicName1")
    public void TopicMessage1(String message) {
        KafkaDeal deal = KafkaFactory.getHandler("kafka_topicName1");
        deal.excute(message);
    }

}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐