![cover](https://img-blog.csdnimg.cn/b2a8ac8e2c20418cab9a4bdd2c509cf7.png)
springboot kafka发送消息
场景:kafka发送消息,并且根据消息发送到不同的渠道类型(例如发送到WX,DingDing,邮箱),采取不同的线程池处理1.引入依赖<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><versio
·
- 场景:kafka发送消息,并且根据消息发送的不同渠道和消息类型(例如发送到WX,DingDing,邮箱),采取不同的线程池处理
1.引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
2.kafka配置信息
sobev.kafka.ip=xxx.x.x.x
sobev.kafka.port=9092
sobev.business.topic.name=sobevBusiness
spring.kafka.bootstrap-servers=${sobev.kafka.ip}:${sobev.kafka.port}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
3.发送消息工具类
@Component
@Slf4j
public class KafkaUtils {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 发送kafka消息
*/
public void send(String topicName, String jsonMessage) {
kafkaTemplate.send(topicName, jsonMessage);
}
}
4.注册kafka消息监听器接收消息
由于存在多种消息发送渠道和消息类型,因此需要多个监听器监听不同渠道的不同类型,但是不着急,不用急着把所有类型都写一个Listener
//不同的渠道不同的消息类型都设为单独的类型,由不同consumer消费
public class GroupIdMappingUtils {
/**
* (不同的渠道不同的消息类型拥有自己的groupId)
*/
public static List<String> getAllGroupIds() {
List<String> groupIds = new ArrayList<>();
for (ChannelType channelType : ChannelType.values()) {
for (MessageType messageType : MessageType.values()) {
groupIds.add(channelType.getCodeEn() + "." + messageType.getCodeEn());
}
}
return groupIds;
}
}
定义一个kafka消息监听器,
注意注解
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
prototype
类型 意味着每次请求类都将返回一个新的类,利用这个特性,我们根据消息的发送渠道,类型返回多个KafkaReceiver
类,并用GROUP_ID
区分他们
@Component
//prototype类型 意味着每次请求类都将返回一个新的类
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class KafkaReceiver {
@Autowired
private TaskPendingHolder taskPendingHolder;
@KafkaListener(topics = "#{'${sobev.business.topic.name}'}")
public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId){
Optional<String> kafkaMessage = Optional.of(consumerRecord.value());
if(kafkaMessage.isPresent()){
String s = kafkaMessage.get();
//具体消息类
TaskInfo taskInfo = JSON.parseObject(s, TaskInfo.class);
//如果消息的groupId和KafkaHeaders.GROUP_ID一样,则获取特定线程池发送执行发送消息任务
if((taskInfo.getSendChannel() + "." + taskInfo.getMsgType()).equals(topicGroupId)){
System.out.println(topicGroupId + " :" + s);
//根据当前 topicGroupId 路由到不同的线程池处理
taskPendingHolder.route(topicGroupId).execute(具体任务...)
}
}
}
}
根据prototype的特性,注册多个监听器
@Service
public class KafkaReceiverStarter {
@Autowired
ApplicationContext applicationContext;
@Autowired
private TaskPendingHolder taskPendingHolder;
private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();
private static Integer idx = 0;
/**
* @Receiver是Prototype类型,每次请求创建新的
*/
@PostConstruct
public void init(){
for (int i = 0; i < groupIds.size(); i++) {
applicationContext.getBean(KafkaReceiver.class);
}
}
/**
* 在执行@KafkaListener解析之前都会调用增强器
*/
@Bean
public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer(){
return (attrs, element) -> {
//设置@KafkaListener内的groupId属性
attrs.put("groupId", groupIds.get(idx++));
return attrs;
};
}
}
定义线程池持有容器
@Component
public class TaskPendingHolder {
private Map<String, ExecutorService> taskPendingHolder = new HashMap<>(32);
/**
* 获取得到所有的groupId
*/
private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();
/**
* 给每个渠道,每种消息类型初始化一个线程池
*/
@PostConstruct
public void init() {
for (String groupId : groupIds) {
//自定义线程池生成器 根据groupId生成不同的线程池
MyExecutor executor = MyThreadPoolConfig.getExecutor(groupId);
taskPendingHolder.put(groupId, executor);
}
}
/**
* 得到对应的线程池
*/
public ExecutorService route(String groupId) {
return taskPendingHolder.get(groupId);
}
}
未完。。。
参考于github austin 消息推送平台
更多推荐
所有评论(0)