作为rabbitMQ的生产者,发送消息到MQ的过程中,是通过routingkey发送给交换机,由交换机进行路由,把信息发送的最终的队列中。而rabbitMQ消费的时候,是要明确指明消费的队列的。

消费模式

rabbitMQ的消费模式分为两种,推模式和拉模式。推模式使用的是Basic.Consume 进行消费,而拉模式通过调用Basic.Get进行消费。推模式用于持续的获取消息,在推模式中,RabbitMQ会不断的推送消息给消费者,不过推送的数量可以通过Basic.Qos进行限制。拉模式可以单条的获取信息。

消费端的确认和拒绝

为了保证消息可以从队列可靠的到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数。

  • 如果autoAck参数为true,RabbitMQ会自动把发送出去的消息置为确认,并且从内存(磁盘)中删除,不关消费者是否真正进行了正确消费。
  • 如果autoAck参数设置为false,RabbitMQ会等待消费者显式地回复确认信号后才从内存或者磁盘中删除。

采用消息确认机制后,只要设置autoAck为false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程断掉导致消息丢失的问题。RabbitMQ会一直等到消费者显式调用Basic.ack命令为止。

Springboot使用amqp进行消费

在springboot中,对消息进行消费有两种方式。

  • 轮询:使用rabbitMQTemplate.receive()等相关方法进行消费,每次消费一条。
  • 注册侦听器:这个方式也是更为灵活,更为常用的。通过@Bean设置监听器端点和@RabbitListener注解的方式实现。

代码演示

 @Test
    public void receive() throws UnsupportedEncodingException {
        Message receive = rabbitTemplate.receive("queue-msg”);//指定队列名称
        System.out.println(new String(receive.getBody(),"utf-8"));
        Object o = rabbitTemplate.receiveAndConvert("queue-msg”);//可以支持类型转换
        System.out.println(o);
    }


这是最简单的消费方式,但是可以看到,虽然简单,功能也很少。
那么更常用的是使用配置监听器的方式

@Component
public class RabbitMQReceiver {

    @RabbitListener(queues = "test.topic",ackMode = "MANUAL")
    @RabbitHandler()
    public void receive(String msg,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("接收到消息:RabbitMQReceiver"+message);
        Thread.sleep(2000);
        channel.basicQos(2);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }


    @RabbitListener(queues = "test.topic",ackMode = "MANUAL")
    @RabbitHandler()
    public void receive2(String msg,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("wqerdf:RabbitMQReceiver"+message);
        Thread.sleep(2000);
        channel.basicQos(2);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
/**
这里通过RabbitListener注解指定了队列名称,看到queues应该已经想到了,可以指定多个; ackMode指定了消息需要人为的确认
需要注意的是,如果多个消费者方法,每个方法都要有一个RabbitListener
*/
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐