利用redis实现,因此先导入Redis依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

编写配置文件,增加redis的ip和端口配置

spring:
  rabbitmq:
    host: 192.168.200.129
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual
    publisher-confirm-type: simple
    publisher-returns: true
  redis:
    host: 192.168.200.129
    port: 6379

修改生产者,

@Test
void contextLoads() throws IOException {
    CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//data,id的d巧记
    
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);//增加一个data即封装id的对象,方法重载
    System.in.read();
}

修改消费者,利用redis和setnx方法的特点和id控制标记法,解决消息被消费者重复消费的问题

@Autowired
private StringRedisTemplate redisTemplate;//换类StringRedisTemplate是RedisTemplate的子类,更加强大

@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
    //0. 通过消息属性的头,或者spring被动返回的消息相关,数据比如获取MessageId
    String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    
    //1. 设置key到Redis
    if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {//原来是10秒测试redis中是否有消费完消息后id为1的数据,设置为10秒太快消失不便于下面的测试查看
        //2. 消费消息,暂时先打印消息来模拟消费,至于消息以后用来干啥得看具体的需求↓
        System.out.println("接收到消息:" + msg);

        //3. 设置key的value为1
        redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
        //4.  手动ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }else {
        //5. 获取Redis中的value即可 如果是1,手动ack
        if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

启动测试,可以看到效果。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐