一、引言

        线上MongoDB出现了如下图所示的报错,服务端链接失败,查询资料得知Linux会销毁服务器上五分钟未使用的Socket通信链接。

        

二、追踪分析

        连接线为虚拟线路,便于理解,实际网络连接中运输层、网络层只进行传输而非有一条固定线路留给客户端与服务器

1、初始链接生成,mongo链接池默认0-100

        mongo默认链接池参数如下:

//线程池空闲时保持的最小连接数

minConnectionsPerHost=20

//线程池允许的最大连接数

maxConnectionsPerHost=100

//最大队列数

threadsAllowedToBlockForConnectionMultiplier=5

//线程池中连接的最大空闲时间

maxConnectionIdleTime = 60*1000

 // 线程池中连接的最长生存时间,采用默认值

maxConnectionLifeTime

//设置服务器选择超时(以毫秒为单位),它定义驱动程序在抛出异常之前等待服务器选择成功的时间
//值为0表示如果没有可用的服务器,它将立即超时。 负值意味着无限期等待
private int serverSelectionTimeout = 1000 * 30;

//连接超时时间,必须大于0
private int connectTimeout = 1000 * 5;

//线程等待连接变为可用的最长时间.

maxWaitTime=6000

        private String description;
        private String applicationName;
        private ReadPreference readPreference = ReadPreference.primary();
        private WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
        private ReadConcern readConcern = ReadConcern.DEFAULT;
        private CodecRegistry codecRegistry = MongoClient.getDefaultCodecRegistry();
        private final List<CommandListener> commandListeners = new ArrayList<CommandListener>();
        private final List<ClusterListener> clusterListeners = new ArrayList<ClusterListener>();
        private final List<ServerListener> serverListeners = new ArrayList<ServerListener>();
        private final List<ServerMonitorListener> serverMonitorListeners = new ArrayList<ServerMonitorListener>();

        private int minConnectionsPerHost;
        private int maxConnectionsPerHost = 100;
        private int threadsAllowedToBlockForConnectionMultiplier = 5;
        private int serverSelectionTimeout = 1000 * 30;
        private int maxWaitTime = 1000 * 60 * 2;
        private int maxConnectionIdleTime;
        private int maxConnectionLifeTime;
        private int connectTimeout = 1000 * 10;
        private int socketTimeout = 0;
        private boolean socketKeepAlive = false;
        private boolean sslEnabled = false;
        private boolean sslInvalidHostNameAllowed = false;
        private boolean alwaysUseMBeans = false;

        private int heartbeatFrequency = 10000;
        private int minHeartbeatFrequency = 500;
        private int heartbeatConnectTimeout = 20000;
        private int heartbeatSocketTimeout = 20000;
        private int localThreshold = 15;

        private String requiredReplicaSetName;
        private DBDecoderFactory dbDecoderFactory = DefaultDBDecoder.FACTORY;
        private DBEncoderFactory dbEncoderFactory = DefaultDBEncoder.FACTORY;
        private SocketFactory socketFactory;
        private boolean cursorFinalizerEnabled = true;

 2、分析服务端mongo在Linux的链接活性

        在后面五分钟,只有一个请求进入,响应的只是k1-f1链接

        On Linux, mongod and mongos processes limit the keepalive to a maximum of 300 seconds (5 minutes) on their own sockets by overriding keepalive values greater than 5 minutes.

        Linux上的mongod进程会关闭5分钟无操作的连接,f2链接在服务器端被关闭。

 3、f2链接被关闭后,k2并不知道服务端关闭了对应的通道

 三、解决方案

        解决链接活性的问题其实有很多方案,保活机制参数调整、链接时长调整等都可以实现,作者采取了配置maxConnectionIdleTime: 表示线程池中连接的最大空闲时间, 0标志Udine空闲时间没有限制,超过这个时间会被关闭。

    public Mongo mongo() {
        String[] nodes = this.nodes.split(",\\s*");
        String[] credentials = this.credentials.split(",\\s*");
        List<ServerAddress> serverAddresses = Arrays.stream(nodes)
                .map(node -> {
                    String[] seg = node.split(":");
                    return new ServerAddress(seg[0], Integer.parseInt(seg[1]));
                }).collect(Collectors.toList());
        List<MongoCredential> mongoCredentials = Arrays.stream(credentials)
                .map(credential -> {
                    String[] seg = credential.split(":");
                    return MongoCredential.createScramSha1Credential(seg[0], seg[1], seg[2].toCharArray());
                }).collect(Collectors.toList());
        MongoClientOptions.Builder build = new MongoClientOptions.Builder();
        build.socketTimeout(this.socketTimeout);
        //配置最大空闲时间
        build.maxConnectionIdleTime(this.connectionMaxIdleTimeMs);
        MongoClientOptions options = build.build();

        return new MongoClient(serverAddresses, mongoCredentials, options);
    }

        通过客户端、服务端链接同时关闭,参数设置必须小于五分钟,卡住linux的要求时间点。

 四、源码分析

        使用mongodb的模板提交方法后,最终都会走入提交共同方法,mongo采用异步进行数据的发送和接

public T execute(final InternalConnection connection) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(format("Sending command {%s : %s} to database %s on connection [%s] to server %s",
                                getCommandName(), command.values().iterator().next(),
                                namespace.getDatabaseName(), connection.getDescription().getConnectionId(),
                                connection.getDescription().getServerAddress()));
        }
        long startTimeNanos = System.nanoTime();
        CommandMessage commandMessage = new CommandMessage(namespace.getFullName(), command, slaveOk, fieldNameValidator,
                ProtocolHelper.getMessageSettings(connection.getDescription()));
        ResponseBuffers responseBuffers = null;
        try {
            //通过连接发出信息
            sendMessage(commandMessage, connection);
            //通过连接接收数据
            responseBuffers = connection.receiveMessage(commandMessage.getId());
            if (!ProtocolHelper.isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer())))) {
                throw getCommandFailureException(getResponseDocument(responseBuffers, commandMessage, new BsonDocumentCodec()),
                        connection.getDescription().getServerAddress());
            }

            T retval = getResponseDocument(responseBuffers, commandMessage, commandResultDecoder);

            if (commandListener != null) {
                sendSucceededEvent(connection.getDescription(), startTimeNanos, commandMessage,
                        getResponseDocument(responseBuffers, commandMessage, new RawBsonDocumentCodec()));
            }
            LOGGER.debug("Command execution completed");
            return retval;
        } catch (RuntimeException e) {
            sendFailedEvent(connection.getDescription(), startTimeNanos, commandMessage, e);
            throw e;
        } finally {
            if (responseBuffers != null) {
                responseBuffers.close();
            }
        }
    }

客户端检查stream连接流和isClosed是否关闭标志位,都只是客户端的参数,而不是检查时向服务器通信确认 

    public ResponseBuffers receiveMessage(final int responseTo) {
        //客户端检查stream连接流和isClosed是否关闭标志位,都只是客户端的参数,而不是检查时向服务器通信确认
        notNull("stream is open", stream);
        if (isClosed()) {
            throw new MongoSocketClosedException("Cannot read from a closed stream", getServerAddress());
        }

        CountDownLatch localLatch = new CountDownLatch(1);
        readerLock.lock();
        try {
            //接收数据
            ResponseBuffers responseBuffers = receiveResponseBuffers();
            messages.put(responseBuffers.getReplyHeader().getResponseTo(), responseBuffers);
            readingPhase.getAndSet(localLatch).countDown();
        } catch (Throwable t) {
            exceptionThatPrecededStreamClosing = translateReadException(t);
            close();
            readingPhase.getAndSet(localLatch).countDown();
        } finally {
            readerLock.unlock();
        }

        while (true) {
            if (isClosed()) {
                if (exceptionThatPrecededStreamClosing != null) {
                    throw exceptionThatPrecededStreamClosing;
                } else {
                    throw new MongoSocketClosedException("Socket has been closed", getServerAddress());
                }
            }
            ResponseBuffers myResponse = messages.remove(responseTo);
            if (myResponse != null) {
                connectionListener.messageReceived(new ConnectionMessageReceivedEvent(getId(),
                                                                                      myResponse.getReplyHeader().getResponseTo(),
                                                                                      myResponse.getReplyHeader().getMessageLength()));
                return myResponse;
            }

            try {
                localLatch.await();
            } catch (InterruptedException e) {
                throw new MongoInterruptedException("Interrupted while reading from stream", e);
            }

            localLatch = readingPhase.get();
        }
    }

客户端通过连接读取(REPLY_HEADER_LENGTH是应答头的长度) 

    private ResponseBuffers receiveResponseBuffers() throws IOException {
        //客户端通过连接读取(REPLY_HEADER_LENGTH是应答头的长度)
        ByteBuf headerByteBuffer = stream.read(REPLY_HEADER_LENGTH);
        ReplyHeader replyHeader;
        ByteBufferBsonInput headerInputBuffer = new ByteBufferBsonInput(headerByteBuffer);
        try {
            replyHeader = new ReplyHeader(headerInputBuffer, description.getMaxMessageSize());
        } finally {
            headerInputBuffer.close();
        }

        ByteBuf bodyByteBuffer = null;

        if (replyHeader.getNumberReturned() > 0) {
            bodyByteBuffer = stream.read(replyHeader.getMessageLength() - REPLY_HEADER_LENGTH);
        }
        return new ResponseBuffers(replyHeader, bodyByteBuffer);
    }

 通过套接字读取连接对应的缓冲区,使用流工具从buffer获取字节流,bytesRead == -1说明流是空的,无法从服务器端获取数据

    public ByteBuf read(final int numBytes) throws IOException {
        ByteBuf buffer = bufferProvider.getBuffer(numBytes);
        isTrue("open", !isClosed());

        int totalBytesRead = 0;
        while (totalBytesRead < buffer.limit()) {
            int bytesRead = socketChannel.read(buffer.asNIO());
            if (bytesRead == -1) {
                buffer.release();
                //通过套接字读取连接对应的缓冲区,使用流工具从buffer获取字节流,bytesRead == -1说明流是空的,无法从服务器端获取数据
                throw new MongoSocketReadException("Prematurely reached end of stream", getAddress());
            }
            totalBytesRead += bytesRead;
        }
        return buffer.flip();
    }

五、总结

       1、 MongoDB客户端与服务端没有自动建立保活机制,导致连接存活不同步(maxConnectionIdleTime 进行了补足,理解是不方便进行逻辑修改,只能对bug进行补偿 ,毕竟单纯监听客户端连接使用时间并和服务器进行同步销毁是比较简单的,相当于加功能迭代)

        2、客户端进行连接流和通道是否关闭时,没有实时校验(理解是实时校验消耗资源,所以mongo初期不做此消耗)

        3、作者是通过配置maxConnectionIdleTime限制连接的最大空闲时间,其他参数未实验,感兴趣的同学可以自己试试。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐