网上很多文章关于redission自动续期原理,基本中文一句话带过,但是对于源码都没有分析。大部分分析都是错误的!!!所以在此对着源码分析分析。

 

一句话总结

redission分布式锁自动续期,是在超市时间/3的时候,会触发锁检查,发现线程ID未解锁,则触发续锁操作。续锁会创建redission自己实现的TimerTask,然后放到时间轮中触发,触发延迟1500ms。

时间轮相当于一个倒计时的秒表,时间轮的格子数,每个格子代表的时间间隔(秒表倒计时指针多久走一格)都可以设置,每个格子内的任务有个队列缓存,初始长度是1024。然后当倒计时指针走到当前格子时,格子内的任务,当round轮数是0的时候触发,如果round轮数>0则减1;然后倒计时指针继续走下一个格子。

当倒计时指针走到最后一个格子的时候,复位到第一个格子(格子虽然是个列表,但是这种行为看起来像个环)。

因为采用了时间轮,只有一个倒计时主线程,所以不会太费性能。

 

以下是源码分析

RedissionLock源码

lock() -> lockInterruptibly

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

在tryAcquire(leaseTime, unit, threadId)方法中,会调用scheduleExpirationRenewal(final long threadId)

internalLockLeaseTime是用户加锁传入的超时时间,所以自动续期方法的执行周期是超时时间的1/3。

 

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }

    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
        task.cancel();
    }
}

 

续期的核心方法renewExpirationAsync(long threadId) ---> commandExecutor.evalWriteAsync()

----> org.redisson.command.CommandAsyncService#evalAsync()

----> org.redisson.command.CommandAsyncService#async() 里面的核心代码创建TimerTask任务

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}

 

TimerTask任务

----> connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); 通过时间轮来触发这个task。

public <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
        final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt, 
        final boolean ignoreRedirect) {
    if (mainPromise.isCancelled()) {
        free(params);
        return;
    }

    if (!connectionManager.getShutdownLatch().acquire()) {
        free(params);
        mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
        return;
    }

    Codec codecToUse = getCodec(codec);
    
    final AsyncDetails<V, R> details = AsyncDetails.acquire();
    final RFuture<RedisConnection> connectionFuture = getConnection(readOnlyMode, source, command);

    final RPromise<R> attemptPromise = new RedissonPromise<R>();
    details.init(connectionFuture, attemptPromise,
            readOnlyMode, source, codecToUse, command, params, mainPromise, attempt);

    FutureListener<R> mainPromiseListener = new FutureListener<R>() {
        @Override
        public void operationComplete(Future<R> future) throws Exception {
            if (future.isCancelled() && connectionFuture.cancel(false)) {
                log.debug("Connection obtaining canceled for {}", command);
                details.getTimeout().cancel();
                if (details.getAttemptPromise().cancel(false)) {
                    free(params);
                }
            }
        }
    };

    final TimerTask retryTimerTask = new TimerTask() {
    //...
    }
    //private int retryInterval = 1500;
    Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
    details.setTimeout(timeout);
    details.setupMainPromiseListener(mainPromiseListener);

    connectionFuture.addListener(new FutureListener<RedisConnection>() {
        //...
    }
}

 

时间轮触发,默认org.redisson.config.BaseConfig的配置private int retryInterval = 1500;

private HashedWheelTimer timer;

public Timeout org.redisson.connection.MasterSlaveConnectionManager#newTimeout(TimerTask task, long delay, TimeUnit unit) {
    try {
        return timer.newTimeout(task, delay, unit);
    } catch (IllegalStateException e) {
        // timer is shutdown
        return dummyTimeout;
    }
}

 

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐