Redisson是Redis官方推荐的Java版的Redis客户端。它提供的功能非常多,也非常强大,此处我们只用它的分布式锁功能。关于基本的用法不做赘述,简单看一下用法就好

https://github.com/redisson/redisson

添加redisson的配置和maven依赖
 

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.12.0</version>
        </dependency>

 

@Bean
    RedissonClient redissonClient(){
        Config config=new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
       // config.useClusterServers();//用于集群的配置
        //config.useSentinelServers();//用于哨兵的配置
        return Redisson.create(config);
    }

 

@Autowired
    RedissonClient redisson;
    @GetMapping("/lock")
    public String testStu()  {
        RLock lock = redisson.getLock("lock");
        lock.lock();
        try {
            System.out.println("加锁完毕,业务中:");
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return "ok";
    }

我们知道用redisson的一个好处就是,它可以通过redis来进行对分布式系统下的多线程进行lock控制。比如它的加锁阶段,自动续期, 自动解锁阶段。

对于加锁阶段,是有两个方法可以采用

lock.lock();
lock.lock(2,TimeUnit.SECONDS);

意为一个是采用系统默认,一个是自定义了锁时间过期的时间。下面主要看看不同之处。

最简单的一点就是,如果不设置过期时间,那会采用默认的30s的时间来过期(当然这个时间也可以通过设置看门狗时间来更改默认的30s)。然后当程序运行时候,会有一种自动续期的策略,即为到某个时间点然后会将时间再变为30s。一直等到程序运行完成调用unlock()方法就解锁。如果程序宕机获取其他原因中断,因为有过期时间的存在,也会自动删除redis中的key,不会造成死锁问题。这就是他的看门狗机制。watchDog。

如果设置了锁的超时时间,那就简单了。直接通过lua脚本去执行程序。。这里有一个问题就是,例如程序运行了3s,但是过期时间设置的是1s,那在释放锁unlock()的时候回报错
java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: 5dd94211-69eb-4853-8d9c-6bb1e6c33c7a thread-id: 1188。类似于这种。这是因为
 

@Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            if (e != null) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            
            cancelExpirationRenewal(threadId);
            result.trySuccess(null);
        });

        return result;
    }

 就相当于是,之前的锁已经过期不存在了,再当我要去尝试解锁释放的时候,肯定就找不到这个锁了,所以才会报错。可以不用unlock()方法让它自己过期就好了。
下面看看lock.lock();就这个原生方法的实现理解

在RedissonLock类中有这样做一个方法lock(),

lock(long leaseTime, TimeUnit unit, boolean interruptibly),如果没有传值的话,默认

lock(-1, null, false);是这种
在lock方法里面调用tryAcquire,下面继续调用tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 这里就是刚刚说的如果用lock.lock()传时间的话,leaseTime 就是自己设置的值
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
//这里是采用的默认的,getLockWatchdogTimeout()返回的就是默认或者通过设置的时间
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
// 这里就是进行自动续期时间的操作
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

在lock()里面有这样一段代码

1.表示一直循环,尝试去查看当前锁的情况

2.就是我们开始上面看的那个方法,也是看门狗的实现

3.就是说,当锁已过期或者是任务结束则自动就去结束循环,不然也会导致死循环的

看到这个方法scheduleExpirationRenewal(threadId); 

private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
//这里有一个ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
// 因为我们是第一次进行加锁的操作,所以此时map里面是没值的。putIfAbsent返回只能是null
            entry.addThreadId(threadId);
//这里的方法就是进行延迟重试的方式
            renewExpiration();
        }
    }

 主要看我进行注释的地方,就是判断有无对象,然后启动时间轮,延时时间为watchdog的三分之一。

private void renewExpiration() {
// 因为在上面的时候已经用了putIfAbsent,所以放进了一个ExpirationEntry 对象
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        // 这里就启动了时间轮,然后进行循环,启动的时间就是进行延时internalLockLeaseTime / 3
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // 只要是连接未停止,即线程任务还未结束,则进行自旋的方式
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

 RFuture<Boolean> future = renewExpirationAsync(threadId);上面代码中的这一句就是在进行续期的操作,里面实现

 protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }
/**
internalLockLeaseTime就是我们的看门狗时间,当我们通过上面的时间轮来重复执行这个lua脚本的时候,会重置这个过期时间。又因为此时是同一个threadId,所以操作的也就是这同一个当前线程对象和同一个锁对象
**/

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐