RabbitMQ–了解中间件、常用的中间件、分布式系统使用中间件、Docker安装rabbitmq及遇到的问题、RabbitMQ核心组成、消息模式
Springboot整合RabbitMQ(Fanout、Direct、Topic模式)、设置队列信息TTL、死信队列、RabbitMQ磁盘监控,内存控制
Springboot+Rabbitmq消费者注解详解、改序列化方式
Docker简易部署RabbitMQ集群、分布式事务解决方案+案例(可靠生产、可靠消费)
Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区

springboot+rabbitmq消费者

最近在学习rabbitmq技术,在整理资料时发现整合springboot+rabbitmq的消费者方式,网上有不同的写法,为了满足我的好奇心,网上查阅资料然后整理一下,以作记录。

一般需要使用到两个注解:

  • RabbitListener
  • RabbitHandler

@RabbitListener

看一下这个注解的源码

在这里插入图片描述

看到:这个注解可以声明在方法注解上,一般我们使用时只需要在类或者方法上即可。

通常使用:

  • queues: 表示要监听的队列,可以监听多个队列
@RabbitListener(queues = {"queue_work", "a_queue"})

其实,@RabbitListener注解中有很多的属性,一般只使用queues完全够用,不推荐在此@RabbitListener注解中去创建和绑定交换机和队列,维护起来非常麻烦,推荐使用配置类的方式创建和绑定交换机和队列

@RabbitHandler

如果@RabbitListener是声明在方法上的,这个注解可以不使用。如下:

@RabbitListener(queues = {"queue_work", "a_queue"})
public void receiveEmail2(String va) {
    System.out.println("--消费者2接收到了普通模式队列信息-->" + va);
}

但是当@RabbitListener注解声明在类上面时,这个注解是和@RabbitListener配合使用的。

假设有这样的需求:需要根据队列中发送的数据格式,进行不同的处理。比如:公司内部决定,遇到byte[]这种的数据需要按照方法一处理,遇到字符串格式传输的数据需要按照方法二处理。这个时候就需要使用@RabbitHandler来配合@RabbitListener来处理。

以下两个方法,只有接收到相同类型的数据的方法,才回去执行处理这个消息。

@Component
@RabbitListener(queues = "work_queue")
public class EmailConsumer {

    @RabbitHandler
    public void handler1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void handler2(byte[] message) {
        System.out.println(new String(message));
    }
    
}

MessageConvert&序列化

涉及到网络数据传输,那么序列化是不可避免的。生产者以某种规则将消息数据转为byte[]进行发送,消费者以约定的规则将byte[]进行解析。

  • Message类中的body属性,是我们的真正要传输的消息数据。Rabbitmq中有一个MessageConvert接口来处理消息的序列化,现有的序列化处理类SimpleMessageConverter(默认的)、Jackson2JsonMessageConverter 等。
  • 调用 convertAndSend方法后,会使用MessageConvert 进行消息的序列化
  • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理;若是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差。
  • 如果数据量比较大,要考虑使用Jackson2JsonMessageConverter 等序列化形式提高性能。

使用JSON序列化

通常情况下我们如果不配置序列化的处理类,那么我们可以在传递参数时直接给传递一个json字符串或者字节数组,然后接收方再通过同样的方式解析即可。


// 发送方
public void sendMessage(Order order) {
	rabbitTemplate.convertAndSend("user_exchange", "user_queue", JSON.toJSONString(user));
}

// 接收方
@RabbitListener(queues = "user_queue")
public void messageConsumer(String msg) throws Exception {
    System.out.println("消息:" + msg);
	// 解析对象
    User order = JSON.parseObject(msg, User.class);
   // 后续处理
}

配置使用Jackson2JsonMessageConverter

若传递数据时不想手动将对象序列化为其他格式,那么我们可以配置序列化处理类来进行处理。

生产者需要配置RabbitTemplate

@Configuration
public class RabbitMqConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

消费者需要使用相同的序列化类

@Configuration
public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer{

    //以下配置RabbitMQ消息服务
    @Autowired
    public ConnectionFactory connectionFactory;

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        // 设置转换器
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

}

生产者发送消息

@Autowired
private RabbitTemplate rabbitTemplate;

public void send(User user) throws Exception {
    MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
    System.out.println(messageConverter);
    rabbitTemplate.convertAndSend("user_exchange", "user_queue", user);
}

看下控制台日志

在这里插入图片描述

接收者,直接接收相应 的数据类型

@RabbitListener(queues = "user_queue")
public void receiveEmail3(User user) {
    System.out.println("接收到消息==>" + user);
}

接收到的信息

在这里插入图片描述

注意:要传输的实体类必须要实现序列化接口,并且一定要提供一个无参构造函数!!!

建议

推荐使用手动将对象序列化的方式进行数据传输,否则再去配置序列化的处理类比较麻烦。如果手动序列化方式的地方过多,那么可以考虑配置序列化类

参考

SpringBoot 整合 RabbitMQ 修改序列化方式

如果要改为Json序列化格式,可以看下这篇文章RabbitMQ:@RabbitListener 与 @RabbitHandler 及 消息序列化

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐