自定义连接工厂

通过自定义连接工厂连接时,ip、端口都正确的情况下,连接rabbitmq还是不行,报异常

Exception in thread "main" java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:317)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1106)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1063)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1021)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1182)
[AMQP Connection 192.168.146.128:5672] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: Socket Closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:598)
    at java.lang.Thread.run(Thread.java:748)

自定义连接工厂

//创建连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
//建立连接
 //连接的用户名 默认guest
connectionFactory.setUsername("guest");
//连接的密码 默认guest
connectionFactory.setPassword("guest");
//虚拟主机名称 默认/
connectionFactory.setVirtualHost("/");
//主机地址
connectionFactory.setHost("192.168.146.128");
//连接端口 java访问默认端口5672,15672是浏览器访问rabbitmq管理页面端口
connectionFactory.setPort(5672);
Connection connection =connectionFactory.newConnection();

报上述异常也描述出连接超时问题,肯定想到设置超时时间

connectionFactory.setConnectionTimeout(30000000);//设置连接超时时间

 然而...

还是一样的错,连接超时(错误同上)

通过官网介绍了解到连接rabbitmq通过amqp和tls有个默认的握手时间,超过这个时间没有连接上就会报上述异常:https://www.rabbitmq.com/configure.html

通过设置握手时间后解决问题,消息发送成功

connectionFactory.setHandshakeTimeout(300000000);//设置握手超时时间

springboot整合rabbitmq时

导入依赖:

<!--引入rabbitmq的启动器-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web,测试用-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.yml配置连接信息

#端口
server:
  port: 8888
spring:
  rabbitmq:
    #主机地址
    host: 192.168.146.128
    #这几个都是默认值,可以不写
    virtual-host: /
    port: 5672
    username: guest
    password: guest

 rabbitmq的配置类:交由spring创建交换机,队列,并绑定

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq配置类
 * @author:w
 * @Date:2021/5/14 19:21
 * @description
 */
@Configuration
public class RabbitmqConfig {
    //交换机名称
    public static final String EXCHANGE_NAME="springboot_topic_exchange";
    //队列名称
    public static final String QUEUE_NAME="springboot_queue";

    /**
     * 设置交换机
     *  相当于channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
     * @return
     */
    @Bean("exchange")
    public Exchange setExchange(){
        //durable(true),交换机持久化
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     *设置队列
     *  相当于channel.queueDeclare(QUEUE_NAME,true,false,false,null);
     * @return
     */
    @Bean("queue")
    public Queue setQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 设置交换机和队列的绑定关系
     *  相当于:channel.queueBind(TOPIC_QUEUE1,TOPIC_EXCHANGE,"wang.#");
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding setBinding(@Qualifier("exchange") Exchange exchange,
                              @Qualifier("queue") Queue queue){
        //bind队列到(to)交换机使用路由key是wang.#
        return BindingBuilder.bind(queue).to(exchange).with("wang.#").noargs();
    }
}

测试controller

@RestController
public class SendMsg {
    //通过注入rabbitmq模板对象发送消息
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send/{msg}/{key}")
    public String sendMsg(@PathVariable String msg, @PathVariable String key){
        /**
         * 参数一:交换机名称
         * 参数二:路由key
         * 参数三:发送的消息
         */
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,key,msg);
        return "消息发送成功";
    }
}

然而在浏览器中访问的时候,出错了...

 

java.net.SocketException: Socket Closed
    at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_231]
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_231]
    at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_231]
    at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_231]
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_231]
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_231]
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_231]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:665) ~[amqp-client-5.9.0.jar:5.9.0]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]

2021-05-15 09:43:54.245 ERROR 10432 --- [nio-8888-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException] with root cause

java.util.concurrent.TimeoutException: null
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:326) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1139) ~[amqp-client-5.9.0.jar:5.9.0]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087) ~[amqp-client-5.9.0.jar:5.9.0]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:560) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:533) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:487) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:751) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2089) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2062) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1009) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1075) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1068) ~[spring-rabbit-2.2.13.RELEASE.jar:2.2.13.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.41.jar:4.0.FR]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.41.jar:4.0.FR]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:888) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1597) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.41.jar:9.0.41]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]

超时问题,通过上面自定义连接工厂时,自然想到设置握手超时时间,但是在application.yml中并没有HandShakeTimeout这个属性

百度了一下,说可以通过自定义RabbitAutoConfiguration,包名+类名完全跟amqp下的.class一致覆盖RabbitAutoConfiguration(由appclassloader加载),在初始化连接的时候设置握手超时时间

 启动时也确实加载了自定义的类

但是还是不行,测试的时候还是连接超时..(崩溃中...)

错误跟上面一致,果断放弃了这种写法(欢迎看到文章的小伙伴指点我什么原因导致的....)

java端考虑下不行,那还是从Linux端入手吧

在Linux上 /etc下hosts文件中添加本地ip,解决访问过慢的问题

/hosts文件可以帮助解决哪些问题

4.1 远程登录linux主机过慢问题


有时客户端想远程登录一台linux主机,但每次登录输入密码后都会等很长一段时间才会进入,这是因为linux主机在返回信息时需要解析ip,如果在linux主机的hosts文件事先加入客户端的ip地址,这时再从客户端远程登录linux就会变很快。

注:这里所说的远程登录不仅仅是ssh,还可能是mysql远程登录,或是文件共享的查询等。

4.2 双机互连

当两台主机只是双机互连时,这时两台主机都需要设置自己的ip,同时在对方的hosts文件里加入自己的ip和主机名。

首先查看Linux的hostname

[root@learnVM etc]# hostnamectl
   Static hostname: learnVM  --配置这个hostname进入hosts文件中
         Icon name: computer-vm
           Chassis: vm
        Machine ID: 04f51be2f7574ca08e7ccc430ec6ee92
           Boot ID: 1c5bee706a094dc7adfbf71431e28acf
    Virtualization: vmware
  Operating System: CentOS Linux 7 (Core)
       CPE OS Name: cpe:/o:centos:centos:7
            Kernel: Linux 3.10.0-327.el7.x86_64
      Architecture: x86-64
[root@learnVM etc]# 

重启服务:

rabbitmq管理页面就会有绑定的交换机和队列和发送的消息

至此,完美解决问题,之前哪个重写类的也欢迎指点我什么问题导致的....

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐