• 场景:kafka发送消息,并且根据消息发送的不同渠道和消息类型(例如发送到WX,DingDing,邮箱),采取不同的线程池处理

1.引入依赖

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.7.8</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.78</version>
		</dependency>

2.kafka配置信息

sobev.kafka.ip=xxx.x.x.x
sobev.kafka.port=9092
sobev.business.topic.name=sobevBusiness

spring.kafka.bootstrap-servers=${sobev.kafka.ip}:${sobev.kafka.port}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true

3.发送消息工具类

@Component
@Slf4j
public class KafkaUtils {
    @Autowired
     private KafkaTemplate kafkaTemplate;
    /**
     * 发送kafka消息
     */
    public void send(String topicName, String jsonMessage) {
        kafkaTemplate.send(topicName, jsonMessage);
    }
}

4.注册kafka消息监听器接收消息
由于存在多种消息发送渠道和消息类型,因此需要多个监听器监听不同渠道的不同类型,但是不着急,不用急着把所有类型都写一个Listener

//不同的渠道不同的消息类型都设为单独的类型,由不同consumer消费
public class GroupIdMappingUtils {
    /**
     * (不同的渠道不同的消息类型拥有自己的groupId)
     */
    public static List<String> getAllGroupIds() {
        List<String> groupIds = new ArrayList<>();
        for (ChannelType channelType : ChannelType.values()) {
            for (MessageType messageType : MessageType.values()) {
                groupIds.add(channelType.getCodeEn() + "." + messageType.getCodeEn());
            }
        }
        return groupIds;
    }
}

定义一个kafka消息监听器,
注意注解@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
prototype类型 意味着每次请求类都将返回一个新的类,利用这个特性,我们根据消息的发送渠道,类型返回多个KafkaReceiver类,并用GROUP_ID区分他们

@Component
//prototype类型 意味着每次请求类都将返回一个新的类
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class KafkaReceiver {

    @Autowired
    private TaskPendingHolder taskPendingHolder;

    @KafkaListener(topics = "#{'${sobev.business.topic.name}'}")
    public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId){
        Optional<String> kafkaMessage = Optional.of(consumerRecord.value());
        if(kafkaMessage.isPresent()){
            String s = kafkaMessage.get();
            //具体消息类
            TaskInfo taskInfo = JSON.parseObject(s, TaskInfo.class);
            //如果消息的groupId和KafkaHeaders.GROUP_ID一样,则获取特定线程池发送执行发送消息任务
            if((taskInfo.getSendChannel() + "." + taskInfo.getMsgType()).equals(topicGroupId)){
                System.out.println(topicGroupId + " :" + s);
                //根据当前 topicGroupId 路由到不同的线程池处理
              taskPendingHolder.route(topicGroupId).execute(具体任务...)
            }
        }
    }
}

根据prototype的特性,注册多个监听器

@Service
public class KafkaReceiverStarter {

    @Autowired
    ApplicationContext applicationContext;
    
    @Autowired
    private TaskPendingHolder taskPendingHolder;

    private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();

    private static Integer idx = 0;

    /**
     * @Receiver是Prototype类型,每次请求创建新的
     */
    @PostConstruct
    public void init(){
        for (int i = 0; i < groupIds.size(); i++) {
            applicationContext.getBean(KafkaReceiver.class);
        }
    }
    /**
     * 在执行@KafkaListener解析之前都会调用增强器
     */
    @Bean
    public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer(){
        return (attrs, element) -> {
        //设置@KafkaListener内的groupId属性
            attrs.put("groupId", groupIds.get(idx++));
            return attrs;
        };
    }
}

定义线程池持有容器

@Component
public class TaskPendingHolder {
    private Map<String, ExecutorService> taskPendingHolder = new HashMap<>(32);

    /**
     * 获取得到所有的groupId
     */
    private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();

    /**
     * 给每个渠道,每种消息类型初始化一个线程池
     */
    @PostConstruct
    public void init() {
        for (String groupId : groupIds) {
        //自定义线程池生成器 根据groupId生成不同的线程池
            MyExecutor executor = MyThreadPoolConfig.getExecutor(groupId);
            
            taskPendingHolder.put(groupId, executor);
        }
    }
    /**
     * 得到对应的线程池
     */
    public ExecutorService route(String groupId) {
        return taskPendingHolder.get(groupId);
    }


}

未完。。。

参考于github austin 消息推送平台

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐