设计一个分布式锁应当满足以下三个条件,这也是高效的使用分布式锁所需要的最低的保证:

  1. 互斥性。在任何时间点,只能有一个客户端可以持有锁。
  2. 死锁释放性。最终总是可以获得锁,不会出现死锁,即使在一个被客户端锁住的资源损坏了或分区了的情况下。
  3. 容错性。只要大多数的redis节点可用,那么客户端就可以获取和释放锁。

首先来讲一下单节点上锁

单节点锁

先设计一个最简单的方案,一个线程调用加锁操作时,其实就是检查锁变量是否为0。如果是0,就把锁的变量值设置为1,表示已经占有了这个锁。如果不是0,说明这个锁已经被其它线程占有,则返回错误信息。当使用完毕时要释放锁,就是说要把锁变量值设置为0,这样其它线程就可以占有锁。

  1. 如果key值为0,则表示目前没有线程占有锁;
  2. 如果key值为1,则表示已经有线程占有锁。

 

我们可以使用set nx命令来实现这个方案,这个命令在执行时会判断键值对是否存在。如果不存在,就说明还没有客户端占有这个锁,就设置键值对的值为1,表示锁已经成功占有;如果存在就不作任何设置,因为这意味着已经有其它客户端占有了这个锁。对于释放锁来说,我们在执行完业务操作后可以使用del命令来删除锁变量,用删除key来代替上述方案中把key设置为0。

这个简单的方案有两个问题。

第一个问题,假如客户端A已经成功占有锁,但是假如客户端B一不小心删除了这个锁变量,然后客户端C发起加锁并成功,这时候客户端A和C就同时进入了临界区产生了冲突,不满足条件1-互斥性。这个问题的根源在于这个方案不能区分目前是谁占有锁,因为锁变量要么不存在要么值为1,不能区分客户端,导致锁被误删。为了对客户端进行区分,我们可以要求客户端各自生成一个唯一的签名,在申请锁的时候用这个唯一的签名去设置锁变量。在释放锁的时候要先检查一下锁变量的值是否等于自己的签名,只有相等才能删掉锁变量释放锁。如果锁变量值不等于自己的签名说明锁是被其它客户端占有,不能删。比如假设客户端A的签名为666666,客户端b的签名为88888888,锁变量约定好为lock_key。客户端A现申请锁,通过‘set lock_key 666666 nx ’,返回成功。然后客户端B也要申请锁,于是向服务器y发送‘set lock_key 88888888 nx',这时由于客户端A已经设置了lock_key所以会返回失败,锁没有申请成功。等到客户端A处理完完业务,它首先使用get lock_key获取锁变量值,判断值等于自己的签名666666,知道锁是自己占有,然后del lock_key把锁变量删掉,释放锁。之后客户端B就可以重试申请锁。

第二个问题,假如某个客户端用set nx成功设置锁变量占有锁之后,还没来得及释放锁就突然宕机了。这样锁就一直被这个客户端占有,其它客户端就一直拿不到锁。这情况不满足条件2-死锁释放性。针对这个问题,最简单的解决方案是给锁变量设置一个过期时间,这样即使占有锁的客户端意外宕机之后,redis服务器也会在锁变量过期之后自动删除锁变量。或者说锁最终一定会在某段时间后释放。这样其它客户端在一段时间后依然可以申请占有锁。redis中可以使用set 加ex/px选项来设置过期时间。

这里补充一个如果客户端不随机生成一个唯一签名导致误删的场景,客户端A首先成功申请了锁,但是由于操作时间比较长导致锁变量过期被删除了。然后客户端B可以成功加锁,这时候客户端A刚好操作完成以为锁还是它的于是就把锁给删了!假设碰巧此时客户端C也申请加锁,那么B和C就冲突了,这会导致很严重的问题!

综上,我们可以使用set nx ex/px来申请锁。补充很重要的一点就是操作的原子性,对于申请锁,因为只使用了一条命令,所以天然就是原子操作。而对于释放释放,需要先get key,然后判断判断键值,才能删除key,三个操作不能通过一条命令来完成。为了保证操作的原子性,可以使用事务和luau脚本。

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

现在根据这个方案,对于单个节点加锁的实现如下。

class CRedLock {
public:
                            CRedLock();
    virtual                 ~CRedLock();
public:
    bool                    Initialize();
private:
    bool                    LockInstance(redisContext *c, const char *resource,
                                         const char *val, const int ttl);
    void                    UnlockInstance(redisContext *c, const char *resource,
                                           const char *val);
    sds                     GetUniqueLockId();//生成唯一签名
private:

private:
    sds                     m_unlockScript;         // 解锁脚本

};

加锁代码,redisContext表示一条和一个服务器节点的连接,resource就是定义好的锁变量key,val是客户端随机生成的签名,TTL是锁变量过期时间。

bool CRedLock::LockInstance(redisContext *c, const char *resource,
                            const char *val, const int ttl) {
    redisReply *reply;
    //往服务器发送命令设置锁变量
    reply = (redisReply *)redisCommand(c, "set %s %s px %d nx",
                                       resource, val, ttl);
    if (reply) {
        printf("Set return: %s [null == fail, OK == success]\n", reply->str);
    }
    if (reply && reply->str && strcmp(reply->str, "OK") == 0) {
        freeReplyObject(reply);
        return true;//回复OK则表示设置成功,否则就是失败
    }
    if (reply) {
        freeReplyObject(reply);
    }
    return false;
}

释放锁代码:

// ----------------
// 对redismaster解锁
//m_unlockScript  = sdsnew("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end");
// ----------------
void CRedLock::UnlockInstance(redisContext *c, const char *resource,
                              const char *val) {
    int argc = 5;
    //通过lua脚本来释放锁,先判断再删除
    char *unlockScriptArgv[] = {(char*)"EVAL",
                                m_unlockScript,
                                (char*)"1",
                                (char*)resource,
                                (char*)val};
    redisReply *reply = RedisCommandArgv(c, argc, unlockScriptArgv);
    if (reply) {
        freeReplyObject(reply);
    }
}

多节点分布式锁 

 由于单个节点时没法实现高可靠高可用性的,比如节点突然宕机之后锁变量就有可能丢失了,这会影响业务的正常运行。为了解决单点故障问题就引入了基于多个节点的分布式锁。redis官方分布式锁方案是:RedLock。

RedLock算法的基本思路是让客户端依次和多个redis实例请求加锁,对于单个节点加锁有可能成功也有可能失败,最后统计一下成功加锁的节点数量,如果对超过一半的节点返回成功那就可以认为这个分布式锁成功加锁了;否则认为是加锁失败。这样即使某些节点宕机依然可以成功加锁。举个例子,假如有5个redis服务器节点,客户端A要申请锁时就依次往这5个节点发送加锁命令,如果有3个及以上的节点返回成功因为是成功加锁。对于锁的释放就比较简单了,客户端只需要在所有的redis实例上执行锁的释放命令,而不用管客户端是否之前在此节点上成功的获取了锁。这里不用担心锁被误删,因为单节点释放锁的时候如果锁不是自己的就直接返回成功,不会去删除锁变量。

上述的方案还有两个问题。第一个问题是对单个节点加锁是要耗费一定时间的,那么对多个节点加锁就会耗费更多的时间,加上锁变量是加了过期时间的,所以有可能会出现这样一种情况:假设锁变量的TTL有效时间是10秒,但是申请锁的过程总共耗费了11秒,也就是说申请锁的时候超时,这种情况下不能算是成功加锁。因为有些设置成功的比较早的节点锁变量可能已经过期被删除了。

举个例子,假设有五个服务器节点,客户端A成功在三个节点成功加锁,但是由于后面的一个节点申请等待时间过长导致第一个成功的节点上的锁变量已经超时被删除了。也就是说现在有3个节点还可以设置锁,这时候客户端B发起了加锁流程并成功获得了3个节点的成功回复,获得了锁。这时候两个客户端就同时处在临界区,锁失效了。也就是说即使我们已经获得了超过半数节点的成功回复,也还是要检查一下是否超时。

第二个问题,每台机器的时钟不可能完全一样,或者说有点节点时间跑得快一点,有的慢一点,总之不可能完全一样,这就需要加入一个时钟漂移参数来对有效时间进行校正。

这里我们得出一个公式

validityTime = TTL - (endTime - startTime) - drift

validityTime为锁有效时间, TTL为锁变量存活时间,超时时间,endTime为最后成功的时候, startTime为开始尝试加锁的时候, drift为时钟漂移修正值。

到这里我们的方案已经差不多了,但是还可以对一些细节完善一下。

失败重试。当一个客户端尝试锁失败时,这个客户端应该在一个随机延时后进行重试,之所以加个随机延时是为了避免不同客户端同时竞争重试导致谁都拿不到锁。

尽快释放。如果客户端申请失败了,那么它应该马上释放所持有的锁。比如5个节点只拿到两个节点的锁,这时应该马上释放掉那两个锁,既然失败了就要把机会留给别人,下次再试。

 

bool CRedLock::Lock(const char *resource, const int ttl, CLock &lock) {
    sds val = GetUniqueLockId();//生成一个随机的唯一值,用来设置锁变量值
    if (!val) {
        return false;
    }
    lock.m_resource = sdsnew(resource);
    lock.m_val = val;
    printf("Get the unique id is %s\n", val);
    int retryCount = m_retryCount;//重试次数
    do {
        int n = 0;
        int startTime = (int)time(NULL) * 1000;//获取开始时间startTime
        int slen = (int)m_redisServer.size();//获取服务器节点的数量
        //挨个去服务器节点加锁
        for (int i = 0; i < slen; i++) {
            if (LockInstance(m_redisServer[i], resource, val, ttl)) {
                n++;//如果加锁成功就加一,统计成功节点的数量
            }
        }
        //Add 2 milliseconds to the drift to account for Redis expires
        //precision, which is 1 millisecond, plus 1 millisecond min drift
        //for small TTLs.
        int drift = (ttl * m_clockDriftFactor) + 2;//计算时钟漂移
        //计算有效时间
        int validityTime = ttl - ((int)time(NULL) * 1000 - startTime) - drift;
        printf("The resource validty time is %d, n is %d, quo is %d\n",
               validityTime, n, m_quoRum);
        //如果成功节点数量达到m_quoRum而且有效时间大于0就认为加锁成功
        if (n >= m_quoRum && validityTime > 0) {
            lock.m_validityTime = validityTime;
            return true;
        } else {
            Unlock(lock);
        }
        // Wait a random delay before to retry
        //如果加锁失败就设置一个随机的休眠时间
        int delay = rand() % m_retryDelay + floor(m_retryDelay / 2);
        usleep(delay * 1000);
        retryCount--;
    } while (retryCount > 0);
    //进过retryCount 次尝试都没成功就返回失败
    return false;
}

这个方案在尝试加锁的时候是串行挨个往服务器节点申请锁,一个更理想的方案是并行申请,在同一时间向所有redis实例发送SET指令。 

释放锁代码: 

bool CRedLock::Unlock(const CLock &lock) {
    int slen = (int)m_redisServer.size();
    for (int i = 0; i < slen; i++) {
        UnlockInstance(m_redisServer[i], lock.m_resource, lock.m_val);
    }
    return true;
}

生成唯一随机值得代码,redis作者推荐的方案是从/dev/urandom中生成的20字节的随机值。


// ----------------
// 获取唯一id
// ----------------
sds CRedLock::GetUniqueLockId() {
    unsigned char buffer[20];
    if (read(m_fd, buffer, sizeof(buffer)) == sizeof(buffer)) {
        //获取20byte的随机数据
        sds s;
        s = sdsempty();
        for (int i = 0; i < 20; i++) {
            s = sdscatprintf(s, "%02X", buffer[i]);
        }
        return s;
    } else {
        //读取失败
        printf("Error: GetUniqueLockId %d\n", __LINE__);
    }
    return NULL;
}

 

class CRedLock {
public:
                            CRedLock();
    virtual                 ~CRedLock();
public:
    bool                    Initialize();
    bool                    AddServerUrl(const char *ip, const int port);
    void                    SetRetry(const int count, const int delay);
    bool                    Lock(const char *resource, const int ttl, CLock &lock);
    bool                    ContinueLock(const char *resource, const int ttl,
                                         CLock &lock);
    bool                    Unlock(const CLock &lock);
private:
    bool                    LockInstance(redisContext *c, const char *resource,
                                         const char *val, const int ttl);
    bool                    ContinueLockInstance(redisContext *c, const char *resource,
                                                 const char *val, const int ttl);
    void                    UnlockInstance(redisContext *c, const char *resource,
                                           const char *val);
    sds                     GetUniqueLockId();
    redisReply *            RedisCommandArgv(redisContext *c, int argc, char **inargv);
private:
    static int              m_defaultRetryCount;    // 默认尝试次数3
    static int              m_defaultRetryDelay;    // 默认尝试延时200毫秒
    static float            m_clockDriftFactor;     // 电脑时钟误差0.01
private:
    sds                     m_unlockScript;         // 解锁脚本
    int                     m_retryCount;           // try count
    int                     m_retryDelay;           // try delay
    int                     m_quoRum;               // majority nums
    int                     m_fd;                   // rand file fd
    vector<redisContext *>  m_redisServer;          // redis master servers
    CLock                   m_continueLock;         // 续锁
    sds                     m_continueLockScript;   // 续锁脚本
};

如果业务的执行由客户端的一系列步骤所组成,默认可以将锁的有效时间设置的更小,并实现一个锁的ttl延长机制。如果客户端的计算工作只处理了一半,同时锁的有效时间已经不多了,它可以通过一个Lua脚本的来给所有的redis实例延长key的ttl,并且此key的值保持不变。

客户端应该只有当它可以在锁的有效期之内在多数redis实例上延长锁时才会考虑锁的重新获取。基本上,锁的延长算法非常类似于获取锁的算法。也是通过lua脚本实现,先根据签名判断锁是否是自己的,如果是那就先删除,然后重新设置,并加上一个新的过期时间。

// ----------------
// 对redismaster续锁, 私有函数 
//m_continueLockScript = sdsnew("if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return redis.call('set', KEYS[1], ARGV[2], 'px', ARGV[3], 'nx')");

// ----------------
bool CRedLock::ContinueLockInstance(redisContext *c, const char *resource,
                                    const char *val, const int ttl) {
    int argc = 7;
    sds sdsTTL = sdsempty();
    sdsTTL = sdscatprintf(sdsTTL, "%d", ttl);
    char *continueLockScriptArgv[] = {(char*)"EVAL",
                                      m_continueLockScript,
                                      (char*)"1",
                                      (char*)resource,
                                      m_continueLock.m_val,
                                      (char*)val,
                                      sdsTTL};
    redisReply *reply = RedisCommandArgv(c, argc, continueLockScriptArgv);
    sdsfree(sdsTTL);
    if (reply) {
        printf("Set return: %s [null == fail, OK == success]\n", reply->str);
    }
    if (reply && reply->str && strcmp(reply->str, "OK") == 0) {
        freeReplyObject(reply);
        return true;
    }
    if (reply) {
        freeReplyObject(reply);
    }
    return false;
}

 

 

 

 

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐