参考学习: www.mayikt.com

RabbitMQ

一、SpringBoot整合RabbitMQ

Springboot整合RabbitMQ使用RabbitTemplate模板

1.1 Product生产者:

1.1.1 导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.1.2 使用JAVA配置类初始化交换机、对列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq 配置交换机 - 队列
 *
 * @author zx
 * @date 2022年04月24日 9:11
 */
@Configuration
public class RabbitMQConfig {
    //用户登录后发送 微信公众号信息 和 记录日志
    /**
     * 定义交换机 用户登录交换机
     */
    private String EXCHANGE_USERLOGIN_NAME = "/mayikt_user_login";

    /**
     * 微信公众号队列
     */
    private String FANOUT_WX_QUEUE = "fanout_wx_queue";
    /**
     * 日志队列
     */
    private String FANOUT_LOG_QUEUE = "fanout_log_queue";
    /**
     * topic 主题对应的交换机
     */
    public static final String EXCHANGE_TOPIC_NAME = "boot_topic_exchange";
    /**
     * topic 主题模式对应的队列
     */
    public static final String QUEUE_TOPIC_NAME = "boot_queue_topic";


    /**
     * 配置smsQueue
     */
    @Bean
    public Queue wxQueue() {
        return new Queue(FANOUT_WX_QUEUE);
    }

    /**
     * 配置emailQueue
     */
    @Bean
    public Queue logQueue() {
        return new Queue(FANOUT_LOG_QUEUE);
    }

    /**
     * 配置fanoutExchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_USERLOGIN_NAME);
    }


    /**
     * 绑定交换机 sms
     *
     * @param wxQueue        wx通知队列
     * @param fanoutExchange 交换机
     *                       sms队列绑定到交换机
     */
    @Bean
    public Binding bindingSmsFanoutExchange(Queue wxQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(wxQueue).to(fanoutExchange);
    }

    /**
     * 绑定交换机 email
     *
     * @param logQueue       用户登录日志队列
     * @param fanoutExchange 交换机
     *                       email队列绑定到交换机
     */
    @Bean
    public Binding bindingEmailFanoutExchange(Queue logQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logQueue).to(fanoutExchange);
    }


    //------------------------topic主题模式-----------------------------------------


    /**
     * 定义交换机
     */
    @Bean("bootExchange")
    public Exchange bootExchange() {
        //durable 是否持久化默认为true
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_NAME).durable(true).build();
    }


    /**
     * 定义对列
     */
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_TOPIC_NAME).build();
    }


    /**
     * 对列和交换机绑定
     */
    @Bean
    public Binding bindQueueExchange(Exchange bootExchange, Queue bootQueue) {
        return BindingBuilder.bind(bootQueue).to(bootExchange).with("boot.#").noargs();
    }
}

1.1.3 发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsgStr")
    @ApiOperation("发送消息(主题模式)")
    public BaseResponse<?> sendMsgStr(String msg) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC_NAME, "boot.haha", msg);
        return BaseResponse.ok();
    }

2.1 Comsumer 消费者

2.1.1 导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.1.2 监听消息

监听消息自动应答

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * rabbitmq 监听对列 [topic模式]
 * @author zx
 * @date 2022年04月26日 21:56
 */
@Component
@Slf4j
public class TopicListenerServer {

    @RabbitListener(queues = "boot_queue_topic")
    public void topicListener(Message message){
        log.info("topic主题接收到消息:{}",message);
    }
}

二、消息可靠投递

消息可靠投递这种方式可以用来解决分布式事物。确保消息投递到Broker、确保消息消费,如果消费消息失败MQ自动重试(网络延迟造成消费失败),另外如果是代码层面抛出异常导致消费消息失败我们可以进行记录日志人工补偿

依赖还是导入springboot整合rabbitmq的依赖,配置类也是通用(测试的时候使用上面创建好的topic 交换机和队列—注意,在测试的时候使用topic主题并没有在启动的时候去创建,而是使用到发送消息的时候才会在broker上去为我们自动创建【懒加载模式】)

RabbitMQ提供了2中方式控制消息可靠投递模式:

  • confirm 确认模式
  • return 退回模式

RabbitMQ整个消息投递路径:

生产者product ------>Broker 【MQ服务】——>Exchange交换机 ——>Queue对列 ——消费者comsumer

  • 消息投递到Exchange交换机,则会返回一个confirmCallback (√)
  • Exchange交换机到Queue,如果失败则会返回一个returnCallback

利用两个callback控制消息的可靠投递

2.1 Product生产者确保消息投递到Broker

2.1.1 开启confirm模式

spring:
  rabbitmq:
    ####连接地址
    host: 192.168.0.101
    ####端口号
    port: 5672
    ####账号
    username: admin
    ####密码
    password: admin
    ### 地址
    virtual-host: /mayikt-user
    # 确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    # 确认消息已发送到队列
    publisher-returns: true

2.1.2 设置回调函数

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * 确认机制(消息发送到服务回调)
 */
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void run() {
        rabbitTemplate.setConfirmCallback(this);
        设置交换机处理失败消息的处理模式 如果消息没有路由到queue,则返回消息发送方法
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String faileCause) {
        if (ack) {
            log.info("消息发送成功");
        } else {
            log.info("消息发送失败,进行容错处理");
        }
        log.info("消息发送到交换机时的回调函数, ack:" + ack + "  消息体:" + faileCause);
    }


    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!"+returned);
    }
}


2.1.3 发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;
 /**
     * 确认模式
     * 步骤:
     * 1.开启确认模式     # 确认消息已发送到交换机(Exchange)
     * publisher-confirm-type: correlated
     * # 确认消息已发送到队列
     * publisher-returns: true
     * <p>
     * 2.在rabbittemplate定义confirmcallback回调函数
     *
     * 回退模式:
     * 步骤:
     * 1.开启回退模式
     * 2.设置returnCallback
     * 3.设置exchang处理消息的模式:
     *      3.1 如果消息没有路由到queue,默认是丢弃
     *      3.2 如果消息没有路由到queue,则返回消息发送方法
     */

    @GetMapping("/sendMsgConfirm")
    @ApiOperation("发送消息(主题模式)确认模式")
    public BaseResponse<?> sendMsgConfirm(String msg) {
        //product --->exchange 【confirmCallback 回调函数】
        //rabbitTemplate.setConfirmCallback((correlationData, ack, faileCause) -> {
        //    log.info("correlationData:{}", correlationData);
        //    log.info("ack:{}", ack);
        //    log.info("faileCause:{}", faileCause);
        //});

        //exchange ----routingKey---->queue 【returnsCallback 回调函数】
        //设置交换机处理失败消息的处理模式 如果消息没有路由到queue,则返回消息发送方法
        //rabbitTemplate.setMandatory(true);


        //rabbitTemplate.setReturnsCallback(returned -> log.info("returnedMessage:{}", returned));


        String routingKey = RabbitMQConfig.CONFIRM_ROUTING_KEY;
        String exchange = RabbitMQConfig.EXCHANGE_TOPIC_CONFIRM;

        //如果交换机名称写错了。那么confirmCallback 回调失败的。
        //如果设置routingkey 是一个错误,那么returncallback 会进行回调调用
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);




        //CorrelationData correlationData = new CorrelationData();
        //correlationData.setId("xxxx");
        //correlationData.setReturned(new ReturnedMessage());
        //rabbitTemplate.convertAndSend(exchange, routingKey, msg,new CorrelationData());

        return BaseResponse.ok();
    }

2.2 Comsumer消费端

2.2.1 修改配置为手动ACK模式

spring:
  rabbitmq:
    ####连接地址
    host: 192.168.0.101
    ####端口号
    port: 5672
    ####账号
    username: admin
    ####密码
    password: admin
    ### 地址
    virtual-host: /mayikt-user
    listener:
      simple: ## 修改配置为手动ack模式
        acknowledge-mode: manual

  application:
    name: mq-consuer-server   # 应用名称
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.32.101:3306/test_query?useSSL=false&serverTimezone=GMT%2B8&characterEncoding=utf8&allowMultiQueries=true&autoReconnect=true
    hikari:
      username: root
      password: root
      connection-test-query: select 1
      maximum-pool-size: 5

# mybatis-plus 配置
mybatis-plus:
  mapper-locations: classpath:/mapper/**/*.xml
  global-config:
    db-config:
      id-type: auto
      logic-delete-field: isDeleted
      logic-delete-value: 1   #表示删除
      logic-not-delete-value: 0 # 未删除
      #打印mybatisplus执行sql日志
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
server:
  port: 8081

2.2.3 消费消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


/**
 * 手动ACK消费消息
 *
 * @author zx
 * @date 2022年04月27日 21:47
 */
@Component
@Slf4j
@RabbitListener(queues = "boot_queue_confirm_topic")
public class ComfirmMsgComsumerListener {
   @RabbitHandler
    public void topicListener(String data, Message message, Channel channel) throws IOException {
       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       try {
           log.info("message:{}",message);
           log.info("data:{}",data);
            int i = 1/0;
           //手动确认 消费成功
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
       }catch (Exception e){
           log.info("消息处理失败,消息内容:{},记录日志人工手动补偿", data);
            消息被否定。multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息,requeue:为true时,重新入队
           //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
       }


    }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐