什么是分布式锁?解决什么问题?

分布式锁,是控制分布式系统的各节点同步访问共享资源的一种方式,防止各节点互相干扰引起数据一致性的问题。以前我们学过的锁,比如synchronized、lock等,只在单台机器下生效。如果系统部署了多台机器,或者互斥资源在不同的服务之间进行操作,那么就锁不住。如下图,

  1. 现在有一个订单服务模块部署在3台服务器上,现在有用户1-4发起支付请求,其中用户1、2由订单服务1处理;用户4由订单服务2处理;其中用户3由订单服务3处理。
  2. 如果用的是单机锁,比如用户1发起支付,那只有订单服务1被锁住。其他两个点并未上锁,用户3、4仍然可以支付。这样就会出现并发问题。
    在这里插入图片描述
    为了解决上述问题,我们有了分布式锁的概念。如下图,把加锁动作交给第三方去做。
    在这里插入图片描述

常见的分布式锁实现方案

  • 基于 Redis 实现分布式锁
  • 基于 Zookeeper 实现分布式锁

本文介绍基于 Redis 的分布式锁实现方案。

Redis实现分布式锁

主要步骤

  1. 指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识 作为 value。
  2. 当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性。
  3. 设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性。
  4. 当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁

Redisson实现分布式锁

Redisson 是 Redis 官方推荐的分布式锁实现方案。Redisson中的很多锁操作是通过lua脚本实现的,所以,我们先学习一下lua脚本。

Lua脚本

  • lua语言是一个轻量级的脚本语言,依赖于其他宿主语言,自己不会有大而全的功能,但是可以调用宿主语言的功能。

  • redis完美契合了lua脚本的功能,redis可以调用lua脚本中的api,lua脚本也可以调用redis中的指令。

  • lua脚本可以通过redis的eval 命令执行。如下

格式
eval [lua脚本] [key的个数] [key ...] [arg ...] 

在这里插入图片描述

条件语句:如果传入的键是1,则返回第一个参数,否则返回第二个参数。

eval "if KEYS[1]=='1' then return ARGV[1] end return ARGV[2]" 1 1 'yang' 'xiao yang'

在这里插入图片描述
在lua脚本中,我们也可以执行redis的指令,并且可以根据返回的结果继续执行后续的逻辑。如下,判断key=id是否存在,不存在就调用redis的set指令。

eval "local key=redis.call('exists',KEYS[1]) if key==0 then return redis.call('set',KEYS[1],ARGV[1]) end return 1" 1 id XY

在这里插入图片描述

  • lua脚本中的redis指令是原子的。上面的lua语句,判断是否存在的指令和set指令中间不会插入其他指令。在执行lua脚本期间,其他的指令会阻塞。必须等待lua脚本的指令执行完毕。所以单个lua脚本中的指令不宜有太多指令。

Lua 脚本的使用场景

  • 需要原子性地执行多个命令
  • 需要指令的中间值来组合后面的命令
  • 需要指令的中间值来编排后面的命令

Redisson简述

一个基于Redis实现的分布式工具。

Redisson的使用

Redisson整合SpringBoot

创建SpringBoot项目

引入redisson依赖

只需要引入springboot-redisson-starter就可以了,不过这里需要注意springboot与redisson的版本,因为官方推荐redisson版本与springboot版本配合使用。

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.6</version>
</dependency>

ctrl+单击,可以进入redisson-spring-boot-starter查看对应redisson-spring-data依赖的版本。redisson-spring-data与Spring Boot version的版本对应关系如下。

redisson-spring-data module nameSpring Boot version
redisson-spring-data-161.3.x
redisson-spring-data-171.4.x
redisson-spring-data-181.5.x
redisson-spring-data-202.0.x
redisson-spring-data-212.1.x
redisson-spring-data-222.2.x
redisson-spring-data-232.3.x
redisson-spring-data-242.4.x

redisson配置

1. application.yml + redisson.yml
  • application.yml
spring:
  application:
    name: redisson
  redis:
    redisson:
      file: classpath:redisson.yml

server:
  port: 8080
  • redisson.yml
singleServerConfig:
  #  password: 123456
  address: "redis://127.0.0.1:6379"
  database: 0
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.FstCodec> {}
transportMode: "NIO"
2. application.yml
spring:
  application:
    name: redisson
  redis:
    redisson:
      config:
        singleServerConfig:
#          password: 123456
          address: "redis://127.0.0.1:6379"
          database: 0
        threads: 0
        nettyThreads: 0
        codec: '!<org.redisson.codec.FstCodec> {}'
        transportMode: "NIO"
server:
  port: 8080
3. 程序配置

创建配置类RedissonConfig

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.setTransportMode(TransportMode.NIO);
        //单个服务节点的配置
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://127.0.0.1:6379");
//        singleServerConfig.setPassword("123456");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

测试

创建controller,然后启动并测试

@RestController
@RequestMapping("/v1")
public class TestController {
    @Autowired
    private RedissonClient redissonClient;

    @GetMapping("/setRedis")
    public String setRedis(String key, String value) {
        RBucket<String> rBucket = redissonClient.getBucket(key);
        rBucket.set(value);
        String result = (String) redissonClient.getBucket(key).get();
        System.out.println(result);
        return result;
    }
}

Redisson分布式锁测试

controller
@RestController
public class ProductController {
    @Autowired
    private ProductService productService;

    @RequestMapping(value = "/pay_lock",method = RequestMethod.GET)
    @ResponseBody
    public String payRedisson(@RequestParam("id") Long id) throws InterruptedException {
        return productService.payRedisson(id);
    }
}
service
@Service
public class ProductService {
    @Autowired
    private RedissonClient redissonClient;
    public String payRedisson(Long orderId) throws InterruptedException {
        RLock rLock1 = redissonClient.getLock("order_lock" + orderId);
        if (rLock1.tryLock(2, -1, TimeUnit.SECONDS)) {
            System.out.println("获取锁成功");
            Thread.sleep(10000);
            rLock1.unlock();
            return "处理完成";
        } else {
            System.out.println("获取锁失败");
            return "请稍等,已经有人在支付!!";
        }
    }
}
测试

同时启两个服务,端口分别是8080、8081

在vm options中添加:-Dserver.port=8081
在这里插入图片描述

结果如下

同时调用8080和8081的服务,id都为10,可以看到8081被阻塞。
在这里插入图片描述
在这里插入图片描述

Redisson实现分布式锁

概述

  • 工程:p01-redisson

1. 可重入锁(Reentrant Lock),不可中断

如下,关键方法。

//创建RLock对象
redissonClient.getLock(key);
//RLock对象加锁,其中timeout是锁的超时时间,即这个锁被当前线程持有的时间,超过这个时间,锁自动释放
lock.lock(timeout, TimeUnit.SECONDS);

下面对redisson的锁做了封装

@Configuration
@Slf4j
public class RedissonTemplate {
    private final RedissonClient redissonClient;

    private final String DEFAULT_LOCK_NAME = "xy-lock-pre";

    public RedissonTemplate(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * 可重入锁,不可中断
     *
     * @param lockName
     * @param timeout:锁超时时间,即锁持有的时间
     * @return
     */
    public boolean lock(String lockName, long timeout) {
        checkRedissonClient();
        RLock lock = this.getLock(lockName);
        try {
            if (timeout != -1) {
                //增加锁超时时间
                lock.lock(timeout, TimeUnit.SECONDS);
            } else {
                lock.lock();
            }

            log.debug(" get lock success ,lockKey:{}", lockName);
            return true;
        } catch (Exception e) {
            log.error(" get lock fail,lockKey:{}, cause:{} ",
                    lockName, e.getMessage());
            return false;
        }
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        checkRedissonClient();
        try {
            RLock lock = this.getLock(lockName);
            // 当前lock是不已上锁
            // 当前线程是否持有此lock
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
                log.debug("key:{},unlock success", lockName);
            } else {
                log.debug("key:{},没有加锁或者不是当前线程加的锁 ", lockName);
            }
        } catch (Exception e) {
            log.error("key:{},unlock error,reason:{}", lockName, e.getMessage());
        }
    }

    /**
     * 获取锁对象
     *
     * @param lockName
     * @return
     */
    private RLock getLock(String lockName) {
        String key = DEFAULT_LOCK_NAME + lockName;
        return redissonClient.getLock(key);
    }

    /**
     * 校验redissonClient
     */
    private void checkRedissonClient() {
        if (null == redissonClient) {
            log.error(" redissonClient is null ,please check redis instance ! ");
            throw new RuntimeException("redissonClient is null ,please check redis instance !");
        }
        if (redissonClient.isShutdown()) {
            log.error(" Redisson instance has been shut down !!!");
            throw new RuntimeException("Redisson instance has been shut down !!!");
        }
    }
}

测试类

package com.learn.p01redisson;

import com.learn.p01redisson.reentrantlock.RedissonTemplate;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@Slf4j
public class P01ReetrantLockTest {
    @Autowired
    private RedissonTemplate redissonTemplate;
	//用于线程同步,main能等待子线程结束后再结束
    private CountDownLatch count = new CountDownLatch(2);

    @Test
    void contextLoads() {
        String lockName = "xy-test";

        new Thread(() -> {
            Thread.currentThread().setName("Thread-01");
            String threadName = Thread.currentThread().getName();
            log.info("线程:{} 正在尝试获取锁。。。", threadName);
            boolean lock = redissonTemplate.lock(lockName, 10L);
            doSomthing(lock, lockName, threadName);

        }).start();

        new Thread(() -> {
            Thread.currentThread().setName("Thread-02");
            String threadName = Thread.currentThread().getName();
            log.info("线程:{} 正在尝试获取锁。。。", threadName);
            boolean lock = redissonTemplate.lock(lockName, 10L);
            doSomthing(lock, lockName, threadName);
        }).start();

        try {
            count.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("子线程都已执行完毕,main函数可以结束了!");

    }

    private void doSomthing(boolean lock, String lockName, String threadName) {
        if (lock) {
            log.info("线程:{},获取到了锁", threadName);
            try {
                try {
                    TimeUnit.SECONDS.sleep(5L);
                    //TimeUnit.SECONDS.sleep(15L);
                    count.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                redissonTemplate.unlock(lockName);
                log.info("线程:{},释放了锁", threadName);
            }
        }
    }
}

在这里插入图片描述
经过测试可以看到,在50:03,Thread-01获取到了锁,5s后释放锁,然后Thread-02获取到了锁,5s后释放锁。

把55行代码的时间改成15L,则得到如下结果:Thread-01获取到了锁后,过了10s,锁超时,自动释放,但线程仍然会在15s后完成。然后,然后Thread-02获取到了锁。
在这里插入图片描述

  • 这种锁有个问题:

Thread-01获取到了锁后,如果一直不释放,Thread-02会一直处于阻塞状态。除非被其他线程中止。

lock上锁时,是可以不设置过期时间的。此时只要Thread-01不解锁,那么Thread-02等其他线程都将一直处于阻塞状态。这样就会引发一个很严重的问题,那就是在Thread-01获取到了锁之后,程序或者服务器突然宕机,等重启完成之后,Thread-02等其他线程也会一直处于阻塞状态,因为宕机前获取的锁还没有被释放。

为了解决这个问题,redisson设置一个看门狗。它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。说直白一点,如果你加的锁没有指定过期时间,那么redisson会默认将这个锁的过期时间设置为 30 秒,快到 30 的程序去自动续期,直到程序把锁释放,如果这个时候服务器宕机了,那么程序的续期功能自然也就不存在了,锁最多还能再存活 30 秒。

2. 可重入锁(Reentrant Lock),可中断

  • RLock提供了tryLock()方法,用于创建可中断的可重入锁。
  • 相比不可中断的可重入锁,可中断的可重入锁提供阻塞线程的等待时间,超过这个等待时间,阻塞线程就结束等待。
	/**
     * 获取可重入锁,可中断
     *
     * @param lockName
     * @param waitTimeout:等待时间,即获取不到锁的线程尝试的时间
     * @param unit
     * @return
     */
    public boolean tryLock(String lockName, long waitTimeout, TimeUnit unit) {
        checkRedissonClient();
        RLock lock = getLock(lockName);
        try {
            boolean res = lock.tryLock(waitTimeout, unit);
            if (!res) {
                log.debug(" get lock fail ,lockKey:{}", lockName);
                return false;
            }
            log.debug(" get lock success ,lockKey:{}", lockName);
            return true;
        } catch (Exception e) {
            log.error(" get lock fail,lockKey:{}, cause:{} ",
                    lockName, e.getMessage());
            return false;
        }
    }

测试代码

@SpringBootTest
@Slf4j
public class P02ReetrantLockTest {
    @Autowired
    private RedissonTemplate redissonTemplate;

    private CountDownLatch count = new CountDownLatch(2);

    @Test
    public void test() {
        String lockName = "hello-test";
        new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            log.info("线程:{} 正在尝试获取锁。。。", threadName);
            boolean lock = redissonTemplate.tryLock(lockName, 2L, TimeUnit.SECONDS);
            doSomthing(lock, lockName, threadName);
        }).start();

        new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            log.info("线程:{} 正在尝试获取锁。。。", threadName);
            boolean lock = redissonTemplate.tryLock(lockName, 2L, TimeUnit.SECONDS);
            doSomthing(lock, lockName, threadName);
        }).start();
        try {
            count.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("子线程都已执行完毕,main函数可以结束了!");
    }

    private void doSomthing(boolean lock, String lockName, String threadName) {
        if (lock) {
            log.info("线程:{},获取到了锁", threadName);
            try {
                try {
                    TimeUnit.SECONDS.sleep(5L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                redissonTemplate.unlock(lockName);
                log.info("线程:{},释放了锁", threadName);
            }
        } else {
            log.info("线程:{},没有获取到锁,过了等待时长,结束等待", threadName);
        }
        count.countDown();
    }
}

结果如下,可以看到Thread-3没有获取到锁后等待了2s,然后结束等待。
在这里插入图片描述

3. 公平锁(Fair Lock)

公平,就是所谓的先来后到,先尝试获取锁的线程先拿到锁,后面的线程排队。前面两种都是非公平锁。

实现方式

	/**
     * 获取公平锁
     *
     * @param lockName
     * @param waitTimeout
     * @param timeout
     * @param unit
     * @return
     */
    public boolean getFairLock(String lockName, long waitTimeout, long timeout, TimeUnit unit) {
        checkRedissonClient();
        RLock lock = redissonClient.getFairLock(DEFAULT_LOCK_NAME + lockName);
        try {
            boolean res = lock.tryLock(waitTimeout, timeout, unit);
            if (!res) {
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            log.error("get lock failed, lockName:{},cause:{}", lockName, e.getMessage());
            return false;
        }
    }

    /**
     * 解公平锁
     *
     * @param lockName
     */
    public void unlockFairLock(String lockName) {
        checkRedissonClient();
        try {
            RLock lock = redissonClient.getFairLock(DEFAULT_LOCK_NAME + lockName);
            // 当前lock是不已上锁
            // 当前线程是否持有此lock
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
                log.debug("key:{},unlock success", lockName);
            } else {
                log.debug("key:{},没有加锁或者不是当前线程加的锁 ", lockName);
            }
        } catch (Exception e) {
            log.error("key:{},unlock error,reason:{}", lockName, e.getMessage());
        }
    }

测试类

@SpringBootTest
@Slf4j
public class P03FairLockTest {

    @Autowired
    private RedissonTemplate redissonTemplate;

    private CountDownLatch count = new CountDownLatch(3);

    /**
     * 公平锁
     *
     * @throws InterruptedException
     */
    @Test
    public void test() throws InterruptedException {
        String lockName = "xy-fair-lock";
        new Thread(() -> {
            String threadName = "thread-01";
            log.info("进入{} ======", threadName);
            log.info("{} 正在尝试获取锁。。。", threadName);
            boolean lock = redissonTemplate.getFairLock(lockName, 20L, 7L, TimeUnit.SECONDS);
            doSomthing(lock, lockName, threadName);
        }).start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2L);
                String threadName = "thread-02";
                log.info("进入{} ======", threadName);
                log.info("{} 正在尝试获取锁。。。", threadName);
                boolean lock = redissonTemplate.getFairLock(lockName, 20L, 7L, TimeUnit.SECONDS);
                doSomthing(lock, lockName, threadName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            String threadName = "thread-03";
            log.info("进入{} ======", threadName);
            try {
                TimeUnit.SECONDS.sleep(3L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} 正在尝试获取锁。。。", threadName);
            boolean lock = redissonTemplate.getFairLock(lockName, 20L, 7L, TimeUnit.SECONDS);
            doSomthing(lock, lockName, threadName);
        }).start();
        count.await();
    }

    private void doSomthing(boolean lock, String lockName, String threadName) {
        if (lock) {
            log.info("线程:{},获取到了锁", threadName);
            try {
                try {
                    TimeUnit.SECONDS.sleep(5L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                redissonTemplate.unlockFairLock(lockName);
                log.info("线程:{},释放了锁", threadName);
            }
        } else {
            log.info("线程:{},没有获取到锁,过了等待时长,结束等待", threadName);
        }
        count.countDown();
        log.info("线程:{},结束>>>>>", threadName);
    }
}

结果如下,可以看到thread-01、thread-02、thread-03按尝试获取锁的顺序执行。
在这里插入图片描述

4. 联锁(MultiLock)

联锁指的是:同时对多个资源进行加锁操作,只有所有资源都加锁成功的时候,联锁才会成功。

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

@SpringBootTest
@Slf4j
public class P04MultiLockTest {
    @Autowired
    private RedissonTemplate redissonTemplate;

    @Test
    public void test() {
        RLock lock1 = redissonTemplate.getLock("lock1");
        RLock lock2 = redissonTemplate.getLock("lock2");
        RLock lock3 = redissonTemplate.getLock("lock3");
        RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
        boolean flag = lock.tryLock();

        if (flag) {
            try {
                log.info("联锁加锁成功");
            } finally {
                //一定要释放锁
                lock.unlock();
            }
        }
    }
}

5. 红锁(RedLock)

红锁与联锁类似,也是对多个节点进行加锁。但联锁是所有节点都加锁成功,联锁才会成功。红锁只需要超过半数的节点加锁成功即可。

假设有N个redis的master节点,节点相互独立,N推荐为奇数。客户端在获取锁时,需要做以下操作:

  1. 获取当前时间戳,以微秒为单位。
  2. 使用相同的lockName和lockValue,尝试从N个节点获取锁。当从N个节点获取锁结束后,如果客户端能够从(N/2 + 1)个节点中成功获取锁,且获取锁的时间小于失效时间,那么可认为,客户端成功获得了锁。(获取锁的时间=当前时间戳 - 步骤1的时间戳)。
  3. 客户端成功获得锁后,那么锁的实际有效时间 = 设置锁的有效时间 - 获取锁的时间。
  4. 客户端获取锁失败后,N个节点的redis实例都会释放锁,即使未能加锁成功。
//测试类
@SpringBootTest
@Slf4j
public class P05RedLockTest {
    @Autowired
    private RedissonTemplate redissonTemplate;

    @Test
    public void test() {
        RLock lock1 = redissonTemplate.getLock("lock1");
        RLock lock2 = redissonTemplate.getLock("lock2");
        RLock lock3 = redissonTemplate.getLock("lock3");
        RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);

        try {
            log.info("获取红锁成功");
            lock.tryLock();
        } finally {
            lock.unlock();
        }
    }
}

6. 读写锁(ReadWriteLock)

读写锁(ReadWriteLock)允许资源可以加多个读锁和一个写锁。重要的是,分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。这点相当于java并发sdk并发包中的 StampedLock。而java的ReadWriteLock在加了读锁后,是不能加写锁的,写操作是需要处于阻塞状态。即读锁和写锁互斥。

@SpringBootTest
@Slf4j
public class P06ReadWriteLockTest {
    @Autowired
    private RedissonClient redissonClient;

    @Test
    public void test() {
        String lockName = "testRWLock";
        RReadWriteLock rwlock = redissonClient.getReadWriteLock(lockName);

        rwlock.readLock().lock(20L, TimeUnit.SECONDS);
        rwlock.writeLock().lock(20L,TimeUnit.SECONDS);
    }
}

7. 信号量(Semaphore)

信号量(Semaphore)是一个计数器,用来保护共享资源的访问。信号量会预先设定一个许可数,如果一个线程要访问一个资源就必须先获得信号量。如果信号量的内部计数器的值大于0,则值减1,然后允许共享资源;如果信号量的内部计数器的值等于0,信号量将会把线程置入休眠直至计数器大于0。当线程使用完资源时,必须释放资源,同时信号量的内部计数器的值加1。

示例

测试如下,设置许可个数为10,一个有100个线程需要执行,每个线程执行时间约1s,所以结果可以看到每秒同时有10个线程在执行。

@SpringBootTest
@Slf4j
public class P07SemaphoreTest {
    @Autowired
    private RedissonClient redissonClient;

    @Test
    public void test() throws InterruptedException {
        RSemaphore semaphore = redissonClient.getSemaphore("testSemaphore");
        //设置许可个数
        semaphore.trySetPermits(10);
        CountDownLatch count = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                long startTime = System.currentTimeMillis();
                try {
                    String threadName = Thread.currentThread().getName();
                    semaphore.acquire();
                    long endTime = System.currentTimeMillis();
                    log.info("线程{}获取到资源,用时:{}ms", threadName, (endTime - startTime));
                    TimeUnit.SECONDS.sleep(1L);
                    count.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    semaphore.release();
                }
            }).start();
        }
        count.await();
    }
}

结果

可以看到,每次都只有10个线程一起执行。
在这里插入图片描述

可过期信号量

获取到的许可有个有效期。

@Test
public void testPermitExpirableSemaphore() throws InterruptedException {
	RPermitExpirableSemaphore semaphore = redissonTemplate.getPermitExpirableSemaphore("testPermitExpirableSemaphore");
	//设置许可个数
	semaphore.trySetPermits(10);
	// 获取一个信号量,有效期只有1秒钟。
	String permitId = semaphore.acquire(1, TimeUnit.SECONDS);
	log.info("许可:{}",permitId);
	semaphore.release(permitId);
}

8. 闭锁(CountDownLatch)

redisson的CountDownLatch和JUC的CountDownLatch很类似,主要作用也是让一个或多个线程等待,直到等待的线程全部完成或超时,自己再开始执行。

@SpringBootTest
@Slf4j
public class P08CountDownLatchTest {
    @Autowired
    private RedissonClient redissonClient;

    @Test
    public void test() throws InterruptedException {
        long startTime = System.currentTimeMillis();
        RCountDownLatch rCountDownLatch = redissonClient.getCountDownLatch("num");
        //设置初始值
        rCountDownLatch.trySetCount(5);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                doTask(rCountDownLatch);
            }).start();
        }
        rCountDownLatch.await();
        long endTime = System.currentTimeMillis();
        log.info("所有线程都执行完成,用时:{}ms", (endTime - startTime));
    }

    private void doTask(RCountDownLatch rCountDownLatch) {
        try {
            String threadName = Thread.currentThread().getName();
            long startTime = System.currentTimeMillis();
            TimeUnit.SECONDS.sleep(3);
            long endTime = System.currentTimeMillis();
            log.info("线程:{}执行完成,用时:{}ms", threadName, (endTime - startTime));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rCountDownLatch.countDown();
        }
    }
}

结果:

可以看到5个子线程都完成后,主线程才执行完成。
在这里插入图片描述

Redission分布式锁原理源码分析

Redission就是通过lua脚本+hash实现的。

Redisson分布式锁流程图

在这里插入图片描述

Redisson加锁过程分析

执行步骤

  1. 根据lockName创建RLock对象

  2. 调用tryLock方法,传入waitTime(阻塞排队等待的时间)、leaseTime(过期时间,线程占用资源的时间)

  3. 根据lockName生成大key,并判断redis中,大key是否存在。

  4. 如果不存在,则用hash数据类型保存锁数据,hash结构如下,并设置过期时间:leaseTime>0 ? leaseTime : 30s(默认)。然后return null,表示加锁成功,开始执行业务代码

    key:大key

    ​ filed:UUID+线程ID(唯一性标识)

    ​ value:1

  5. 如果存在,判断filed是否与当前获得锁的线程相同。

  6. 如果相同,当前线程锁重入次数+1,设置过期时间30s。然后return null,表示加锁成功,开始执行业务代码

  7. 如果不相同,返回占用资源的锁的剩余过期时间ttl。

  8. 判断获取锁花费的时间是否超过剩余的等待时间(waitTime),如果超过,则获取锁失败。

  9. 如果没有超过,则订阅锁频道,等待锁释放。这里通过Semaphore阻塞,此时阻塞时间是剩余等待时间。

  10. 如果在剩余等待时间内,锁释放了,则自旋获取锁。

  11. 再次尝试获取锁,如果返回null,则表示获取锁成功;如果返回ttl,则判断获取锁花费的时间是否超过剩余的等待时间(waitTime),如果超过,则获取锁失败。

  12. 如果没有超过,则订阅锁频道,等待锁释放。这里通过Semaphore阻塞,此时阻塞时间:剩余等待时间<锁释放时间?剩余等待时间: 锁释放时间。

  13. 如果在剩余等待时间内,锁释放了,则自旋获取锁。重复10-12的操作。

源码分析

测试代码
String lockName = "lockName";
// 步骤1. 根据lockName创建RLock对象
RLock rLock = redissonClient.getLock(lockName);
// 步骤2. 调用tryLock方法,传入waitTime(阻塞排队等待的时间)、leaseTime(过期时间,线程占用资源的时间)
Boolean isLock = rLock.tryLock(3L, 5L, TimeUnit.SECONDS);
1. tryLock方法(类:RedissonLock)
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
	long time = unit.toMillis(waitTime);
	long current = System.currentTimeMillis();
	long threadId = Thread.currentThread().getId();
    //如是返回null代表拿到了锁,此方法的源码进入1.1
	Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
	// lock acquired
	if (ttl == null) {
		return true;
	}
    
	// 代码执行到这里,说明抢锁失败
	time -= System.currentTimeMillis() - current;
    // 步骤8. 判断获取锁花费的时间是否超过剩余的等待时间(waitTime),如果超过,则获取锁失败。
	if (time <= 0) {
		acquireFailed(waitTime, unit, threadId);
		return false;
	}
    
    // 步骤9. 如果没有超过,则订阅锁频道,等待锁释放。这里通过Semaphore阻塞,此时阻塞时间是剩余等待时间。
	current = System.currentTimeMillis();
    //添加锁的监听,如果锁释放,会进入监听类,将Semaphore释放,及时拿到锁
	CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
	try {
        //这是阻塞方法,会等到锁释放或超时。
		subscribeFuture.get(time, TimeUnit.MILLISECONDS);
	} catch (ExecutionException | TimeoutException e) {
		if (!subscribeFuture.cancel(false)) {
			subscribeFuture.whenComplete((res, ex) -> {
				if (ex == null) {
					unsubscribe(res, threadId);
				}
			});
		}
		acquireFailed(waitTime, unit, threadId);
		return false;
	}

	try {
		time -= System.currentTimeMillis() - current;
		if (time <= 0) {
			acquireFailed(waitTime, unit, threadId);
			return false;
		}
		
        // 步骤10. 如果在剩余等待时间内,锁释放了,则自旋获取锁。
        //自旋获取锁
		while (true) {
			long currentTime = System.currentTimeMillis();
            // 步骤11. 再次尝试获取锁,如果返回null,则表示获取锁成功;如果返回ttl,则判断获取锁花费的时间是否超过剩余的等待时间(waitTime),如果超过,则获取锁失败。
			ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
			// lock acquired
			if (ttl == null) {
				return true;
			}

			time -= System.currentTimeMillis() - currentTime;
			if (time <= 0) {
				acquireFailed(waitTime, unit, threadId);
				return false;
			}

			// waiting for message
			currentTime = System.currentTimeMillis();
            // 步骤12. 如果没有超过,则订阅锁频道,等待锁释放。这里通过Semaphore阻塞,此时阻塞时间:剩余等待时间<锁释放时间?剩余等待时间: 锁释放时间。
            // 通过Semaphore进行阻塞,阻塞时间:剩余等待时间time<锁释放时间ttl?剩余等待时间: 锁释放时间
			if (ttl >= 0 && ttl < time) {
				commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
			} else {
				commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
			}

			time -= System.currentTimeMillis() - currentTime;
			if (time <= 0) {
				acquireFailed(waitTime, unit, threadId);
				return false;
			}
            // 步骤13. 如果在剩余等待时间内,锁释放了,则自旋获取锁。重复10-12的操作。
		}
	} finally {
		unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
	}
}
1.1. tryAcquire(类:RedissonLock)
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //进入1.2
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
1.2. tryAcquireAsync(类:RedissonLock)
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
	RFuture<Long> ttlRemainingFuture;
	if (leaseTime > 0) {
        //如果传入了释放时间,将释放时间传入,进入方法1.3
		ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
	} else {
        //如果没有传入锁的释放时间,默认internalLockLeaseTime=30000,释放时间为30s,进入方法1.3
		ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
				TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
	}
    //tryLockInnerAsync是异步方法,thenApply是它的回调函数。获取锁的操作完成后,执行回调函数
	CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
		// ttlRemaining == null表示成功获取到锁
		if (ttlRemaining == null) {
			if (leaseTime > 0) {
				internalLockLeaseTime = unit.toMillis(leaseTime);
			} else {
                //看门狗的入口
				scheduleExpirationRenewal(threadId);
			}
		}
		return ttlRemaining;
	});
	return new CompletableFutureWrapper<>(f);
}
1.3. tryLockInnerAsync(类:RedissonLock)
//这里执行lua脚本,实现可重入锁的功能,整个lua脚本是原子的。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    //lua脚本,进入1.4
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }
1.4. lua脚本
// 步骤3. 根据lockName生成大key,并判断redis中,大key是否存在。
//判断我的锁是否存在,=0为不存在 没人抢占锁
if (redis.call('exists', KEYS[1]) == 0) 
then
    // 步骤4. 如果不存在,则用hash数据类型保存锁数据,hash结构如下,并设置过期时间:leaseTime>0 ? leaseTime : 30s(默认)。然后return null,表示加锁成功,开始执行业务代码
    //把我的小key +1
    redis.call('hincrby',KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); //设置过期时间
	return nil;
end;
// 步骤5. 如果存在,判断filed是否与当前获得锁的线程相同。
// 进入该逻辑说明有线程抢占了锁 继续判断是否同一个线程 ==1为同一线程
if (redis.call('hexists', KEYS[1],ARGV[2]) == 1) 
then
	// 步骤6. 如果相同,当前线程锁重入次数+1,设置过期时间30s。然后return null,表示加锁成功,开始执行业务代码
	redis.call('hincrby', KEYS[1], ARGV[2], 1); 
	redis.call('pexpire', KEYS[1], ARGV[1]); //设置过期时间
	return nil;
end;
// 步骤7. 如果不相同,返回占用资源的锁的剩余过期时间ttl。
//前面2个if都没进,说明有人抢占并且不是同一线程,直接返回还有多少ms过期
return redis.call('pttl',KEYS[1]); 

怎么解决锁时间到了,但是业务没执行完

如果锁时间到了,我的业务没有执行完,那么我是希望这个锁要一直被我占有的!所以我们能不能申请去加时!直到我执行完任务,手动去删除锁。所以这个问题的解决方法就是开个定时任务,定时去判断下这个锁还存不存在,如果存在,续期,如果不存在,不续期。

定时调度的方式有很多种,redis使用的是时间轮算法

时间轮算法

例子

举例说明:如下图,假如我的数组大小为8,即hash环为8,每移到一次下标的时间为1s。那么整个hash环跑完的时间为8s。现在的需求是我添加3个任务,task1是5s后执行,task3是5s后执行,在task1执行完成后启动task2,task2是13s后执行,这3个任务应该怎么放?
在这里插入图片描述
示例代码如下:

public class HashedWheelTimerTest {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final long startTime = System.currentTimeMillis();
        Timer timer = new HashedWheelTimer();
        System.out.println((new Date()).toString() + ":开始");
        TimerTask task1 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                long current = System.currentTimeMillis();
                System.out.println((new Date()).toString() + ":" + ((current - startTime) / 1000) + "s后执行task1");
                countDownLatch.countDown();
            }
        };

        TimerTask task2 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                long current = System.currentTimeMillis();
                System.out.println((new Date()).toString() + ":" + ((current - startTime) / 1000) + "s后执行task2");
            }
        };

        TimerTask task3 = new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                long current = System.currentTimeMillis();
                System.out.println((new Date()).toString() + ":" + ((current - startTime) / 1000) + "s后执行task3");
                countDownLatch.countDown();
            }
        };

        timer.newTimeout(task1, 5, TimeUnit.SECONDS);
        timer.newTimeout(task3, 5, TimeUnit.SECONDS);
        countDownLatch.await();
        timer.newTimeout(task2, 13, TimeUnit.SECONDS);
    }
}

执行过程:

  1. 在new HashedWheelTimer的时候会将数组大小定义为8,数组的循环间隔设置为1s。

  2. 在第一次newTimeout放入task1时后,会将任务添加到queue,并且开启hash环的循环,设置hash环的启动时间。

  3. hash环开始轮询,从队列中拿到task1,是5s后执行。

    • 我们得到这个task1是5s后执行,我们先得到task1要**执行的绝对时间(从时间轮开始,多久后执行)=当前时间-hash环的启动时间+5s;**因为是第一次启动,所以绝对时间无限接近5s。
    • 模的值=hash环的大小 - 1。根据task1的5s的绝对时间,task1在hash环上的下标 = 5&7=5
    • 同理,task3也会放入下标5的位置,因为下标已经有task1,task3会以链表的形式放在task1后面。
    • 从0开始,当循环到下标5的位置的的时间刚好是5s,在循环到下标5的时候,会把5位置的链表的任务都执行,所以在5s后task1、task3会执行。
  4. 当执行完task1、task3后,我新加入task2。

    • task2是当前时间的13s后执行,计算这个任务执行的绝对时间=当前时间 - hash环的启动时间 + 13s。因为是等task1、3执行完,所以这个绝对时间为5+13=18s
    • 根据task2的绝对时间18s,取模的值=hash环的大小 - 1。task1在hash环上的下标 = 18&7=2,但是2这个位置,我从5开始走到2这个位置的时间只需要5s,所以第一次循环到2的时候不能执行。
    • 第一次走到2的时候,判断roud是否为0,因为task2的roud=1不执行,但是将task2的roud-1=0。
    • 当第二次执行到2下标的时候,task2的roud为0,执行,从5走到第2轮的2,经过的时间刚好为13s,所以,task2能在13s后执行

注意:

  1. HashedWheelTimer中的hash环实质是一个固定大小的数组,默认大小为512,默认数组的循环间隔tickDuration=100ms 。数组中存放的是task任务的链表。
  2. 新起worker线程去循环数组元素,只不过循环的频率有固定的时间限制,每次循环把task从队列中取出放入对应的下标数组中。
  3. 如果循环到的数组下标中有任务,就去执行数组中的任务。这样就能达到我某个下标中的task能够在某个时刻去执行的目的。

watchDog看门狗机制

执行步骤
  1. 当线程获取锁成功的时候,如果没有设置过期时间(leaseTime <= 0),则触发看门狗机制。
  2. 判断hash里面的filed是否存在,即,线程是否存在。
  3. 如果线程存在,则给hash的leaseTime重新设置30s。(这里时间轮做的事就是执行lua脚本,给锁重新设置过期时间)
  4. 然后,自旋,10s执行一次步骤2、3。
  5. 如果线程不存在,则取消定时。
源码分析
1.1 scheduleExpirationRenewal方法(类:RedissonBaseLock)
protected void scheduleExpirationRenewal(long threadId) {
	ExpirationEntry entry = new ExpirationEntry();
    //放入EXPIRATION_RENEWAL_MAP 这个ConcurrentMap中
	ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
	if (oldEntry != null) {
        //如果其他线程来抢占这个锁,将线程ID保存至ExpirationEntry的threadIds这个Map中
		oldEntry.addThreadId(threadId);
	} else {
        //将线程ID保存至ExpirationEntry的threadIds这个Map中
		entry.addThreadId(threadId);
		try {
            //执行方法 renewExpiration 进入1.2 方法
			renewExpiration();
		} finally {
			if (Thread.currentThread().isInterrupted()) {
				cancelExpirationRenewal(threadId);
			}
		}
	}
}
1.2 renewExpiration(类:RedissonBaseLock)
private void renewExpiration() {
    //从Map中拿到ExpirationEntry
	ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
	if (ee == null) {
		return;
	}
	
    //newTimeout 进入1.3方法,也就是开启时间轮,时间是10s之后执行我们的TimerTask任务。
    //有个独立的线程去执行这个task,这里也有一个异步回调
	Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        //从EXPIRATION_RENEWAL_MAP中拿到锁的对象,有可能在定时的时候被移除取消
		@Override
		public void run(Timeout timeout) throws Exception {
			ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
			if (ent == null) {
				return;
			}
            //步骤2. 判断hash里面的filed是否存在,即,线程是否存在。
            //得到加锁的线程ID
			Long threadId = ent.getFirstThreadId();
			if (threadId == null) {
				return;
			}
            
            //步骤3. 给hash的leaseTime重新设置30s
			//给锁续期 进入方法1.4,另外开启一个线程去调用lua脚本,重新设置过期时间,
			CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            //异步回调函数
			future.whenComplete((res, e) -> {
				if (e != null) {
                    //异常报错,从Map移除
					log.error("Can't update lock " + getRawName() + " expiration", e);
					EXPIRATION_RENEWAL_MAP.remove(getEntryName());
					return;
				}
				//如果返回的是1 代表线程还占有锁,递归调用自己
				if (res) {
                    //步骤4. 自旋,10s执行一次步骤2、3。
					renewExpiration();
				} else {
                    //如果该线程的锁不存在,直接取消
					cancelExpirationRenewal(null);
				}
			});
		}
	}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
	
	ee.setTimeout(task);
}
1.3 newTimeout方法(类:MasterSlaveConnectionManager)
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
	try {
        //这里timer就是HashedWheelTimer,时间轮
		return timer.newTimeout(task, delay, unit);
	} catch (IllegalStateException e) {
		if (isShuttingDown()) {
			return DUMMY_TIMEOUT;
		}
		
		throw e;
	}
}
1.4 renewExpirationAsync(类:RedissonBaseLock)
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
	return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
			"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
					"redis.call('pexpire', KEYS[1], ARGV[1]); " +
					"return 1; " +
					"end; " +
					"return 0;",
			Collections.singletonList(getRawName()),
			internalLockLeaseTime, getLockName(threadId));
}

Redisson释放锁

执行步骤

  1. 异步执行lua脚本,
    • 判断线程id(小key)是否存在,如果不存在,则返回空;如果存在,则可重入次数-1。
    • 再判断可重入次数是否 > 0,如果 > 0,则不能释放锁,并重新设置锁过期时间;如果 <= 0,则删除redis的大key,往订阅的频道发送message,发送UNLOCK_MESSAGE。
  2. 获取到需要释放锁的线程的threadId,然后移除。这里watchDog就不会继续续期。

参考资料

  • Redisson 实现分布式锁原理分析

https://zhuanlan.zhihu.com/p/135864820#:~:text=Redis%20%E5%AE%9E%E7%8E%B0%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E4%B8%BB%E8%A6%81%E6%AD%A5%E9%AA%A4%20%E6%8C%87%E5%AE%9A%E4%B8%80%E4%B8%AA%20key%20%E4%BD%9C%E4%B8%BA%E9%94%81%E6%A0%87%E8%AE%B0%EF%BC%8C%E5%AD%98%E5%85%A5%20Redis%20%E4%B8%AD%EF%BC%8C%E6%8C%87%E5%AE%9A%E4%B8%80%E4%B8%AA,%E5%94%AF%E4%B8%80%E7%9A%84%E7%94%A8%E6%88%B7%E6%A0%87%E8%AF%86%20%E4%BD%9C%E4%B8%BA%20value%E3%80%82%20%E5%BD%93%20key%20%E4%B8%8D%E5%AD%98%E5%9C%A8%E6%97%B6%E6%89%8D%E8%83%BD%E8%AE%BE%E7%BD%AE%E5%80%BC%EF%BC%8C%E7%A1%AE%E4%BF%9D%E5%90%8C%E4%B8%80%E6%97%B6%E9%97%B4%E5%8F%AA%E6%9C%89%E4%B8%80%E4%B8%AA%E5%AE%A2%E6%88%B7%E7%AB%AF%E8%BF%9B%E7%A8%8B%E8%8E%B7%E5%BE%97%E9%94%81%EF%BC%8C%E6%BB%A1%E8%B6%B3%20%E4%BA%92%E6%96%A5%E6%80%A7%20%E7%89%B9%E6%80%A7%E3%80%82

  • springboot整合redisson(一)搭建Redisson环境

https://juejin.cn/post/6973528234046521380

  • springboot整合redisson(二)实现超强的分布式锁

https://juejin.cn/post/6978343278391328804

  • 红锁

https://www.jianshu.com/p/8d929ea3c5c6

  • CountDownLatch

https://blog.csdn.net/pcwl1206/article/details/85059522?spm=1001.2101.3001.6661.1&utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-85059522-blog-86531897.pc_relevant_multi_platform_whitelistv6&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-85059522-blog-86531897.pc_relevant_multi_platform_whitelistv6&utm_relevant_index=1

  • Redisson使用手册

https://www.mianshigee.com/tutorial/redisson-wiki-zh/redisson项目介绍.md

  • Redisson实战

https://www.jianshu.com/p/47199a5b70c9

  • windows下开启redis多端口

https://www.cnblogs.com/leis/p/10188496.html#:~:text=1.配置文件 将redis.windows-service.conf复制一份,改名为相应文件,并更改配置文件中的端口为指定端口,以6380为例 port 6380 2.安装服务,redis-server --service-install --service-name redis_6380 redis.windows-service-6380.conf 3.启动服务

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐