springboot整合rabbitmq集群配置项详解
https://blog.csdn.net/qq_45491757/article/details/105712530springboot整合rabbitmq集群创建方式这里省略 整合开始 1 引入starter<parent><groupId>org.springframework.boot</groupId><artifactId>spring-
https://blog.csdn.net/qq_45491757/article/details/105712530
springboot整合rabbitmq
集群创建方式这里省略 整合开始 1 引入starter
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 12345678910
2 配置文件
rabbitmq: addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host) # port: ##集群配置 addresses之间用逗号隔开 # addresses: ip:port,ip:port password: admin username: 123456 virtual-host: / # 连接到rabbitMQ的vhost requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s publisher-confirms: #是否启用 发布确认 publisher-reurns: # 是否启用发布返回 connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时 cache: channel.size: # 缓存中保持的channel数量 channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel connection.size: # 缓存的连接数,只有是CONNECTION模式时生效 connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION listener: simple.auto-startup: # 是否启动时自动启动容器 simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto simple.concurrency: # 最小的消费者数量 simple.max-concurrency: # 最大的消费者数量 simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量. simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量. simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒 simple.retry.enabled: # 监听重试是否可用 simple.retry.max-attempts: # 最大重试次数 simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 simple.retry.multiplier: # 应用于上一重试间隔的乘数 simple.retry.max-interval: # 最大重试时间间隔 simple.retry.stateless: # 重试是有状态or无状态 template: mandatory: # 启用强制信息;默认false receive-timeout: # receive() 操作的超时时间 reply-timeout: # sendAndReceive() 操作的超时时间 retry.enabled: # 发送重试是否可用 retry.max-attempts: # 最大重试次数 retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 retry.multiplier: # 应用于上一重试间隔的乘数 retry.max-interval: #最大重试时间间隔 1234567891011121314151617181920212223242526272829303132333435363738394041
注:相关配置很多,大家只需要关注一些常用的配置即可 3 Spring AMQP的主要对象 注:如果不了解AMQP请前往官网了解.
类 | 作用 |
---|---|
Queue | 对应RabbitMQ中Queue |
AmqpTemplate | 接口,用于向RabbitMQ发送和接收Message |
RabbitTemplate | AmqpTemplate的实现类 |
@RabbitListener | 指定消息接收方,可以配置在类和方法上 |
@RabbitHandler | 指定消息接收方,只能配置在方法上,可以与@RabbitListener一起使用 |
Message | 对RabbitMQ消息的封装 |
Exchange | 对RabbitMQ的Exchange的封装,子类有TopicExchange、FanoutExchange和DirectExchange等 |
Binding | 将一个Queue绑定到某个Exchange,本身只是一个声明,并不做实际绑定操作 |
AmqpAdmin | 接口,用于Exchange和Queue的管理,比如创建/删除/绑定等,自动检查Binding类并完成绑定操作 |
RabbitAdmin | AmqpAdmin的实现类 |
ConnectionFactory | 创建Connection的工厂类,RabbitMQ也有一个名为ConnectionFactory的类但二者没有继承关系,Spring ConnectionFactory可以认为是对RabbitMQ ConnectionFactory的封装 |
CachingConnectionFactory | Spring ConnectionFactory的实现类,可以用于缓存Channel和Connection |
Connection | Spring中用于创建Channel的连接类,RabbitMQ也有一个名为Connection的类,但二者没有继承关系,Spring Connection是对RabbitMQ Connection的封装 |
SimpleConnection | Spring Connection的实现类,将实际工作代理给RabbitMQ的Connection类 |
MessageListenerContainer | 接口,消费端负责与RabbitMQ服务器保持连接并将Message传递给实际的@RabbitListener/@RabbitHandler处理 |
RabbitListenerContainerFactory | 接口,用于创建MessageListenerContainer |
SimpleMessageListenerContainer | MessageListenerContainer的实现类 |
SimpleRabbitListenerContainerFactory | RabbitListenerContainerFactory的实现类 |
RabbitProperties | 用于配置Spring AMQP的Property类 |
对于发送方而言,需要做以下配置: 1 配置CachingConnectionFactory 2 配置Exchange/Queue/Binding 3 配置RabbitAdmin创建上一步的Exchange/Queue/Binding 4 配置RabbitTemplate用于发送消息,RabbitTemplate通过CachingConnectionFactory获取到Connection,然后想指定Exchange发送 对于消费方而言,需要做以下配置: 1 配置CachingConnectionFactory 2 配置Exchange/Queue/Binding 3 配置RabbitAdmin创建上一步的Exchange/Queue/Binding 4 配置RabbitListenerContainerFactory 5 配置@RabbitListener/@RabbitHandler用于接收消息 在默认情况下主要的配置如下:
配置项 | 默认值 | 作用 |
---|---|---|
host | localhost | RabbitMQ服务器地址 |
port | 5672 | RabbitMQ服务器端口 |
username | 账户名 | guest |
password | 密码 | guest |
virtualHost | RabbitMQ虚拟主机名 | / |
publisherConfirms | false | 设置是否启用生产方确认 |
publisherReturns | false | 设置是否启用生产方消息返回 |
ssl | 对象 | 配置SSL,默认停用 |
template | 对象 | 设置RabbitTemplate |
template.retry | 默认停用 | 设置RabbitTemplate发送消息时的重试,主要用于RabbitTemplate与RabbitMQ之间的网络连接 |
template.mandatory | false | 设置发送消息失败时(无接收queue)是否return 消息,与return callback一并使用 |
template.exchange | “” | 默认发送的exchange |
template.routingKey | “” | 默认发送消息时的routing key |
template.defaultReceiveQueue | null | 默认接收消息的queue |
listener.simple | 对象 | 设置SimpleRabbitListenerContainerFactory |
listener.direct | 对象 | 设置DirectRabbitListenerContainerFactory |
listener.simple.concurrency | null | 并发消费方数量 |
listener.simple.acknowledgeMode | AUTO | 设置消费方确认模式,这里的AUTO与RabbitMQ的自动确认不是一回事 |
listener.simple.prefetch | 250 | 设置消费方一次性接收消息的条数 |
listener.simple.defaultRequeueRejected | true | 当Listener发生异常时是否requeue |
listener.simple.retry | 对象 | 设置Listener的重试机制,默认停用,当启用时,Listener对于消息处理过程中的异常将进行requeue重试,超过重试次数再抛弃,此时AmqpRejectAndDontRequeueException异常也会被重试 |
4 使用: 通过配置类加载的方式:
package com.yd.demo.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); public static final String RECEIVEDLXEXCHANGE="spring-ex"; public static final String RECEIVEDLXQUEUE="spring-qu1"; public static final String RECEIVEDLXROUTINGKEY="aa"; public static final String DIRECTEXCHANGE="spring-ex"; public static final String MDMQUEUE="mdmQueue"; public static final String TOPICEXCHANGE="spring-top"; @Value("${spring.rabbitmq.addresses}") private String hosts; @Value("${spring.rabbitmq.username}") private String userName; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; /* @Value("${rabbit.channelCacheSize}") private int channelCacheSize;*/ // @Value("${rabbit.port}") // private int port; /* @Autowired private ConfirmCallBackListener confirmCallBackListener; @Autowired private ReturnCallBackListener returnCallBackListener;*/ @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses(hosts); cachingConnectionFactory.setUsername(userName); cachingConnectionFactory.setPassword(password); // cachingConnectionFactory.setChannelCacheSize(channelCacheSize); //cachingConnectionFactory.setPort(port); cachingConnectionFactory.setVirtualHost(virtualHost); //设置连接工厂缓存模式: cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION); //缓存连接数 cachingConnectionFactory.setConnectionCacheSize(3); //设置连接限制 cachingConnectionFactory.setConnectionLimit(6); logger.info("连接工厂设置完成,连接地址{}"+hosts); logger.info("连接工厂设置完成,连接用户{}"+userName); return cachingConnectionFactory; } @Bean public RabbitAdmin rabbitAdmin(){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); rabbitAdmin.setAutoStartup(true); rabbitAdmin.setIgnoreDeclarationExceptions(true); rabbitAdmin.declareBinding(bindingMdmQueue()); //声明topic交换器 rabbitAdmin.declareExchange(directExchange()); logger.info("管理员设置完成"); return rabbitAdmin; } @Bean public RabbitListenerContainerFactory listenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //最小消费者数量 factory.setConcurrentConsumers(10); //最大消费者数量 factory.setMaxConcurrentConsumers(10); //一个请求最大处理的消息数量 factory.setPrefetchCount(10); // factory.setChannelTransacted(true); //默认不排队 factory.setDefaultRequeueRejected(true); //手动确认接收到了消息 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); logger.info("监听者设置完成"); return factory; } @Bean public DirectExchange directExchange(){ return new DirectExchange(DIRECTEXCHANGE,true,false); } @Bean public Queue mdmQueue(){ Map arguments = new HashMap<>(); // 绑定该队列到私信交换机 arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE); arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY); logger.info("队列交换机绑定完成"); return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments); } @Bean Binding bindingMdmQueue() { return BindingBuilder.bind(mdmQueue()).to(directExchange()).with(""); } @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMandatory(true); //发布确认 // rabbitTemplate.setConfirmCallback(confirmCallBackListener); // 启用发布返回 // rabbitTemplate.setReturnCallback(returnCallBackListener); logger.info("连接模板设置完成"); return rabbitTemplate; } /* @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPICEXCHANGE,true,false); }*/ /* *//** * @return DirectExchange *//* @Bean public DirectExchange dlxExchange() { return new DirectExchange(RECEIVEDLXEXCHANGE,true,false); } *//* * * @return Queue *//* @Bean public Queue dlxQueue() { return new Queue(RECEIVEDLXQUEUE,true); } *//* * @return Binding *//* @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY); }*/ } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
通过两种方式加载 1 通过配置文件 2 通过配置类 说明:上面是通过配置文件与配置类的方式去加载,常用的配置如上所示。实际使用中要生产方与消费方要分开配置,相关配置也会有小变动,大体配置不变。更多信息可查看官网配置。
完结
更多推荐
所有评论(0)