事务消息

RocketMQ事务流程概要

RocketMQ 的事务消息,是指Producer端消息发送事件和本地事务事件,同时成功或同时失败
RocketMQ实现事务主要分为两个阶段: 正常事务的发送及提交、事务信息的补偿流程(都是针对生产者 因为事务只出现在DataBase中 有些情况需要将消息存储在数据库中 如果发生事务问题…)

整体流程为:

  • 正常事务发送与提交阶段
  1. 生产者发送一个半消息给broker(半消息是指的暂时不能消费的消息)
  2. 服务端响应
  3. 开始执行本地事务
  4. 根据本地事务的执行情况执行Commit或者Rollback
  • 事务信息的补偿流程
  1. 如果broker长时间没有收到本地事务的执行状态,会向生产者发起一个确认会查的操作请求
  2. 生产者收到确认会查请求后,检查本地事务的执行状态
  3. 根据检查后的结果执行Commit或者Rollback操作 补偿阶段主要是用于解决生产者在发送Commit或者Rollbacke操作时发生超时或失败的情况

在这里插入图片描述

RocketMQ事务流程关键

  • 事务消息在一阶段对用户不可见

事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费.这里RocketMQ实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC.这样由于消费者没有订阅这个主题,所以不会消费.

  • 如何处理第二阶段的发送消息?

在本地事务执行完成后回向Broker发送Commit或者Rollback操作,此时如果在发送消息的时候生产者出故障了,要保证这条消息最终被消费,broker就会向服务端发送回查请求,确认本地事务的执行状态.当然RocketMQ并不会无休止的发送事务状态回查请求,默认是15次,如果15次回查还是无法得知事务的状态,RocketMQ默认回滚消息(broker就会将这条半消息删除)

  • 事务的三种状态:

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

使用
创建生产者时我们不在简单地创建DefaultMQProducer 而是RocketMQ事务专属的 TransactionMQProducer 并且不再简单地发送消息了 而是设置一个事务监听器 setTransactionListener(new TransactionListener(){…}); 实现接口方法 并且由于监听器需要等待本地事务的执行情况我们不能再生产者发送完消息后关闭

Producer

/**
 * 事务消息生产者
 */
public class TransactionMessageProducer {
    /**
     * 事务消息监听实现
     */
    private final static TransactionListener transactionListenerImpl = new TransactionListener() {

        /**
         * 在发送消息成功时执行本地事务
         * @param msg
         * @param arg producer.sendMessageInTransaction的第二个参数
         * @return 返回事务状态
         * LocalTransactionState.COMMIT_MESSAGE:提交事务,提交后broker才允许消费者使用
         * LocalTransactionState.RollbackTransaction:回滚事务,回滚后消息将被删除,并且不允许别消费
         * LocalTransactionState.Unknown:中间状态,表示MQ需要核对,以确定状态
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // TODO 开启本地事务(实际就是我们的jdbc操作)

            // TODO 执行业务代码(插入订单数据库表)
            // int i = orderDatabaseService.insert(....)
            // TODO 提交或回滚本地事务(如果用spring事务注解,这些都不需要我们手工去操作)

            // 模拟一个处理结果
            int index = 8;
            /**
             * 模拟返回事务状态
             */
            switch (index) {
                case 3:
                    System.out.printf("本地事务回滚,回滚消息,id:%s%n", msg.getKeys());
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 5:
                case 8:
                    return LocalTransactionState.UNKNOW;
                default:
                    System.out.println("事务提交,消息正常处理");
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }

        /**
         * Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),
         * 由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback
         * @param msg
         * @return 返回事务状态
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 根据业务,正确处理: 订单场景,只要数据库有了这条记录,消息应该被commit
            String transactionId = msg.getTransactionId();
            String key = msg.getKeys();
            System.out.printf("回查事务状态 key:%-5s msgId:%-10s transactionId:%-10s %n", key, msg.getMsgId(), transactionId);

            if ("id_5".equals(key)) { // 刚刚测试的10条消息, 把id_5这条消息提交,其他的全部回滚。
                System.out.printf("回查到本地事务已提交,提交消息,id:%s%n", msg.getKeys());
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                System.out.printf("未查到本地事务状态,回滚消息,id:%s%n", msg.getKeys());
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    };

    public static void main(String[] args) throws MQClientException, IOException {
        // 1. 创建事务生产者对象
        // 和普通消息生产者有所区别,这里使用的是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("GROUP_TEST");

        // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr("192.168.100.242:9876");

        // 3. 设置事务监听器
        producer.setTransactionListener(transactionListenerImpl);

        // 4. 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            String content = "Hello transaction message " + i;
            Message message = new Message("TopicTest", "TagA", "id_" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 5. 发送消息(发送一条新订单生成的通知)
            SendResult result = producer.sendMessageInTransaction(message, i);

            System.out.printf("发送结果:%s%n", result);
        }

        System.in.read();
        // 6. 停止生产者
        producer.shutdown();
    }
}

Consumer 整个事务消息环节与Consumer相关不大,所以不用对原来的Consumer进行修改 正常接收消息即可.

事务消息的使用约束

  1. 事务消息不支持定时和批量
  2. 为了避免一个消息被多次检查,导致半数队列消息堆积,RocketMQ限制了单个消息的默认检查次数为15次,通过修改broker配置文件中的transactionCheckMax参数进行调整
  3. 特定的时间段之后才检查事务,通过broker配置文件参数transactionTimeout或用户配置CHECK_IMMUNITY_TIME_IN_SECONDS调整时间
  4. 一个事务消息可能被检查或消费多次
  5. 提交过的消息重新放到用户目标主题可能会失败
  6. 事务消息的生产者ID不能与其他类型消息的生产者ID共享

分布式事务场景分析

分布式事务,是一个在每个微服务项目中都绕不开的问题。常见的解决分案有通过Redis、zk、mq、seata等方式处理。这篇博文全面的分析一下RocketMq中事务消息的机制。

场景案例

事务的经典场景有很多,如银行转账、订单库存等等。相对于分布式事务来说,订单系统和库存系统间的事务场景更为形象。如:用户操作下单,我们首先需要生成一条订单信息,然后库存系统需要针对订单中的商品进行库存扣减的操作。这两步操作必须保证数据的一致性,否则会出现库存超扣等情况。

RocketMQ事务消息设计分析

第一种情况如图所示,在本地事务提交前发送事务消息。若在创建订单信息时发生了异常,而此时事务消息已经成功发送,库存系统消费事务消息就会导致订单并没有创建成功,而库存却被扣减。
在这里插入图片描述
进而有了第二种情况,如图所示,在本地事务提交完成后再发送事务消息。若在发送事务消息的过程发生了异常,如网络波动等等,将会出现订单已创建完成,而库存系统永远也监听不到消息,导致库存无法正常扣减。
在这里插入图片描述
综合第一和第二种情况,汇总成第三种方案如图所示。在本地事务执行前,先向MQ发送前置的Prepared消息,在本地事务执行完毕后,再发送确认的消息,告知MQ当前事务消息需提交/回滚。如果事务正常提交成功,那么这条消息将会被消息消费方监听到;如果事务回滚,MQ会丢弃这条消息,消息消费方无法监听到这条消息。以上情况对应 事务消息生产者的设计思路 图中的 1、2、3、4步骤。
在这里插入图片描述
继续分析,如果上图的第二步中,发送确认消息的过程中,出现异常,没有发送成功怎么办?RocketMQ会定期(默认60s)扫描Prepared消息,如果迟迟没有收到确认消息,将会执行事务回查的逻辑,主动去消息生产方确认事务状态。对应 事务消息生产者的设计思路 图中的 5、6、7步骤。综上,是事务消息中生产者的设计思路,保证本地事务和事务消息一致性。

消费事务消息

在这里插入图片描述
如上图中,在事务消息者中,如果步骤4返回了消费失败或者超时未响应的情况,怎么办?RocketMQ对待事务消息的处理和普通消息一样。如果消费失败或超时,将会把这条消息加入到重试队列中,不断是重复执行步骤3、4,如果重复的次数达到阈值,那么可能需要人工介入处理。

如果消费方本地事务执行成功,仅仅是在确认消息时失败呢? 那么这里又会出现另一个问题 重复消费? 这里就需要具体的业务模块去处理消息的幂等性。如接住Redis来处理。如在本地事务执行前先去查询redis中当前消息是否已经消费,执行成功后再向redis写入一条成功消费的记录,这样就能保证消费不会被重复消费了。

Q&A
Q:从一致性方面考虑,直接采用RPC也可以完成,RPC也支持重试,为什么还要采用MQ?

A:首先应该分清MQ和RPC的应用场景,在现在微服务的架构下,所有人都强调低耦合高内聚,做业务上的解耦,直接采用RPC的方式就会出现强依赖,与微服务的理念背道而驰。

Q:为什么事务消息消费失败后,需要人工介入处理?

A:首先对于一个复杂的系统来讲,将实现整个业务逻辑回滚的代价是巨大的,不但系统复杂度将大大提升,而且还会引入新的问题,如在回滚的过程中又出现了其他事务异常,又该如何处理?其次在一个健壮的系统中出现事务回滚的情况本来就是概率极低的情况,在程序设计时,需要衡量一下为解决这个问题付出的人力物力成本值不值得。

Q:为什么不直接在消息服务层面解决重复消费的问题?

A:消费重复消费解决可以从两个方面考虑。第一 消费方处理消息的业务逻辑保持幂等性,只要保持幂等性,不管重复消费多少次,结果都是一样的;第二保证每条消息都有唯一编号且保证消费处理成功和去重表的日志同时出现,正常情况下出现重复消费的概率并不大,如果消费系统对所有的消费都做处理的话,对系统的吞吐量和高可用会产生影响,所以最好由各自业务系统决定如果处理重复消费。

Q:RocketMQ没能从根本上结果分布式事务问题

A:RocketMQ自身没办法做到像本地事务处理添加@Transactional注解就可以完成事务的提交和回滚。如果有需要,可以尝试使用seata中间件来处理分布式事务。

参考文章:
https://blog.csdn.net/D1842501760/article/details/123142298
https://blog.csdn.net/lishuzhen5678/article/details/122666090

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐