原理

一般写法如下:

        // Typical usage: acquire the lock BEFORE entering try, release it in finally.
        // finally guarantees unlock() runs even when the business logic throws; calling lock()
        // outside try means a failed lock() never triggers unlock() on a lock this thread does
        // not hold (which would raise IllegalMonitorStateException).
        RLock lock = redissonClient.getLock("myLock");
        lock.lock();
        try{
            // business logic
        }finally {
            lock.unlock();
        }

其实 Redisson 的分布式锁就是基于 redis 的 hash 数据类型实现的: key 为锁名称(即 myLock),field 为加锁线程的唯一标识 uuid:threadId,value 为该线程的加锁次数。同一线程再次加锁时只是把 value 加 1,由此可以看出 redisson 锁是可重入的

一. 初始化锁RedissonLock

/**
 * Creates the lock {@code name}, executing its Redis commands through {@code commandExecutor}.
 * NOTE(review): RedissonBaseLock's constructor assigns fields with the same names; presumably
 * this class declares its own copies of commandExecutor/internalLockLeaseTime — confirm.
 */
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    // Default lease: the configured lock watchdog timeout (used when lock() is called without leaseTime).
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // Pub/sub helper used, while another thread holds the lock, to subscribe to "lock released" messages.
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
/**
 * Base constructor shared by the Redisson lock implementations.
 */
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
    // 1. Pass the name to the parent (per the original notes this sets RedissonObject.name).
     super(commandExecutor, name);
    // 2. Executor used to run the lock's Lua scripts.
    this.commandExecutor = commandExecutor;
    // 3. Id of this client's connection manager. Per the original notes, this is a random UUID
    //    (e.g. e12badb5-7396-4f05-8290-aaa352e04bc4) created together with the ConnectionManager.
    //    Combined with the thread id it forms the hash FIELD that identifies the lock holder
    //    (presumably via getLockName(threadId)). That field is how lock/unlock check that they
    //    are called by the same thread.
    this.id = commandExecutor.getConnectionManager().getId();
    // 4. Default lease = Config.lockWatchdogTimeout (30 * 1000 ms by default).
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // "<id>:<name>": presumably what getEntryName() returns, i.e. the key of this lock in
    // EXPIRATION_RENEWAL_MAP and in the pub/sub entries map.
    this.entryName = id + ":" + name;
}

二:上锁lock

//RedissonLock.lock()

/**
 * Acquires the lock, waiting as long as necessary, and holds it for {@code leaseTime}.
 * A {@code leaseTime} of -1 uses the default watchdog lease, which is renewed automatically
 * until the lock is released.
 *
 * @param leaseTime how long to hold the lock before it expires on its own, or -1
 * @param unit time unit of {@code leaseTime}
 * @throws IllegalStateException if the thread is interrupted while waiting anyway; the
 *     interrupt status is restored and the {@link InterruptedException} is kept as the cause
 */
@Override
public void lock(long leaseTime, TimeUnit unit) {
    try {
        lock(leaseTime, unit, false);
    } catch (InterruptedException e) {
        // The non-interruptible path can still surface an InterruptedException: it retries the
        // latch wait once after an interrupt, and a second interrupt escapes. Do not lose it:
        // restore the flag for callers and keep the original exception as the cause.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
}

/**
 * Blocks until the lock is acquired by the current thread.
 *
 * @param leaseTime lease to hold the lock for, or -1 for the default watchdog lease
 * @param unit unit of {@code leaseTime}
 * @param interruptibly whether waiting may be aborted by {@link Thread#interrupt()}
 * @throws InterruptedException if interrupted while waiting (interruptible mode), or on a
 *     second interrupt during the retry wait (non-interruptible mode)
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // 1. Current thread id (part of the lock-holder field in Redis).
        long threadId = Thread.currentThread().getId();
        // 2. Try to take the lock once; null means acquired, otherwise the lock's remaining TTL (ms).
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // 3. Acquired: return immediately.
        if (ttl == null) {
            return;
        }
        // 4. Held by someone else: subscribe to the lock's "released" channel and block until the
        //    subscription is established.
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
         // Waits for the subscription future. Per the original notes the wait is bounded by
         // config timeout + retryInterval * retryAttempts (e.g. 3s + 1.5s * 3) — TODO confirm.
         //int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
            commandExecutor.syncSubscription(future);
        }

        try {
            // 5. Retry loop: try again, and between attempts sleep on the entry's latch until a
            //    release message arrives or the lock's TTL runs out.
            while (true) {
                // 5.1 Retry; if acquired, leave the loop.
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                // 5.2 Still held by another thread.
                if (ttl >= 0) {
                    try {
                        // Wait up to the lock's remaining TTL for a latch permit. LockPubSub#onMessage
                        // releases a permit when an unlock message arrives, which wakes this thread.
                        // Blocking here avoids hammering Redis from the while(true) loop. If no permit
                        // arrives in time, loop around and retry (the lock may have expired).
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        // Non-interruptible: the exception cleared the interrupt flag, so wait once
                        // more. NOTE(review): a second interrupt here propagates to the caller.
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    // Negative TTL (PTTL returns a negative value when the key has no expiry):
                    // wait for a release message with no timeout.
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            // 6. Always drop this thread's subscription, acquired or not.
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

//尝试获取锁并等待上锁结果
/**
 * Synchronous form of {@code tryAcquireAsync}: runs one acquire attempt and blocks until the
 * Lua script has answered.
 *
 * @return {@code null} when the lock was acquired (or re-entered), otherwise the remaining TTL
 *     of the lock in milliseconds
 */
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> pending = tryAcquireAsync(waitTime, leaseTime, unit, threadId);
    return get(pending);
}
       
//执行lua脚本尝试获取锁 
/**
 * Starts one asynchronous acquire attempt and wires up lease handling once it succeeds.
 *
 * @param leaseTime explicit lease in {@code unit}, or -1 for the default
 *     {@code internalLockLeaseTime} plus watchdog renewal
 * @return future yielding {@code null} on success, otherwise the lock's remaining TTL (ms)
 * NOTE(review): the type parameter {@code <T>} is unused.
 */
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    // 1. Run the acquire Lua script asynchronously.
    if (leaseTime != -1) {
        // Caller-supplied lease.
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // Default lease (internalLockLeaseTime, 30 s by default).
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 2. Non-blocking completion callback; the caller waits on the returned future itself.
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            // The script failed; the error reaches the caller through the returned future.
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                // Remember the caller's lease: unlockInnerAsync passes this field to PEXPIRE when a
                // re-entrant hold is released. NOTE(review): shared mutable field written from a callback.
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // Default lease: start the watchdog, which re-extends the lease every
                // internalLockLeaseTime / 3 (10 s by default) until the lock is released.
                // An explicit leaseTime never gets this renewal.
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}    


<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
      //keys[1]为锁名,即redissonClient.getLock("myLock")中的myLock
      //ARGV[1]:锁时间,即lock.lock(3, TimeUnit.MINUTES),3*60s,默认internalLockLeaseTime=30s
      //ARGV[2]:锁的唯一标识,uuid + ":" + threadId
      加索的整体逻辑使用hset报错上锁的线程,key为索命myLock,field为uuid+threadId,value为field上锁的次数
      
            //1.若key不存在,则进行加锁,返回nil,即null
            "if (redis.call('exists', KEYS[1]) == 0) then " +
            //1.1对key加锁,并设置加索次数为1
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            //1.2设置key的过期时间
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            
            //2.若key存在,判断上锁的是否为此线程,若是次数+1,返回nil,即null
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            //2.1 上锁线程数+1
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            //2.2 重置过期时间
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            
            //3.走到这,说明锁获取不成功,即被其它线程已占用,则返回锁的剩余时间
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

上面值得注意的是:

  1. 只有 lock.lock() 没有指定 leaseTime 时,即使用默认过期时间 internalLockLeaseTime(30s)时,才会每隔 internalLockLeaseTime/3(即 10s)重置一次锁的过期时间,即延续锁的过期时间(见后文 scheduleExpirationRenewal());如果 lock.lock() 上锁时指定了 leaseTime,则不会续锁。那么到底使用哪种方式合适呢?每种方式都有各自的优点和弊端:
        a. 指定leaseTime:
            优点:此种方式就算因为异常没有执行unlock(),也有自身的过期时间,等过期后,别的线程就能得到此锁了
            弊端:如果代码在超时释放锁时还未完成业务,就可能出现并发带来的数据问题,若怕出现此问题,可以将leaseTime设大一点;
        b. 不指定leaseTime:
            优点:有效解决了上面业务未执行完锁就释放了的问题
            弊端:如果因为代码异常导致取消续锁任务未执行到,该锁就永远不能自动释放,造成死锁,除非服务挂掉(续锁任务停止)或手动将key删除后,别的线程才能获取到该锁,所以,unLock()一般写在finally里,尽可能避免死锁

    综上: 个人觉得还是指定合适的leaseTime好,能确保系统出现故障未释放锁后,在一定时间内能够主动去释放锁,避免造成死锁

  2. 在获取锁失败后,通过 redis channel 的发布/订阅功能订阅释放锁事件,然后再次尝试获取锁;若再次获取失败,则利用 Semaphore.tryAcquire() 阻塞等待信号量,即等待锁释放。对于 tryLock(waitTime, ...),如果在等待过程中一直未等到锁的释放事件通知,超过最大等待时间后获取锁失败,返回 false;而 lock() 会一直循环等待。如果等到了锁的释放事件通知,则进入下一个 while 循环重试获取锁(由此可看出,这里讲的是非公平锁;redisson 另外还实现了公平锁、读写锁等)。这样就避免了获取不到锁时在 while 死循环中反复无效地尝试获取锁而造成资源浪费

获取锁的整体流程:
在这里插入图片描述
模拟多线程同时请求锁的情况:
(注意redis中锁值得变化)
在这里插入图片描述

三:释放锁unLock

//RedissonLock.unlock()

/**
 * Releases one hold of this lock on behalf of the calling thread.
 *
 * @throws IllegalMonitorStateException if the calling thread does not hold the lock
 */
@Override
public void unlock() {
    try {
        // Block until the unlock script for the current thread has finished.
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        // Surface an ownership error as the plain exception type callers of Lock expect.
        Throwable cause = e.getCause();
        if (!(cause instanceof IllegalMonitorStateException)) {
            throw e;
        }
        throw (IllegalMonitorStateException) cause;
    }
}

/**
 * Asynchronously releases one hold of this lock for {@code threadId}.
 * The watchdog bookkeeping for the thread is updated whatever the script outcome.
 *
 * @param threadId id of the thread releasing the lock
 * @return future that fails with the script error if the script fails, fails with
 *     {@link IllegalMonitorStateException} if the thread does not hold the lock, and
 *     completes with {@code null} otherwise
 */
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> promise = new RedissonPromise<>();
    RFuture<Boolean> scriptResult = unlockInnerAsync(threadId);

    scriptResult.onComplete((opResult, error) -> {
        // Step the watchdog down (or stop it) before reporting the outcome.
        cancelExpirationRenewal(threadId);

        if (error != null) {
            promise.tryFailure(error);
        } else if (opResult == null) {
            // The script returns nil when the hash has no field for this thread.
            promise.tryFailure(new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId));
        } else {
            promise.trySuccess(null);
        }
    });

    return promise;
}

//使用lua脚本释放锁
/**
 * Runs the release Lua script for {@code threadId}.
 *
 * @return future yielding {@code null} if the thread does not hold the lock, {@code false}
 *     if it still holds it re-entrantly (count decremented, lease refreshed), or {@code true}
 *     if the lock was fully released (key deleted, unlock message published)
 */
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
          // KEYS[1]: lock name, i.e. "myLock" in redissonClient.getLock("myLock")
          // KEYS[2]: pub/sub channel, redisson_lock__channel:{myLock}
          // ARGV[1]: message to publish, LockPubSub.UNLOCK_MESSAGE
          // ARGV[2]: lease (internalLockLeaseTime) to restore when still held re-entrantly
          // ARGV[3]: holder id, uuid + ":" + threadId
      
             // 1. No field for this holder: the thread does not hold the lock -> nil (null).
             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
                "end; " +
                
                // 2. Holder found: decrement its count and inspect the result.
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                
                // 2.1 Count still > 0: the thread locked re-entrantly; refresh the lease, return 0 (false).
                "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
                "else " +
                
                // 2.2 Count reached 0: delete the key and publish UNLOCK_MESSAGE to wake waiters, return 1 (true).
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
                "end; " +
                
                // 3. Unreachable (both branches above return); kept as a fallback nil.
                "return nil;",
             Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

释放锁的整理流程:

在这里插入图片描述

四:续锁 /取消续锁

  1. 续锁:使用默认过期时间上锁成功后调用
// RedissonBaseLock.scheduleExpirationRenewal()

//过期时间更新,当有新线程占用锁时就开始定时任务,定时更新key的过期时间
/**
 * Registers {@code threadId} with this lock's watchdog entry. The periodic renewal is started
 * only when no entry existed yet for this lock (keyed by {@code getEntryName()}).
 *
 * @param threadId id of the thread that just acquired the lock with the default lease
 */
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry candidate = new ExpirationEntry();
    ExpirationEntry current = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), candidate);
    if (current == null) {
        // First holder: our entry was installed, so record the thread and start renewing.
        candidate.addThreadId(threadId);
        renewExpiration();
        return;
    }
    // A renewal task already exists (e.g. re-entrant acquire). Record this acquisition so that
    // cancelExpirationRenewal knows when the last hold is gone.
    current.addThreadId(threadId);
}

/**
 * Schedules one watchdog tick {@code internalLockLeaseTime / 3} ms from now (10 s with the
 * default 30 s lease). The tick re-extends the key to {@code internalLockLeaseTime} if the first
 * registered thread still holds the lock, and then reschedules itself.
 *
 * <p>The chain stops in three cases: the entry for this lock has been removed from
 * {@code EXPIRATION_RENEWAL_MAP} (normally by {@code cancelExpirationRenewal} on unlock), the
 * renewal command fails, or Redis reports that the lock is no longer held.
 */
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 1. Renewal cancelled meanwhile, or no thread registered: nothing to do.
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            // 2. Ask Redis to extend the lease if this thread still owns its hash field.
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // 3. Still held: schedule the next tick.
                    renewExpiration();
                } else {
                    // 3'. No longer held (lease expired or key deleted externally). Drop the stale
                    // entry. Otherwise a later scheduleExpirationRenewal() for this lock would find
                    // it, only record its thread, and never restart the timer chain, so the new
                    // lock would silently expire while still in use.
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

//判断threadId是否存在,存在重置key的过期时间
/**
 * Extends the lock's lease to {@code internalLockLeaseTime} if {@code threadId} still holds it.
 *
 * @return future yielding {@code true} if the lease was extended, {@code false} if the holder
 *     field no longer exists
 */
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
           // KEYS[1]: lock name, e.g. myLock
           // ARGV[1]: lease in ms (internalLockLeaseTime)
           // ARGV[2]: holder id, uuid + ":" + threadId
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            // 1. Holder still present: reset the key's expiry and return 1 (true).
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            // 2. Otherwise return 0 (false).
            "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

值得注意的: 只有使用默认过期时间,即lock()时没有指定leaseTime时,才会续锁

  2. 取消续锁:在释放锁时调用
// RedissonBaseLock.cancelExpirationRenewal()

//取消续锁的定时任务
/**
 * Unregisters {@code threadId} from this lock's watchdog entry. Once no thread is left, or when
 * {@code threadId} is {@code null}, it also cancels the pending renewal tick and removes the
 * entry from {@code EXPIRATION_RENEWAL_MAP}, so no further renewals are scheduled.
 *
 * @param threadId thread releasing one hold, or {@code null} to stop renewal unconditionally
 */
protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry entry = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (entry == null) {
        return;
    }

    if (threadId != null) {
        // One removal per release. With a re-entrant lock (e.g. locked three times), the first
        // unlock still leaves holds behind, so renewal must continue.
        entry.removeThreadId(threadId);
    }

    boolean keepRenewing = threadId != null && !entry.hasNoThreads();
    if (keepRenewing) {
        return;
    }

    // Nobody holds the lock any more: stop the scheduled tick and forget the entry, so a tick
    // already in flight finds no entry and ends the chain.
    Timeout pendingTick = entry.getTimeout();
    if (pendingTick != null) {
        pendingTick.cancel();
    }
    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}

存在的问题: 若 lock() 之后因为异常导致 unlock() 没有执行,续锁任务就不会被取消,锁会被一直续期持有,其它线程将永远获取不到锁(直到该客户端服务停止、续锁任务终止后锁自然过期),所以 unlock() 一定要放到 finally{} 中

五:未获取锁时订阅获取锁逻辑

这里简单说一下如何利用发布订阅获取锁的流程,有关 redis 发布订阅的详细内容会在专门的文章中讲解

  1. 订阅释放锁消息
PublishSubscribe

    // Subscribes the caller to the "lock released" channel of a lock, sharing one Redis
    // subscription per entryName. All work runs serialized through the channel's AsyncSemaphore.
    //   entryName:   "<uuid>:<name>"
    //   channelName: "redisson_lock__channel:{<name>}"
    // Returns a promise that completes with the shared entry once the subscription is active.
    public RFuture<E> subscribe(String entryName, String channelName) {
        // 1. Semaphore for this channel (the same channel name yields the same semaphore; created
        //    with 1 permit per the original notes — TODO confirm in PublishSubscribeService).
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        RPromise<E> newPromise = new RedissonPromise<>();
        // 2. Queue the task; it runs once it gets the semaphore permit, and every path below must
        //    eventually give the permit back.
        semaphore.acquire(() -> {
            // Promise presumably cancelled by the caller already: give up and return the permit.
            if (!newPromise.setUncancellable()) {
                semaphore.release();
                return;
            }


            E entry = entries.get(entryName);
            if (entry != null) {
                // 2.1 Another waiter already subscribed for this entry: bump its reference count,
                //     return the permit, and complete our promise when the shared one completes.
                entry.acquire();
                semaphore.release();
                entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
            }

            // 2.2 First subscriber for this entry.
            // 2.2.1 Create the entry and take the first reference (released again in unsubscribe()).
            E value = createEntry(newPromise);
            value.acquire();

            // 2.2.2 Publish it in `entries` so later subscribers (2.1) reuse it and unsubscribe()
            //       can find it.
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                // Lost a race to another subscriber: reuse its entry instead, as in 2.1.

                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                return;
            }

            // 2.2.3 Create the listener and actually subscribe to the channel. The permit is handed
            //       to service.subscribe, presumably released once the subscription completes.
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        });

        return newPromise;
    }

    //redisson_lock__channel:{name}
    /**
     * Builds the pub/sub listener for {@code channelName} that feeds {@code value}.
     * Messages on that channel are forwarded to {@code onMessage(value, message)}. The SUBSCRIBE
     * status completes the entry's promise, which also completes the promises of subscribers
     * that reused this entry.
     */
    private RedisPubSubListener<Object> createListener(String channelName, E value) {
        return new BaseRedisPubSubListener() {

            @Override
            public void onMessage(CharSequence channel, Object message) {
                // Ignore messages addressed to other channels.
                if (channelName.equals(channel.toString())) {
                    PublishSubscribe.this.onMessage(value, (Long) message);
                }
            }

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                boolean subscribedHere = channelName.equals(channel.toString())
                        && type == PubSubType.SUBSCRIBE;
                if (subscribedHere) {
                    value.getPromise().trySuccess(value);
                }
                return subscribedHere;
            }
        };
    }
/**
 * Lock-specific {@code PublishSubscribe}. Each subscription is backed by a
 * {@link RedissonLockEntry}, and unlock messages wake threads waiting on the entry's latch.
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    /** Published by RedissonLock's unlock script; wakes a single waiter. */
    public static final Long UNLOCK_MESSAGE = 0L;
    /** Wakes every waiter (presumably published by the read/write lock on release — TODO confirm). */
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            // Run at most one queued listener, then hand out a single latch permit, waking one
            // thread blocked in getLatch().tryAcquire()/acquire().
            Runnable next = value.getListeners().poll();
            if (next != null) {
                next.run();
            }
            value.getLatch().release();
            return;
        }

        if (message.equals(READ_UNLOCK_MESSAGE)) {
            // Drain every queued listener, then release one permit per thread queued on the latch.
            for (Runnable next = value.getListeners().poll(); next != null; next = value.getListeners().poll()) {
                next.run();
            }
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }
}

//AsyncSemaphore 

    // Permits still available. With 1 permit (the value the original notes say
    // PublishSubscribeService uses), at most one queued task runs at a time. That serializes the
    // subscribe/unsubscribe tasks of PublishSubscribe for a channel; each task calls
    // semaphore.release() when done. It does not by itself limit how many threads hold the lock.
    private final AtomicInteger counter;
    // Tasks waiting for a permit, taken in FIFO order by tryRun().
    private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();

    // Per the original notes, created by PublishSubscribeService with permits = 1 — TODO confirm.
    public AsyncSemaphore(int permits) {
        counter = new AtomicInteger(permits);
    }

    /**
     * Queues {@code listener} and runs it as soon as a permit is available, possibly right away
     * on the calling thread (tryRun() invokes it directly). The listener is expected to give
     * the permit back by calling release() when its work is done.
     */
    public void acquire(Runnable listener) {
        // 1. Enqueue first, so that tryRun() now (or a later permit return) can pick it up.
        listeners.add(listener);
        // 2. Run the head of the queue immediately if a permit is free.
        tryRun();
    }

   // Runs the next queued task if a permit can be taken; otherwise leaves it queued.
    private void tryRun() {
        // Fast path: no permit left, or nothing queued. A queued task then stays in `listeners`
        // until a permit is returned (presumably via release(), not shown here).
        if (counter.get() == 0
                || listeners.peek() == null) {
            return;
        }
  
        if (counter.decrementAndGet() >= 0) {
            // Took a permit: run the head of the queue synchronously on this thread.
            Runnable listener = listeners.poll();
            if (listener == null) {
                // Another thread emptied the queue in the meantime: give the permit back.
                counter.incrementAndGet();
                return;
            }

            listener.run();
        } else {
            // Reachable under contention: another thread took the last permit between get() and
            // decrementAndGet(). Undo our decrement; if a permit became free meanwhile, retry.
            if (counter.incrementAndGet() > 0) {
                tryRun();
            }
        }
    }

订阅释放锁整体流程:
在这里插入图片描述

  2. 取消订阅释放锁
//PublishSubscribe

  // Drops one reference to a lock's subscription entry, and unsubscribes from the channel once
  // the last reference is gone. Runs serialized through the channel's AsyncSemaphore.
  //   entryName:   "<uuid>:<name>"
  //   channelName: "redisson_lock__channel:{<name>}"
public void unsubscribe(E entry, String entryName, String channelName) {
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    
    semaphore.acquire(() -> {
        if (entry.release() == 0) {
            // Last reference (taken in subscribe()): remove the entry and send UNSUBSCRIBE. The
            // permit is returned only after the unsubscribe completes, so other tasks queued on
            // this semaphore (e.g. subscribe() for the same channel) wait until then.
            // just an assertion
            // NOTE(review): if this assertion throws, the semaphore permit is never released.
            boolean removed = entries.remove(entryName) == entry;
            if (!removed) {
                throw new IllegalStateException();
            }
            service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName))
                    .onComplete((r, e) -> {
                        semaphore.release();
                    });
        } else {
            // Other waiters still use the subscription: just return the permit.
            semaphore.release();
        }
    });

}

六:redisson分布式锁问题

在 Redis 主从复制架构下(如哨兵模式或 redis-cluster 模式),高并发时,线程1在 master 上写入了 myLock 锁,而 master 在把数据异步复制到对应的 slave 节点之前宕机(此时数据还未写入 slave),随后发生主备切换,slave 被提升为新的 master。此时线程2请求锁,发现没有线程持有锁,于是也获取到了锁。这样线程1和线程2就同时持有了锁,可能造成脏数据

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐