SpringBoot如何实现,避免消息重复消费
利用redis实现,因此先导入Redis依赖:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>编写配置文件,增加redis的ip和
·
利用redis实现,因此先导入Redis依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
编写配置文件,增加redis的ip和端口配置
spring:
rabbitmq:
host: 192.168.200.129
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual
publisher-confirm-type: simple
publisher-returns: true
redis:
host: 192.168.200.129
port: 6379
修改生产者,
@Test
void contextLoads() throws IOException {
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//data,id的d巧记
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);//增加一个data即封装id的对象,方法重载
System.in.read();
}
修改消费者,利用redis和setnx方法的特点和id控制标记法,解决消息被消费者重复消费的问题
@Autowired
private StringRedisTemplate redisTemplate;//换类StringRedisTemplate是RedisTemplate的子类,更加强大
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 通过消息属性的头,或者spring被动返回的消息相关,数据比如获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {//原来是10秒测试redis中是否有消费完消息后id为1的数据,设置为10秒太快消失不便于下面的测试查看
//2. 消费消息,暂时先打印消息来模拟消费,至于消息以后用来干啥得看具体的需求↓
System.out.println("接收到消息:" + msg);
//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
启动测试,可以看到效果。
更多推荐
已为社区贡献3条内容
所有评论(0)