转载自:https://juejin.cn/post/6844904057920831496

实现原理

基本思路

分布式系统中有很多个节点,但这些节点都访问一个共同的Redis实例(或Redis集群)。所以可以使用 setnx(set if not exists) 指令执行key操作,只允许被一个客户端节点执行,但其它节点再来执行时会失败,这样就可以保证同一时刻只有一个节点占有锁。节点在执行完业务逻辑后需要将key删除(相当于释放锁)。这里为了防止业务执行时发生了异常而导致死锁,还需要给key设置一个过期时间(到了时间自动释放锁),以便其它节点可以继续获取锁。

  • 使用setnx执行成功后一定要设置key的失效时间,或者直接使用命令set key value px millseconds nx
  • 每个key对应的value一定要是唯一的值,可以使用UUID生成一个随机ID。
  • 删除key之前需要验证value是否一致,防止错误地释放锁。

缺陷

事实上这类琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:在Redis的master节点上拿到了锁, 但是这个加锁的key还没有同步到slave节点。 master故障,发生故障转移,slave节点升级为master节点,由于salve节点上没有这个加锁的key,所以导致锁丢失。

Lua脚本

因为加锁和解锁涉及到一个以上的命令,为了保证原子性,所以一般使用lua脚本来执行命令。
加锁脚本:

-- 首先尝试使用setnx设置值
local result = redis.call('setnx', KEYS[1], ARGV[2]);
-- 如果成功,则设置key的失效时间
if result == 1 then
  redis.call('pexpire', KEYS[1], tonumber(ARGV[1]));
  -- 设置成功的话就返回空值,与后面返回的失效时间区别开
  return nil 
else
  -- key操作失败,说明有其它节点正在操作,直接返回key的失效时间
  return redis.call('pttl', KEYS[1]) 
end
复制代码

解锁脚本:

-- 首先获取key对应的值
local result = redis.call('get', KEYS[1]);
-- 然后验证value,如果相同则删除该key
if result == ARGV[1] then 
  redis.call('del', KEYS[1])
  return 1 
else
  return nil 
end
复制代码

使用lettuce客户端实现分布式锁

这里基于lettuce客户端实现一个可重入的分布式Redis锁。首先需要引入lettuce的依赖:

<!-- lettuce Redis的高级客户端 -->
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.1.8.RELEASE</version>
</dependency>
复制代码

关于lettuce的使用,可以参考lettuce官方github的wiki

锁的实现

因为加锁和解锁的脚本需要频繁地调用,为了减少网络IO的消耗,所以先将加锁和解锁的脚本缓存到了服务器本地,redis会使用sha1算法将脚本内容变成一个40位16进制组成的字符串作为该脚本的唯一ID并返回给客户端,客户端只需发送这个唯一ID给服务器就可以执行脚本了。在lettuce中实现如下:

/**
 * 两个原子变量,用于存储加锁和解锁脚本的sha ID
 */
private final AtomicReference<String> LUA_SHA_LOCK = new AtomicReference<>();
private final AtomicReference<String> LUA_SHA_UNLOCK = new AtomicReference<>();

// 加锁的lua脚本
// 键不存在则创建,并设置键的的过期时间为5s, 如果已存在则创建失败,
final String lockLua = "local result = redis.call('setnx', KEYS[1], ARGV[2]); " +
        "if result == 1 then redis.call('pexpire', KEYS[1], tonumber(ARGV[1])) " +
        "return nil else return redis.call('pttl', KEYS[1]) end";
// 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
LUA_SHA_LOCK.compareAndSet(null, commands.scriptLoad(lockLua));
// 释放锁的lua脚本
final String unlockLua = "local result = redis.call('get', KEYS[1]);" +
        "if result == ARGV[1] then redis.call('del', KEYS[1]) " +
        "return 1 else return nil end";
// 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
LUA_SHA_UNLOCK.compareAndSet(null, commands.scriptLoad(unlockLua));
复制代码

为了保证值的唯一性,这里使用UUID来生成随机ID作为值,并将该值存在本地线程变量中,当重入锁或解锁时可以直接从变量中拿以保证相同的key拥有的value一致。然后执行lua脚本尝试获取锁,如果获取锁成功则直接返回。否则一直阻塞,并不断尝试获取锁直到拿到锁。

我这里的实现是有问题的,因为不能及时通知其它线程去获取锁。Redisson的分布式锁是通过Redis的发布订阅机制实现通知的则可以及时通知其它线程去抢锁(Java的ReentrantLock则是通过队列实现的)

这里有个坑,就是ScriptOutputType类型和真实返回类型不一样,比如这里我设置的是Integer,它实例返回类型是Long(可以自己打印class查看对象类型)

private Boolean tryLock(String key) {
    String[] keys = new String[]{key};
    // 失效时间为5秒
    String ttl = "5000";
    String value = getValueByKey(key);
    // 如果没有设置过值
    if (value == null) {
        // 锁的值使用UUID生成随机ID以保证值的唯一性
        value = UUID.randomUUID().toString();
        // 将新生成的值放入集合中
        values.get().put(key, value);
    }
    String[] args = new String[]{ttl, value};
    // 使用脚本唯一ID执行lua脚本
    Long result = commands.evalsha(LUA_SHA_LOCK.get(), ScriptOutputType.INTEGER, keys, args);
    // 返回空说明加锁成功
    if (result == null) {
        return true;
    }
    boolean isLock;
    // 一直阻塞,直到拿到锁
    while (true) {
        try {
            // 等待一小会儿
            Thread.sleep(5);
            // 继续尝试获取锁
            isLock = this.tryLock(key);
            if (isLock) {
                return true;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码

下面贴出完整的代码:

public class Lock {
    private Logger logger = LoggerFactory.getLogger(Lock.class);
    /**
     * 当前线程的锁集合
     */
    private ThreadLocal<Map<String, Integer>> lockers = new ThreadLocal<>();
    /**
     * 当前线程锁的key和value集合
     */
    private ThreadLocal<Map<String, String>> values = new ThreadLocal<>();
    private RedisClient client;
    private RedisCommands<String, String> commands;
    /**
     * 两个原子变量,用于存储加锁和解锁脚本的sha ID
     */
    private final AtomicReference<String> LUA_SHA_LOCK = new AtomicReference<>();
    private final AtomicReference<String> LUA_SHA_UNLOCK = new AtomicReference<>();

    /**
     * Lock的构造函数
     * @param client RedisClient的实例
     */
    public Lock(RedisClient client) {
        this.client = client;
        StatefulRedisConnection<String, String> connection = client.connect();
        commands = connection.sync();
        // 加锁的lua脚本
        // 键不存在则创建,并设置键的的过期时间为5s, 如果已存在则创建失败,
        final String lockLua = "local result = redis.call('setnx', KEYS[1], ARGV[2]); " +
                "if result == 1 then redis.call('pexpire', KEYS[1], tonumber(ARGV[1])) " +
                "return nil else return redis.call('pttl', KEYS[1]) end";
        // 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
        LUA_SHA_LOCK.compareAndSet(null, commands.scriptLoad(lockLua));
        // 释放锁的lua脚本
        final String unlockLua = "local result = redis.call('get', KEYS[1]);" +
                "if result == ARGV[1] then redis.call('del', KEYS[1]) " +
                "return 1 else return nil end";
        // 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
        LUA_SHA_UNLOCK.compareAndSet(null, commands.scriptLoad(unlockLua));
    }

    /**
     * 尝试获取锁
     * @param key 锁的键
     * @return 是否成功获取锁
     */
    private Boolean tryLock(String key) {
        String[] keys = new String[]{key};
        // 失效时间为5秒
        String ttl = "5000";
        String value = getValueByKey(key);
        // 如果没有设置过值
        if (value == null) {
            // 锁的值使用UUID生成随机ID以保证值的唯一性
            value = UUID.randomUUID().toString();
            // 将新生成的值放入集合中
            values.get().put(key, value);
        }
        String[] args = new String[]{ttl, value};
        // 如果创建键成功,则说明加锁成功
        Long result = commands.evalsha(LUA_SHA_LOCK.get(), ScriptOutputType.INTEGER, keys, args);
        if (result == null) {
            return true;
        }
        boolean isLock;
        // 一直阻塞,直到拿到锁
        while (true) {
            try {
                // 这里的实现是有问题的
                // Redisson实现的分布式锁是使用发布订阅实现的(Java的reentrantLock通过队列实现的)
                // 从而可以及时通知其它线程去抢锁
                Thread.sleep(5);
                // 继续尝试获取锁
                isLock = this.tryLock(key);
                if (isLock) {
                    return true;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 尝试释放锁
     * @param key 锁的键
     * @return 是否成功释放锁
     */
    private boolean tryRelease(String key) {
        String[] keys = new String[]{key};
        String[] args = new String[]{getValueByKey(key)};
        // 释放锁
        Long result = commands.evalsha(LUA_SHA_UNLOCK.get(), ScriptOutputType.INTEGER, keys, args);
        return result != null;
    }

    /**
     * 获取当前线程的锁
     * @param key 锁的键
     * @return 当前线程的锁和该锁的重入次数
     */
    private Integer getLockerCnt(String key) {
        // 获取当前线程的锁集合
        Map<String, Integer> map = lockers.get();
        // 如果集合不为空,返回key对应的值
        if (map != null) {
            return map.get(key);
        }
        lockers.set(new HashMap<>(4));
        return null;
    }

    /**
     * 获取锁对应的值
     * @param key 锁的键
     * @return 锁对应的值
     */
    private String getValueByKey(String key) {
        // 获取当前线程的锁和对应值的键值对集合
        Map<String, String> map = values.get();
        // 如果集合不为空,返回key对应的值
        if (map != null) {
            return map.get(key);
        }
        values.set(new HashMap<>(4));
        return null;
    }

    /**
     * 加可重入锁
     * @param key 锁的键
     * @return 是否成功
     */
    public boolean lock(String key){
        Integer refCnt = getLockerCnt(key);
        if (refCnt != null) {
            // 如果锁已持有,则锁的引用计数加1
            lockers.get().put(key, refCnt + 1);
            return true;
        }
        // 尝试加锁
        boolean ok = this.tryLock(key);
        // 如果加锁失败,则返回
        if (!ok) {
            return false;
        }
        // 加锁成功,引用计数设置为1
        lockers.get().put(key, 1);
        return true;
    }

    /**
     * 释放可重入锁
     * @param key 锁的键
     * @return 是否成功
     */
    public boolean unlock(String key) {
        Integer refCnt = getLockerCnt(key);
        // 当前未持有锁
        if (refCnt == null) {
            return false;
        }
        // 锁的引用数减1
        refCnt --;
        // 引用计数大于0,说明还持有锁
        if (refCnt > 0) {
            lockers.get().put(key, refCnt);
        } else {
            // 否则从锁集合中删除该键,并释放锁
            lockers.get().remove(key);
            return this.tryRelease(key);
        }
        return true;
    }
}
复制代码

测试锁的使用

public class TestLettuce {
    private final Logger logger = LoggerFactory.getLogger(TestLettuce.class);
    private RedisClient client;
    private int count;

    @Test
    public void testLock() throws InterruptedException {
        RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withPassword("redis666").build()
        // 获取客户端
        client = RedisClient.create(redisUri);
        // 创建锁的实例
        Lock lock = new Lock(client);
        // 创建一个固定线程池
        ExecutorService service = Executors.newFixedThreadPool(5);
        // 开启五个任务
        for (int i = 0; i < 5; i++) {
            service.submit(() -> {
                boolean isLock;
                try {
                    isLock = lock.lock("myLock");
                    if (isLock) {
                        logger.info("我拿到锁啦");
                        count ++;
                        logger.info("count: [{}]", count);
                    }
                }finally {
                    lock.unlock("myLock");
                }
            });
        }
        Thread.sleep(10000);
        client.shutdown();
    }
复制代码

根据输出可以看到,实例变量是被五个线程安全地使用的。

2020-02-06 20:53:04.434 [pool-1-thread-2] INFO  com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.434 [pool-1-thread-2] INFO  com.ncusofter.redis.TestDemo - count: [1]
2020-02-06 20:53:04.441 [pool-1-thread-5] INFO  com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.441 [pool-1-thread-5] INFO  com.ncusofter.redis.TestDemo - count: [2]
2020-02-06 20:53:04.445 [pool-1-thread-3] INFO  com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.445 [pool-1-thread-3] INFO  com.ncusofter.redis.TestDemo - count: [3]
2020-02-06 20:53:04.449 [pool-1-thread-1] INFO  com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.449 [pool-1-thread-1] INFO  com.ncusofter.redis.TestDemo - count: [4]
2020-02-06 20:53:04.455 [pool-1-thread-4] INFO  com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.457 [pool-1-thread-4] INFO  com.ncusofter.redis.TestDemo - count: [5]
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐