原理

分布式锁要满足 互斥,防死锁的基本要求。

进一步要求 可重入(非必要,但重要),高效。

Redisson下的实现

以下源码阅读基于 redisson:3.12.0
请添加图片描述

标记资源及占有者

特定资源对应的key设置到可访问的redis,并配置一定的超时间和能表示占用者的value(该值可用ObjectId或者类雪花算法,及其他能全局唯一标识线程的算法)。
value主要用于帮助实现可重入特性。

Redison中简单的使用了UUID作为连接管理器的全局唯一标识,再结合本机的线程id,全局唯一标识线程

互斥/可重入

可重入,主要就是通过比较对应资源key的value的值是否与当前线程的ThreadLocal保存的值一致来判断。

防死锁加锁过程

使用带有时效锁去获取资源,并利用看门狗机制做自动续约对可能超时的问题做补偿兜底。
请添加图片描述
具体加锁及释放的4中方式:

RedissonClient client = Redisson.create();

// 获取指定资源的锁
RLock lock = client.getLock("resource-1");

try {
    // 1. 堵塞加锁 配置占用时间是30s
    lock.lock();
    // 2. 堵塞加锁 指定占用时长
    lock.lock(30, TimeUnit.SECONDS);

    // 3. 尝试等待10s内获取锁
    boolean b = lock.tryLock(10, TimeUnit.SECONDS);
    // 4. 尝试等待10s内获取锁,并占用30s
    boolean b1 = lock.tryLock(10, 30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    lock.unlock();
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // 如果有指定则使用时间
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 默认配置的加锁时间是30s
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
// tryLockInnerAsync 加锁的lua脚本
// KEYS[1]:getName() 资源名称
// ARGV[1]:internalLockLeaseTime 加锁时间; ARGV[2]:getLockName(threadId) 占用资源的线程标识

// 检查对应资源锁是否存在 0表示不存在 1表示存在
if (redis.call('exists', KEYS[1]) == 0) 
then 
    // 不存在,则对资源加锁,并将占用线程的id写入值,用于可重入
    redis.call('hset', KEYS[1], ARGV[2], 1);
    // 设置超时时间 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 

// 如果锁已存在,检查hash表的对应资源域的当前线程标识是否存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
then 
    // 存在,则加锁数增加1
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    // 更新加锁时间
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
// 如果加锁失败,并且之前没有占用锁,则返回资源key的剩余过期的毫秒数,用于之后到期通知
return redis.call('pttl', KEYS[1]);
// unlockInnerAsync 解锁的lua脚本
// KEYS[1]:getName() 资源名称; KEYS[2]:getChannelName() 监听的channel名称
// ARGV[1]:LockPubSub.UNLOCK_MESSAGE 解锁发送给channel的消息; ARGV[2]: internalLockLeaseTime 加锁时间; ARGV[3]: getLockName(threadId) 占用资源的线程标识

// 检查hash表中的 加锁资源域的当前线程是否还持有锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) 
then 
    // 没有直接退出
    return nil;
end; 
// 定义本地变量,值为当前线程对资源key的加锁次数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 

if (counter > 0) 
then 
    // 如果计数器还大于0,说明之前重入了,等待线程后续继续释放
    // 更新过期时间
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    // 锁已经完全释放
    // 删除表示占用的资源key
    redis.call('del', KEYS[1]); 
    // 推送释放消息给channel,告知所有监听该channel的客户端
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1; 
end;
return nil;

Redis的EVAL命令

命令格式:EVAL script numkeys key [key …] arg [arg …]

  • script参数是一段 Lua5.1 脚本程序。脚本不必(也不应该[^1])定义为一个 Lua 函数
  • numkeys指定后续参数有几个key,即:key [key …]中key的个数。如没有key,则为0
  • key [key …] 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key)。在Lua脚本中通过KEYS[1], KEYS[2]获取。
  • arg [arg …] 附加参数。在Lua脚本中通过ARGV[1],ARGV[2]获取。

看门狗机制

看门狗机制是对防死锁但存在执行所需时间片不足的场景的补偿。

通过间隔一定时间对锁自动续约用来实现。

// 自旋加锁源码
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // tryAcquire 返回的是剩余占用的毫秒数
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    // 注册对资源key的订阅
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        // 自旋不断尝试取锁
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            // future 在前面设置好对channel的监听之后,会在收到通知时被唤醒,然后进入下次加锁尝试
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 取锁成功或者被中断之后取消对资源的订阅
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}
// 看门狗机制的源码
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // 如果指定了超时时间,则直接使用,不设置看门狗
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            // 取锁成功后,添加到看门狗自动续约调度表中
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

// 添加到过期刷新调度器中,也就是看门狗队列
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

这部分就是自动续约的源码,主要是借助netty的定时任务取实现,细节可以直接阅读netty的相关知识

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

堵塞加锁如何被通知

利用redis的channel发布订阅机制实现

订阅过程
请添加图片描述

发布过程
请添加图片描述

发布订阅命令参数
命令描述
PSUBSCRIBE pattern [pattern …]
订阅一个或多个符合给定模式的频道。
PUBSUB subcommand [argument [argument …]]查看订阅与发布系统状态。
PUBLISH channel message将信息发送到指定的频道。
PUNSUBSCRIBE [pattern [pattern …]]退订所有给定模式的频道。
SUBSCRIBE channel [channel …]订阅给定的一个或多个频道的信息。
UNSUBSCRIBE [channel [channel …]]指退订给定的频道。

总结

看完Redis的分布式锁的运行流程,如果阅读过可重入锁ReentrantLock的源码,了解其流程的同学会发现,两者的相似度非常高.

从如何加锁,到如何实现可重入特性,到如何通知下一个等待锁释放的对象.

特别注意: 因为redis是没有事务这个概念的,虽然通过使用lua脚本做到了正常操作的原子性,但是遇到特殊情况,如redis挂了,那么脚本中的命令也不会全部执行。
故,命令的顺序就非常重要,所有check操作都需要优先,所有设置值得操作必须先带上过期时间,最后才设置value,避免出现一个永远不会过期的对象导致死锁。
对脚本中的命令均要思考,执行到该语句失败,后续命令不执行的情况下,之前设置的key是否会永存?
永存的情况下,程序能否正常执行?
正常执行的情况,这些永存的key是否会导致redis出现严重的内存泄漏?
当然,如果你家的redis挂了,不走恢复逻辑,那也无所谓。

相关推荐

参考资料

  • https://www.cnblogs.com/jelly12345/p/14699492.html
  • https://www.runoob.com/redis/redis-pub-sub.html
  • https://www.cnblogs.com/jelly12345/p/14699492.html
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐