业务场景中,我们可能会遇到消息队列顺序消费的情况,那么怎么保证顺序消费呢

一.首先 我们要保证消息投递的顺序性

1.RocketMQ中(RabbitMQ中同理)我们要确保我们需要顺序消费的消息进入同一个queue中

顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息

举例:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,

根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            //如果是订单号是字符串,则进行hash,得到一个hash值
          Long id = (Long) arg;
          long index = id % mqs.size();
          return mqs.get((int)index);
   }

2.kafka中我们要确保我们的消息进入同一个partition中,消息投递的时候需要制定partition,代码中的1即是partition值

            producer.send(new ProducerRecord<>("mumu-topic",1,"zwt-key","zwt-value"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null){
                        System.out.println("发送状态:"+metadata.toString());
                    }else{
                        exception.printStackTrace();
                    }
                }
            });

二.其次 我们要保证消费端消费的顺序性

1.RocketMQ中(RabbitMQ中同理)应该使用MessageListenerOrderly,自带单线程消费消息,不能再Consumer端再使用多线程去消费,消费端分配到的queue数量是固定的,集群消费会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。

2.kafka中消费端消费组只能有一个节点,这样就确保了消息的有序性

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐