springboot整合RabbitMQ消费端手动ACK确认机制
ack——acknowledge(vt. 承认;答谢;报偿;告知已收到;确认的意思),在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。其提供了三种确认方式:自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。手动
ack——acknowledge(vt. 承认;答谢;报偿;告知已收到;确认的意思),在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。
其提供了三种确认方式:
-
自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。
-
手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。
-
根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等布拉不拉布拉)。这种方式比较麻烦。
当消息一旦被消费者接收到,会立刻自动向MQ确认接收,并将响应的message从RabbitMQ消息缓存中移除,但是在实际的业务处理中,会出现消息收到了,但是业务处理出现异常的情况,在自动确认的模式下,该条业务处理失败的message就相当于被丢弃了。
如果设置了手动确认,则需要在业务处理完成之后,手动调用channel.basicAck(),手动的签收,
如果业务处理失败,则手动调用channel.basicNack()方法拒收,并让MQ重新发送该消息。
如果不做任何关于acknowledge的配置,默认就是自动确认签收的。
消费端手动ACK确认机制
1. 消费端配置:开启ack
2. 通过channel的basicAck方法手动签收,通过basicNack方法拒绝签收
/*
通过channel的basicAck方法手动签收,通过basicNack方法拒绝签收
*/
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE}) //监听
public void receiveMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try{
//String message= (String)amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);
System.out.println("接收的mq消息:"+message);
// 业务处理 异常测试
// System.out.println("业务处理"+1/0);
// long deliveryTag 消息接收tag boolean multiple 是否批量确认
System.out.println("deliveryTag="+deliveryTag);
/**
* 无异常就确认消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:取出来当前消息在队列中的的索引;
* multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
* deliveryTag为5及其以下的消息;一般设置为false
*/
if(deliveryTag==5){
channel.basicAck(deliveryTag,true);
}
}catch (Exception e){
e.printStackTrace();
/**
* 有异常就绝收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
* false:将消息丢弃
*/
// long deliveryTag, boolean multiple, boolean requeue
try {
channel.basicNack(deliveryTag,false,true);
// long deliveryTag, boolean requeue
// channel.basicReject(deliveryTag,true);
Thread.sleep(1000); // 这里只是便于出现死循环时查看
/*
* 一般实际异常情况下的处理过程:记录出现异常的业务数据,将它单独插入到一个单独的模块,
* 然后尝试3次,如果还是处理失败的话,就进行人工介入处理
*/
} catch (Exception e1) {
e1.printStackTrace();
}
}
3. 测试
3.1 测试消费1条消息
由于代码里面deliveryTag为5才会确认,所以这条消息Unacked,如下图:
3.2 测试消费5条消息
3.3 拒签测试
更多推荐
所有评论(0)