Lettuce是一个基于Netty的Redis客户端,可以使用较少的统一管理的IO线程处理全部的数据。Lettuce提供了同步,异步和响应式命令。
lettuce文档:https://lettuce.io/core/release/reference/

Lettuce读写命令流程

  1. 通过RedisClient初始化连接(netty参数,重试机制等等):
    通过RedisClient.connect获取到连接StatefulRedisConnectionImpl

    在创建连接StatefulRedisConnectionImpl过程中,会初始化BootStrap、Channel和DefaultEndpoint等重要成员。StatefulRedisConnectionImpl绑定了一个Channel,Channel也绑定了IO线程(Bootstrap#connect)。

  2. 通过StatefulRedisConnectionImpl获取同步/异步/响应式连接,再执行Redis命令。

    命令首先被封装成AsyncCommand,然后通过DefaultEndpoint发送命令,DefaultEndpoint内部会依靠Netty IO线程使用Channel异步发送命令,此时用户线程已经能获取到一个AsyncCommand,只是没有响应的数据。

    命令的响应首先被Netty IO线程接收到,从绑定的对应channel的CommandHandler里面取出对应的AsyncCommand,解析且封装响应到output,此时之前用户线程就能从AsyncCommand拿到响应了。
    在这里插入图片描述

看门狗重连机制

连接正常时的有序性

lettuce是通过DefaultEndpoint缓存本地命令,CommandHandler与redis服务器进行读写命令。每次在DefaultEndpoint会用sharedLock保证命令有序性,防止有重连或者未flush的情况;每次CommandHandler向服务端发送一条命令,就会往stack底部压入一条命令,TCP是有序的,Redis是单线程处理命令,所以从服务端响应过来了结果,能够对应上之前往stack压入的命令,这时CommandHandler只需要pop出来顶端的命令,即可对应上。

连接异常时的乱序现象

但是在连接有问题时,channel还是同一个的话,就不再是有序的了。比如本地先后发送了两条命令,redis响应第一条命令时连接断开,在第二条命令响应前连接又好了,就会导致客户端只收到第二条命令,并且会与第一条命令进行匹配,这样就乱序了。

看门狗重连

channel不可用时,清空stack,关闭channel。通过io.lettuce.core.protocol.ReconnectionHandler#reconnect重新创建一个channel,由于Channel的BootStrap是同一个,所以其pipleline最终没有什么区别,功能一致,因为DefaultEndpoint是同一个对象。

看门狗配置

io.lettuce.core.ClientOptions#autoReconnect

注意事项

PUBSUB

Lettuce提供的PubSub命令,是通过监听器来实现,不会阻塞当前线程(Jedis的sub是通过死循环监听)。但由于一旦执行过sub命令,就不能再执行(取消)订阅之外的其他命令了,所以需要注意PUBSUB连接的使用。

重连OOM

OOM:重连会导致DefaultEndpoint的commandBuffer或者disconnectedBuffer不断扩增!可通过io.lettuce.core.ClientOptions#disconnectedBehavior更改为REJECT_COMMANDS解决。

因为两个buffer的size等于clientOptions.getRequestQueueSize() ,而requestQueueSize默认配置是Integer.MAX_VALUE
下面通过展示下发送单个命令的过程,解释下OOM问题缘由。

public <K, V> Collection<RedisCommand<K, V, ?>> write(Collection<? extends RedisCommand<K, V, ?>> commands) {

        LettuceAssert.notNull(commands, "Commands must not be null");

        try {
            sharedLock.incrementWriters();
			//1. 如果开启了看门狗自动重连,但是连接不可用,且没有设置拒绝策略,这里的判断会通过,导致跳到3或者4,可能触发OOM
            validateWrite(commands.size());

            if (autoFlushCommands) {

                if (isConnected()) {
                    //2.直接发送命令
                    writeToChannelAndFlush(commands);
                } else {
                    //3.暂存命令到disconnectBuffer
                    writeToDisconnectedBuffer(commands);
                }

            } else {
                //4.写入commandBuffer,等到一次flush
                writeToBuffer(commands);
            }
        } finally {
            sharedLock.decrementWriters();
            if (debugEnabled) {
                logger.debug("{} write() done", logPrefix());
            }
        }

        return (Collection<RedisCommand<K, V, ?>>) commands;
    }

假设关闭看门狗机制,这个连接完全不可用了,无法向服务端发送任何命令

不宜创建过多连接

如果频繁创建StatefulRedisConnectionImpl,就会导致频繁创建BootStrap、Channel、IO Thread等,这样会导致资源浪费。因为IO线程会绑定一个内存分配池,会有内存缓存,如果没有固定IO线程组,这容易导致OOM。

切记使用后关闭RedisClient

通过shutdown正确释放RedisClient下的所有资源,避免资源泄露。

丢包、超时,不会直接从stack移除命令

如果是服务端丢包,当在timeout期间内一直没有数据返回给客户端,客户端会直接记录result为CancelException。由于Redis和客户端之间是传送的数据流,当网络异常,这段数据流就是空的,所以解析出来为null,对应的command结果也就是null,进而不应该直接从stack移除命令

服务端下线,会清空stack

相当于服务端关闭了该sokect

如果客户端恰巧在handler处理,那么会激发调用handler的exceptionCaught方法,此时可能触发connectionWatchdog重连;

如果客户端没有其他操作,那么客户端最终会激发调用channelInactive【会将defaultEndPoint的channel设置为null,如果没有用lettuce的重连机制,一定要自行建立channel,否则就无法发送命令,直接会io.lettuce.core.protocol.DefaultEndpoint#validateWrite抛出Currently not connected. Commands are rejected.】和channelUnregistered方法,那么再有command就会被存储在DefaultEndpoint的disconnectedBuffer中。

CancelException

当DefaultEndpoint被关闭时,比如外层的StatefulConnection关闭了,就会导致endpoint关闭,进来的command和之前暂存的command都会被抛出 CancelException

如果是阻塞式命令或者multi,不建议共享连接。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐