【完美解决】SpringBoot整合rabbitmq “Failed to declare queue(s)“
【完美解决】SpringBoot整合rabbitmq "Failed to declare queue(s)"
文章目录
一、复现问题
在测试rabbitmq的时候,启动生产者没有问题,启动消费者后突然发现了如下的问题
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[email.queue]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:584) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:571) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1338) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52) ~[amqp-client-5.7.3.jar:5.7.3]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190) ~[spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at com.sun.proxy.$Proxy70.queueDeclarePassive(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:679) [spring-rabbit-2.2.3.RELEASE.jar:2.2.3.RELEASE]
... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'email.queue' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.7.3.jar:5.7.3]
... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'email.queue' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599) ~[amqp-client-5.7.3.jar:5.7.3]
... 1 common frames omitted
错误描述的信息比较长,我们可以从中筛选出重要的信息,比如caused by后面的信息,下面把caused by的信息摘出来
Failed to declare queue(s)
reply-code=404, reply-text=NOT_FOUND - no queue 'email.queue' in vhost '/'
从中可以得出,在rabbitmq中没有创建此队列,打开rabbitmq的管理控制台,这时显示rabbitmq没有队列,所以消费者去监听队列时显示404找不到队列的错误
那就证明了一个问题,生产者没有创建队列成功,或者说就是没有创建队列。看了网上的几篇帖子说的是在rabbitmq的管理控制台手动创建,这怎么能忍的了呢!能让电脑干的事儿就不能让人干。接下来就分享如何在代码中创建队列!
二、问题解决—在生产者代码创建队列
下面请出我们今天的主角—RabbitAdmin;
1.RabbitAdmin是什么?
RabbitAdmin是Spring AMQP封装的一个对象。 在使用RabbitAdmin的时候我们只是配置一个RabbitAdmin对象交给Spring容器做管理。
2.RabbitAdmin如何自动创建队列?
1.配置RabbitAdmin实例
//配置连接工厂
@Bean
public CachingConnectionFactory cachingConnectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(rabbitmqHost);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
//需要传入
RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory());
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
2.使用RabbitAdmin实例显式声明队列(主要是这一步)
@Autowired
private RabbitAdmin rabbitAdmin;
//声明邮件队列
@Bean
public Queue emailQueue(){
Queue queue = new Queue(MAIL_QUEUE_NAME, true);
//显式声明邮件队列
rabbitAdmin.declareQueue(queue);
return queue;
}
两个步骤搞定!此时再查看RabbitMQ管理控制台已经创建好了队列
三、源码分析—知其然更要知其所以然
那么RabbitAdmin自动创建队列原理是什么?
1.重点看下RabbitAdmin的initiallize()方法
下面的代码块就是从spring上下文环境(ApplicationContext也叫IOC容器)中拿到咱们配置Exchange、Queue、Binding的Bean实例
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection<Queue> contextQueues = new LinkedList<Queue>(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection<Binding> contextBindings = new LinkedList<Binding>(
this.applicationContext.getBeansOfType(Binding.class).values());
Collection<DeclarableCustomizer> customizers =
this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
拿到之后就开始创建了,就是下面的declareXXX方法
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
});
2.什么时候调用的initiallize()方法?
我们观察到RabbitAdmin这个类实现了InitializingBean接口,该接口是Spring创建bean实例初始化过程后的回调函数,也就是预留给开发者的扩展点。该接口提供的方法是afterPropertiesSet(),此方法中调用了initiallize()方法。那么就可以知道在spring创建bean实例时就已经创建好了队列
更多推荐
所有评论(0)