相关阅读

简介

本文基于Spring Boot 2.6.6redisson 3.16.0简单分析Redisson分布式锁自动续期的实现过程。

Demo

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.0</version>
</dependency>

测试代码

public class LockDemo {

    private final RedissonClient redissonClient;

    public LockDemo(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        LockDemo lockDemo = new LockDemo(Redisson.create(config));
        new Thread(lockDemo::reentrantLock).start();
        lockDemo.release();
    }

    public void release() {
        this.redissonClient.shutdown();
    }

    public void reentrantLock() {
        RLock reentrantLock = redissonClient.getLock("reentrant-lock");

        reentrantLock.lock();
        try {
            // do something
        } finally {
            reentrantLock.unlock();
        }
    }
}

简析

获取锁

Redisson分布式锁获取有两种方式:

  1. lock():未指定过期时间,实现时会设置过期时间,默认30s,然后采用Watchdog不断续期,直至释放锁;
  2. lock(long leaseTime, TimeUnit unit):指定过期时间,超过有效期时间后,会自动释放锁;

本文关注未指定过期时间的获取锁方式,RedissonLock.lock()代码如下:

public void lock() {
    try {
        // 过期时间为-1,表示永不过期
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    if (ttl == null) {
        // 获取到锁直接返回
        return;
    }

    // 还未获取到锁

    // 订阅锁,这样锁释放时会被通知到
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            if (ttl == null) {
                // 获取到锁则可以退出死循环
                break;
            }

            if (ttl >= 0) {
                try {
                    // 指定超时时间内获取
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                // 未指定超时时间获取
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 取消锁的订阅
        unsubscribe(future, threadId);
    }
//    get(lockAsync(leaseTime, unit));
}

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        // 指定过期时间
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 未指定过期时间
        // 过期时间设为看门狗超时时间,然后由看门狗一直续期,直到锁释放
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        if (ttlRemaining == null) {
            // 获取到锁

            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 未指定过期时间,需要开启Watchdog自动续期
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

首先看下尝试获取锁的实现,tryLockInnerAsync方法通过EVAL执行LUA脚本,代码如下:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

主要逻辑如下:

  1. 若锁不存在,则创建锁,并设置过期时间,然后返回nil
  2. 若锁存在且由本线程持有,则锁计数加一,并重设过期时间,然后返回nil
  3. 否则返回锁的过期时间;

由上可知,当返回nil才意味着获取到锁,否则获取锁失败;

再看下开启Watchdog任务自动续期的实现,scheduleExpirationRenewal方法代码如下:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // Watchdog任务已存在,则添加本次线程ID即可
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        // 创建Watchdog任务,用于重设过期时间
        renewExpiration();
    }
}

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    // 借助Netty的Timeout实现自动续期
    // 超时时间为1/3过期时间,确保在过期前能够重设过期时间
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                // 不存在则无需继续执行
                // 释放锁后会删除该Key
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // 锁还存在,则需要继续开启Watchdog
                    // 递归执行,重设过期时间
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

renewExpirationAsync方法通过EVAL执行LUA脚本实现重设锁的过期时间,代码如下:

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

逻辑如下:

  1. 若锁存在,则重设过期时间,然后返回1
  2. 否则,返回0

至此,Redisson获取分布式锁时通过开启Watchdog完成自动重设过期时间的实现就分析完了,接下来分析释放锁时如何关闭Watchdog;

释放锁

Redisson分布式锁释放的方法为RedissonLock.unlock(),代码如下:

public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        // 取消自动续期
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

首先看下释放锁的实现,unlockInnerAsync方法通过EVAL执行LUA脚本,代码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

主要逻辑如下:

  1. 若锁不存在,则直接返回nil
  2. 若锁计数减一后还大于0,则重设过期时间,然后返回0
  3. 否则删除锁,并发布解锁消息(通知其它线程可以获取锁),然后返回1

再看下关闭Watchdog的实现,scheduleExpirationRenewal方法代码如下:

protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        // Watchdog任务已不存在则直接退出
        return;
    }
    
    if (threadId != null) {
        // 移除本次线程
        task.removeThreadId(threadId);
    }

    // 当前Watchdog任务已经没有绑定线程,则可以关闭
    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            // 取消Watchdog任务
            timeout.cancel();
        }
        // 删除该Key
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

至此,Redisson释放分布式锁时关闭Watchdog任务的实现就分析完了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐