如何实现kafka的消息重发机制
如何实现kafka的消息重发机制最近做到项目中有手机推送,和短信推送相关,既然做到推送相关,肯定会遇到发送失败的问题,在并发量很高的情况下,很难确保每一条推送或短信都成功发送给用户。因此就需要失败重发机制,失败重发可以通过代码逻辑去实现(将失败的推送已日志的形式存入数据库,再手写定时任务去重新发送),但是高并发的情况下,通过查询数据库的方式效率会很慢,因此在项目中引用了kafka消息队列。相比于r
如何实现kafka的消息重发机制
最近做到项目中有手机推送,和短信推送相关,既然做到推送相关,肯定会遇到发送失败的问题,在并发量很高的情况下,很难确保每一条推送或短信都成功发送给用户。因此就需要失败重发机制,失败重发可以通过代码逻辑去实现(将失败的推送已日志的形式存入数据库,再手写定时任务去重新发送),但是高并发的情况下,通过查询数据库的方式效率会很慢,因此在项目中引用了kafka消息队列。相比于rabbitmq等kafka是没有自己的重发机制的。本文介绍如何通过kafka自己设计一套重发机制。
- 生产者创建topic
springboot集成kafka的教程暂时就不贴了,首先创建三个topic,分别为正常队列,第一次重发队列,第二次重发队列,代码入下:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description: kafka生产者配置
* @throws
* @author zhu.sm
* @date 2022/3/2 13:29
*/
@Configuration
public class KafkaInitialConfiguration {
// 创建一个名为realDataVos的Topic并设置分区数为8,分区副本数为2
@Bean
public NewTopic pushAll() {
return new NewTopic("pushAll", 8, (short) 2);
}
//重试队列1
@Bean
public NewTopic resetPushAllFirst() {
return new NewTopic("resetPushAllFirst", 8, (short) 2);
}
//重试队列2
@Bean
public NewTopic resetPushAllSecond() {
return new NewTopic("resetPushAllSecond", 8, (short) 2);
}
}
2.消费者消费
业务中的所有推送消息只会进"pushAll"正常发送的队列,消息体如下:
@Data
public class PushForm implements Serializable {
@ApiModelProperty(value="用户ids",required = true)
@NotEmpty
private String[] userIds;
@ApiModelProperty(value="推送标题",required = true)
@NotBlank
private String title;
@ApiModelProperty(value="推送内容",required = true)
@NotBlank
private String body;
@ApiModelProperty(value="参数",required = false)
private Map<String,String> data;
}
userIds中是需要推送的所有用户id,我们需要将成功的id消费掉,失败的id组成新的消息体,进入"resetPushAllFirst"重试队列1:
/**
* kafka
*
* @param records
* @return
*/
@KafkaListener(id = "pushAll", topics = {"pushAll"}, containerFactory = "infoPushKafkaListenerContainerFactory")
public ResponseData<Void> pushAll(List<ConsumerRecord<?, ?>> records) {
log.info("pushAll推送消息进入消费");
for (ConsumerRecord<?, ?> record : records) {
String message = (String) record.value();
PushForm pushForm = JSONUtil.toBean(message, PushForm.class);
//推送业务逻辑
PushForm errPushForm = pushService.pushAll(pushForm.getUserIds(), "", pushForm.getTitle(), pushForm.getBody(), pushForm.getData(),0);
if (errPushForm == null) {
log.info("推送已消费:{}", message);
} else {
log.info("失败消费:{}",errPushForm.toString());
//errPushForm为失败的消息体,进入重发队列1
kafkaTemplate.send("resetPushAllFirst", JSONUtil.toJsonStr(errPushForm));
}
}
log.info("pushAll结束消费");
return ResponseData.success();
}
pushAll业务方法中,处理推送失败的userId,重新塞入消息体:
if (errUserIds != null && errUserIds.size() > 0) {
PushForm pushForm = new PushForm();
pushForm.setUserIds(errUserIds.toArray(new String[errUserIds.size()]));
pushForm.setTitle(title);
pushForm.setBody(body);
pushForm.setData(data);
return pushForm;
} else {
return null;
}
第一次重发队列消费:
@KafkaListener(id = "resetPushAllFirst", topics = {"resetPushAllFirst"}, containerFactory = "infoPushKafkaListenerContainerFactory")
public ResponseData<Void> resetPushAllFirst(List<ConsumerRecord<?, ?>> records) {
log.info("第一次重试推送消息进入消费");
for (ConsumerRecord<?, ?> record : records) {
String message = (String) record.value();
PushForm pushForm = JSONUtil.toBean(message, PushForm.class);
PushForm errPushForm = pushService.pushAll(pushForm.getUserIds(), "", pushForm.getTitle(), pushForm.getBody(), pushForm.getData(),1);
if (errPushForm == null) {
log.info("第一次重试推送已消费:{}", message);
} else {
//逻辑一样,入股第一次重发还是失败,将失败的用户重新塞入消息体,进入第二次重发
kafkaTemplate.send("resetPushAllSecond", JSONUtil.toJsonStr(errPushForm));
}
}
// ack.acknowledge();
log.info("pushAll结束消费");
return ResponseData.success();
}
第二次重发队列:
一共发送三次,每一次的记录都会记录在数据库中,如果第三次还是失败的话,不自动重发,从系统中去手动重发,排查原因,具体根据什么需求就重发几次,我们的项目中目前是需要三次。
@KafkaListener(id = "resetPushAllSecond", topics = {"resetPushAllSecond"}, containerFactory = "infoPushKafkaListenerContainerFactory")
public ResponseData<Void> resetPushAllSecond(List<ConsumerRecord<?, ?>> records) {
log.info("第二次重试推送消息进入消费");
for (ConsumerRecord<?, ?> record : records) {
String message = (String) record.value();
PushForm pushForm = JSONUtil.toBean(message, PushForm.class);
PushForm errPushForm = pushService.pushAll(pushForm.getUserIds(), "", pushForm.getTitle(), pushForm.getBody(), pushForm.getData(), 2);
if (errPushForm == null) {
log.info("第二次重试推送已消费:{}", message);
}
}
return ResponseData.success();
}
页面上可以看到,每一次的推送日志,三次都发送失败的可以手动重发:
本文主要讲的是使用kafka实现重发机制,可以参考博主的设计思路结合项目的具体需求去做。
更多推荐
所有评论(0)