我们都知道事务的四大特性,但是那是针对的数据库的事务。但是Rabbitmq的事务到底是表达何种意思?根据一般概念的规律来说,mq的事务和数据库事务是类似的。我们可以将mq看做是数据库。

rabbitmq提供了与三个事务相关的命令:select、commit、rollback

其中select表示将当前模式设置为标准事务模式,commit表示提交当前事务,rollback表示事物回滚。也就是说select开启事务,通过commit操作之后publish的消息一定在消息队列中,当然如果发生rollback回滚,那么消息队列中的消息就会被撤销掉。AMQP事务大概过程如下图所示:

大概得代码如下:

@Slf4j
@Configuration
public class RabbitConfig {
    /**
     * 消息转化
     * @return
     */
    @Bean
    public MessageConverter customMessageConvert() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public Queue directOneQueue() {
        Map map=new HashMap<>();
        map.put("x-max-priority",10);
        return new Queue("DDD",true,false,false,map);
    }
    @Bean
    public Queue directTwoQueue() {
        Map map=new HashMap<>();
        return new Queue("EEE",true,false,false,map);
    }
    /**
     * 定义一个rabbitmq消息发送器
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
//mq事务是通过事务管理器提交的,这块不能设置为手动提交
//        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setChannelTransacted(true);
//这块也和发送消息确认有关系
//        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }
    /**
     * 配置启用rabbitmq事务
     * @param connectionFactory
     * @return
     */
    @Bean("rabbitTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

发送消息测试:

    //通过id是否为0决定是否抛出异常
    @GetMapping(value = "/test/{id}")
    @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void test(@PathVariable int id) throws Exception {
        try {
            Test t=new Test();
            t.setName("tianjingle-ceshi");
            byte[] body = JSON.toJSONBytes(t, SerializeConfig.globalInstance);
            //设置消息相关属性
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setMessageId(UUID.randomUUID().toString());
            messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
            messageProperties.setPriority(10);
            messageProperties.setCorrelationId("tianjingle");
            messageProperties.setReplyTo("EEE");
            Message message1 =new Message(body, messageProperties);
            rabbitTemplate.convertAndSend("DDD",message1);
            int z=1/id;
        }catch (Exception e){
            throw new Exception("12");
        }
    }

事务回滚的情况。

事务提交的情况

总结:通过上述实践,我们认为AMQP的事务是完全可靠的,但是事务的加入势必会让消息队列的性能上有所损耗,因为每个步骤都需要broker做出响应。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐