重复消费问题

为了解决消费端因为种种原因而造成的消息丢失问题,我们都知道根源在于因为RabbitMQ的自动ack机制,所以为了避免以上问题,我们会选中手动ack,以确保消息不会因为某些原因而丢失。

但随之而来的也有一个问题如果忘记ack,或者又因为种种原因消费者端没能给RabbitMQ对应ack,无法确认消息已经被消费完了,那这条未被“约束”的消息也许就会被另一个消费者消费,就会造成重复消费问题

如果是进行增加,或者一些非幂等性操作,比如扣费业务,那可就完犊子了

而其中用Redis似乎是对解决重复消费问题的一个比较好的方案:

思路如下:

在消费者消费消息之前,先将消息的id放到Redis中

最明了的方式:设置id为0时为正在执行业务,id为1是为业务执行成功,消费完毕

若手动ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,若key已经存在,则获取他的值,如果值为0,即当前消息还没消费完,当前消费者就什么都不做,如果值为1,则直接ack操作。

当然,这也会有一种非常严重的隐患

若第一个消费者在执行业务时,出现了死锁问题,便会无法获取key的资源

为此我们应该在将消息id放入Redis时,为它设置一个生存时间,也可以叫过期时间,避免在极端情况下的死锁问题。

 

话不多,先来测试实践一下

一、测试依赖导导导

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

 

 二、在测试前用了一个先前编写好的创建连接的工具类,就不用一直配置连接信息

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQClient {

    public static Connection getConnection(){
        // 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setVirtualHost("/test");

        // 创建Connection
        Connection conn = null;
        try {
            conn = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 返回Connection
        return conn;
    }
}

 三、生产者代码实现

import java.util.UUID;


public class Publisher {
    @Test
    public void publish() throws Exception {
        //使用工具类创建消息队列连接
        Connection connection = RabbitMQClient.getConnection();
        //创建Channel
        Channel channel = connection.createChannel();
        //指定路由规则及消息属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(1)  //0:消息不持久化 1:消息持久化
                .messageId(UUID.randomUUID().toString())   //生成随机信息id
                .build();
        String msg = "This is MQ Test!"; //消息体
        channel.basicPublish("","MqTest",true,properties,msg.getBytes());
        // 参数1:exchange:指定exchange,使用"",使用默认的交换机
        // 参数2:routingKey:指定路由的规则,使用具体的队列名称。
        // 参数3:mandatory:若exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,会将消息返还给生产者。
        // 参数4:BasicProperties :指定传递的消息所携带的properties属性。
        // 参数5 byte[]:指定发布的具体消息

        System.out.println("生产者发布消息成功!");
        channel.close();
        connection.close();
    }
}

四、消费者代码实现

import com.yj.utils.RabbitMQClient;
import com.rabbitmq.client.*;
import org.junit.Test;
import redis.clients.jedis.Jedis;

import java.io.IOException;


public class Consumer {
    @Test
    public void consume() throws Exception {
        Connection connection = RabbitMQClient.getConnection();
        Channel channel = connection.createChannel();

        //声明队列
        channel.queueDeclare("MqTest",true,false,false,null);
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true,设置false后MQ重启后队列全部删除)
        //参数3:exclusive - 是否排外(conn.close()当前队列会被自动删除,还有当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        //自定义监听器监听队列
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                //1、连接redis
                Jedis jedis = new Jedis("127.0.0.1", 6379);
                //2、获取队列中消息id
                String messageId = properties.getMessageId();
                //3、setnx到redis中,value默认为0,设置过期时间10s
                String result = jedis.set(messageId, "0", "NX", "EX", 10);

                if (result != null && result.equalsIgnoreCase("OK")){
                    System.out.println("接收到信息:" + new String(body,"UTF-8"));

                    //4、若成功添加,即消费成功,设置消息id,表示正在执行业务
                    //这里就不设置1了,6868为了在测试数据库中显眼些,
                    jedis.set(messageId,"68686868");

                    //5、手动ack
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    //deliveryTag:该消息的index;
                    //multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

                }else {
                    //6、如果2中的setnx失败,获取key对应的value,如果是0什么都不做,如果是1则手动ack
                    String s = jedis.get(messageId);
                    if ("1".equalsIgnoreCase(s)){
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            }
        };

        channel.basicConsume("MqTest",true,consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();//让程序不停止,只有键盘录入数据才会往下走

        // 释放资源
        channel.close();
        connection.close();
    }
}

五、简单测试一下

 

 

测试成功!

 终:看来还是可行的+_+

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐