什么是幂等?分布式锁如何实现业务幂等?

2|0什么是幂等

https://www.jianshu.com/p/2927542c6dd9

幂等:即 f(n) = 1^n ,无论n为多少,f(n)的值永远为1。

无论对某一个资源操作了多少次,其影响都应是相同的。

接口重复调用的情况下,对系统产生的影响是一样的,

一次操作产生影响后,以后多次操作不会再产生影响

以SQL为例:

select * from table where id=1 。此SQL无论执行多少次,虽然结果有可能出现不同,都不会对数据产生
改变,具备幂等性。

insert into table(id,name) values(1,'heima') 。此SQL如果id或name有唯一性约束,多次操作只允许插
入一条记录,则具备幂等性。如果不是,则不具备幂等性,多次操作会产生多条数据。

update table set score=100 where id = 1 。此SQL无论执行多少次,对数据产生的影响都是相同的。具备
幂等性。

update table set score=50+score where id = 1 。此SQL涉及到了计算,每次操作对数据都会产生影响。
不具备幂等性。

delete from table where id = 1 。此SQL多次操作,产生的结果相同,具备幂等性。

幂等性设计主要从两个维度进行考虑:空间、时间。

  • 空间:定义了幂等的范围,如生成订单的话,不允许出现重复下单。
  • 时间:定义幂等的有效期。有些业务需要永久性保证幂等,如下单、支付等。而部分业务只要保证一段时间
    幂等即可。

同时对于幂等的使用一般都会伴随着出现锁的概念,

3|0接口幂等

从前端的思路解决的话,主要有三种:前端防重、PRG模式、Token机制

2.1)前端防重
前端相关属性和JS代码即可完成设置
2.2)PRG模式
PRG模式即POST-REDIRECT-GET。当用户进行表单提交时,会重定向到另外一个提交成功页面,而不是停留在原先的表单页面。
2.3)token机制

2.3.1)方案介绍
通过token机制来保证幂等是一种非常常见的解决方案,

img

1)服务端提供获取token接口。如果当前为分布式架构,将token存放于redis中,如果是单体架构,可以保存在jvm缓存中。

2)会携带着token发起请求。

3)服务端接收到客户端请求后,首先会判断该token在redis中是否存在。

  • 如果存在,业务处理,再删除token。

  • 如果不存在,代表当前请求是重复请求,

但是现在有一个问题,当前是先执行业务再删除token。在高并发下,很有可能出现第一次访问时token存在,完成具体业务操作。但在还没有删除token时,客户端又携带token发起请求,

对于这个问题的解决方案的思想就是并行变串行。会造成一定性能损耗与吞吐量降低。

第一种方案:对于业务代码执行和删除token整体加线程锁。当后续线程再来访问时,则阻塞排队。

第二种方案:借助redis单线程和incr是原子性的特点。

  • 当第一次获取token时,以token作为key,对其进行自增。然后将token进行返回, (这里 返回1)

  • 当客户端携带token访问执行业务代码时,对于判断token是否存在 不用删除,而是对其继续incr (新增成为了2,运行出错 在自减1)。

    • 如果incr后的返回值为2。则是一个合法请求允许执行,
    • 如果是其他值,则代表是非法请求,直接返回。

Redis Incr

Redis Incr 命令将 key 中储存的数字值增一。
如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。

  • 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。

本操作的值限制在 64 位(bit)有符号数字表示之内。

redis 127.0.0.1:6379> INCR KEY_NAME 
redis> SET page_view 20
OK

redis> INCR page_view
(integer) 21

redis> GET page_view    # 数字值在 Redis 中以字符串的形式保存
"21"

那如果先删除token再执行业务呢?其实也会存在问题,假设具体业务代码执行超时或失败,没有向客户端返回明确结果,那客户端就很有可能会进行重试,

这种方案无需进行额外处理,一个token只能代表一次请求。一旦业务执行出现异常,则让客户端重新获取令牌,重新发起一次访问即可。推荐使用先删除token方案

每次业务请求都回产生一个额外的请求去获取token。

  • 但是,业务失败或超时,在生产环境下,一万个里最多也就十个左右会失败,那为了这十来个请求,让其他九千九百多个请求都产生额外请求,就有一些得不偿失了。
  • 虽然redis性能好,但是这也是一种资源的浪费。

4|0服务幂等

4|1防重表

对于防止数据重复提交,还有一种解决方案就是通过防重表实现。

  • 防重表的实现思路也非常简单。首先创建一张表
    作为防重表,
  • 同时在该表中建立一个 或 多个字段的唯一索引作为防重字段,
  • 用于保证并发情况下,数据只有一条。
    在向业务表中插入数据之前先向防重表插入,如果插入失败则表示是重复数据。

对于防重表的解决方案,可能有人会说为什么不使用悲观锁。悲观锁在使用的过程中也是会发生死锁的。悲观锁是
通过锁表的方式实现的。

  • 假设现在一个用户A访问表A(锁住了表A),然后试图访问表B; 另一个用户B访问表
    B(锁住了表B),然后试图访问表A。
    • 这时对于用户A来说,由于表B已经被用户B锁住了,所以用户A必须等到用
      户B释放表B才能访问。
    • 同时对于用户B来说,由于表A已经被用户A锁住了,所以用户B必须等到用户A释放表A才
      能访问。此时死锁就已经产生了。

4|2Mysql乐观锁保证幂等

MySQL乐观锁是基于数据库完成分布式锁的一种实现,

  • 实现的方式有两种:基于版本号、基于条件。
  • 但是实现思想都是基于MySQL的行锁思想来实现的。

img

通过版本号控制是一种非常常见的方式,适合于大多数场景。

  • 但现在库存扣减的场景来说,通过版本号控制就是多
    人并发访问购买时,
  • 查询时显示可以购买,但最终只有一个人能成功,这也是不可以的。
  • 其实最终只要商品库存不发生超卖就可以。
  • 那此时就可以通过条件来进行控制。

mysql乐观锁更适用于一些需要计数的表上,而且在竞争不激烈,出现并发冲突几率较小时,推荐使用乐观锁。

  • 虽然通过MySQL乐观锁可以完成并发控制,但锁的操作是直接作用于数据库上,这样就会在一定程度上对数据库性能产生影响。
  • 并且mysql的连接数量是有限的,如果出现大量锁操作占用连接时,也会造成MySQL的性能瓶颈。

4|3zookeeper分布式锁

zk我还没学,请看原文吧。

4|4Redis分布式锁

原理&实现

分布式锁的一个很重要的特性就是互斥性,同一时间内多个调用方加锁竞争,只能有一个调用方加锁成功。

  • 而redis是基于单线程模型的,可以利用这个特性让调用方的请求排队,对于并发请求,只会有一个请求能获取到锁。

redis实现分布式锁也很简单,基于客户端的几个API就可以完成,主要涉及三个核心API:

setNx():向redis中存key-value,只有当key不存在时才会设置成功,否则返回0。用于体现互斥性。
expire():设置key的过期时间,用于避免死锁出现。
delete():删除key,用于释放锁。

1)编写工具类实现加锁

  • 通过jedis.set进行加锁,如果返回值是OK,代表加锁成功
  • 如果加锁失败,则自旋不断尝试获取锁,同时在一定时间内如果仍没有获取到锁,则退出自旋,不再尝试获取锁。
  • requestId:用于标识当前每个线程自己持有的锁标记
PX millisecond :设置键的过期时间为 millisecond 毫秒。 
SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
从 Redis 2.6.12 版本开始,
		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>3.3.0</version>
		</dependency>
public class SingleRedisLock {
	//地址
    JedisPool jedisPool = new JedisPool("192.168.200.128",6379);

    //锁过期时间,30秒
    protected long internalLockLeaseTime = 30000;

    //获取锁的超时时间。1千秒 。16.6分钟
    private long timeout = 999999;

    /**
     * 加锁。过期时间为30秒。nx是不存在的时,才设置成功。
     * @param lockKey 锁键
     * @param requestId 请求唯一标识
     * @return
     */
    SetParams setParams = SetParams.setParams().nx().px(internalLockLeaseTime);

    public boolean tryLock(String lockKey, String requestId){
		//获取到 线程的 name
        String threadName = Thread.currentThread().getName();
		//获取 jedis 连接
        Jedis jedis = this.jedisPool.getResource();
		//当前的 时间戳
        Long start = System.currentTimeMillis();

        try{
            //死循环
            for (;;){
                //设置 lockkey 请求的ID ,参数
                String lockResult = jedis.set(lockKey, requestId, setParams);
                if ("OK".equals(lockResult)){
                    System.out.println(threadName+":   获取锁成功");
                    return true;
                }
                //否则循环等待,在timeout时间内仍未获取到锁,则获取失败
                System.out.println(threadName+":   获取锁失败,等待中");
                //当前的时间 - 开始的时间
                long l = System.currentTimeMillis() ‐ start;
                //如果 已经超过了 16.6 分钟,返回 false
                if (l>=timeout) {
                    return false;
                }
                //睡 100 毫秒
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            //关连接
            jedis.close();
        }

    }
}

解锁时,要避免当前线程将别人的锁释放掉。

  • 假设线程A加锁成功,当过了一段时间线程A来解锁,但线程A的锁已
    经过期了,
  • 在这个时间节点,线程B也来加锁,因为线程A的锁已经过期,所以线程B时可以加锁成功的。
  • 此时,就会出现问题,线程A将线程B的锁给释放了。

对于这个问题,就需要使用到加锁时的requestId。

  • 当解锁时要判断 当前锁键的value与 传入的value是否相同,相
    同的话,则代表是同一个人,可以解锁。否则不能解锁。

但是对于这个操作,有非常多的人,会先查询做对比,接着相同则删除。

  • 忽略了一个问题,
    原子性。
  • 判断与删除分成两步执行,则无法保证原子性,一样会出现问题。
  • 所以解锁时不仅要保证加锁和解锁是同
    一个人还要保证解锁的原子性。因此结合lua脚本完成查询&删除操作。
/**
     * 解锁
     * @param lockKey 锁键
     * @param requestId 请求唯一标识
     * @return
     */
public boolean releaseLock(String lockKey,String requestId){
	//线程名
    String threadName = Thread.currentThread().getName();
    System.out.println(threadName+":释放锁");
    //得到连接
    Jedis jedis = this.jedisPool.getResource();

    //如果 redis == argv这个值(值和放入的一样的时候),就删除这个 值。否则 返回0
    String lua =
        "if redis.call('get',KEYS[1]) == ARGV[1] then" +
        "   return redis.call('del',KEYS[1]) " +
        "else" +
        "   return 0 " +
        "end";

    try {
        //转成集合。 值和 放入的一样,requestId 。
        Object result = jedis.eval(lua, Collections.singletonList(lockKey),
                                   Collections.singletonList(requestId));
        if("1".equals(result.toString())){
            return true;
        }
        return false;
    }finally {
        jedis.close();
    }

}

测试类

public class LoclTest {

    public static void main(String[] args) {

        //模拟多个5个客户端
        for (int i=0;i<5;i++) {
            //创建线程,并开启
            Thread thread = new Thread(new LockRunnable());
            thread.start();
        }
    }
	//实现 runnable 方法,开启线程
    private static class LockRunnable implements Runnable {
        @Override
        public void run() {
			//创建锁
            SingleRedisLock singleRedisLock = new SingleRedisLock();
			//获取 ID
            String requestId = UUID.randomUUID().toString();
            //进行锁定
            boolean lockResult = singleRedisLock.tryLock("lock", requestId);
            //如果锁定成功
            if (lockResult){

                try {
                    //睡5秒
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
			//释放锁
            singleRedisLock.releaseLock("lock",requestId);
        }
    }
}

此时可以发现,多线程会竞争同一把锁,且没有获取获取到锁的线程会自旋不断尝试去获取锁。每当一个线程将锁
释放后,则会有另外一个线程持有锁。依次类推。

Thread-0:   获取锁失败,等待中
Thread-1:   获取锁成功

Thread-0:   获取锁失败,等待中
Thread-0:   获取锁失败,等待中
Thread-0:   获取锁失败,等待中
Thread-0:   获取锁失败,等待中

Thread-1:释放锁

Thread-0:   获取锁成功
Thread-0:释放锁

存在的问题

锁续期

当对业务进行加锁时,锁的过期时间,

  • 假设线程A在执行某个业务时加锁成功
    并设置锁过期时间。但该业务执行时间过长,业务的执行时间超过了锁过期时间,那么在业务还没执行完
    时,锁就自动释放了。
  • 接着后续线程就可以获取到锁,又来执行该业务。
  • 因此对于锁的超时时间,让锁的过期时间大于业务执行时间。

业务执行时间的影响因素太多了,无法确定一个准确值,只能是一个估值。

无法百分百保证业务执行期间,锁只能被一个线程占有。

如想保证的话,可以在创建锁的同时创建一个守护线程,同时定义一个定时任务每隔一段时间去为未释放的锁增加过期时间。当业务执行完,释放锁后,再关闭守护线程。 这种实现思想可以用来解决锁续期。

服务单点&集群问题

一旦redis服务节点挂掉了,则无法提供锁操作。
为了保证redis高可用,会采用异步复制方法进行主从部署。

  • 当主节点写入数据成功,会异步的将
    数据复制给从节点,
  • 并且当主节点宕机,从节点会被提升为主节点继续工作。
  • 假设主节点写入数据成功,在没有将
    数据复制给从节点时,主节点宕机。
    • 则会造成提升为主节点的从节点中是没有锁信息的,其他线程则又可以继续加锁,导致互斥失效。

4|*5Redisson分布式锁

redisson是redis官网推荐实现分布式锁的一个第三方类库。其内部完成的功能非常强大,对各种锁都有实现,

此处重点利用Redisson解决单机锁产生的两个问题。

单机Redisson实现

依赖

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons‐pool2</artifactId>
</dependency>
<!‐‐Redis分布式锁‐‐>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson‐spring‐boot‐starter</artifactId>
    <version>3.13.1</version>
</dependency>

//上面的 maven仓库找不到,用下面的这个试试
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.11.1</version>
        </dependency>

配置文件

server:
  redis:
    host: 192.168.200.150
    port: 6379
    database: 0
    jedis:
      pool:
        max‐active: 500 #最大的 活跃池子
        max‐idle: 1000 #最大的闲置
        min‐idle: 4 #最小的闲置

spring:
  redis:
    host: 192.168.44.146
    port: 6379
idle 
英 /ˈaɪd(ə)l/  美 /ˈaɪd(ə)l/  全球(美国)  
简明 牛津 新牛津  韦氏  柯林斯 例句  百科
adj. 无事可做的;懒惰的;闲置的;空闲的;琐碎无聊的,毫无意义的;虚张声势的,唬人的;(钱)现金保存的,存于无息账户的
v. 闲混,无所事事;使……停止工作,使闲散;(发动机、车辆)空转

启动类

    //读取地址
    @Value("${spring.redis.host}")
    private String host;

    //读取端口
    @Value("${spring.redis.port}")
    private String port;

    //配置客户端
    @Bean
    public RedissonClient redissonClient(){
        //先定义
        RedissonClient redissonClient;
        //创建 配置类
        Config config = new Config();
        //url 为 host + port
        String url = "redis://" + host + ":" + port;
        //单 server,设置上 地址
        config.useSingleServer().setAddress(url);

        try {
            //进行创建
            redissonClient = Redisson.create(config);
            //然后 返回
            return redissonClient;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

可惜了,报错:
Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @708f5957

锁工具

@Component
public class RedissonLock {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 加锁
     *
     * @param lockKey
     * @return
     */
    public boolean addLock(String lockKey) {

        try {
            //如果 客户端为 null,直接结束
            if (redissonClient == null) {
                System.out.println("redisson client is null");
                return false;
            }
            //获得 锁
            RLock lock = redissonClient.getLock(lockKey);

            //设置锁超时时间为5秒,到期自动释放
            lock.lock(5, TimeUnit.SECONDS);
            //打印 获得到锁
            System.out.println(Thread.currentThread().getName() + ":  获取到锁");

            //加锁成功
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    //释放锁
    public boolean releaseLock(String lockKey) {

        try {
            //同样判断 客户端是否为 null
            if (redissonClient == null) {
                System.out.println("redisson client is null");
                return false;
            }
            //得到锁
            RLock lock = redissonClient.getLock(lockKey);
            //释放锁,并打印
            lock.unlock();
            System.out.println(Thread.currentThread().getName() + ":  释放锁");
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}

测试类

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
        </dependency>  哈哈哈,最终的结果 是不行
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedissonLockTest {

    @Autowired //注入
    private RedissonLock redissonLock;

    @Test
    public void easyLock(){
        //模拟多个10个客户端
        for (int i=0;i<10;i++) {
            Thread thread = new Thread(new LockRunnable());
            thread.start();
        }

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private class LockRunnable implements Runnable {
        @Override
        public void run() {
            //添加锁
            redissonLock.addLock("demo");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //释放锁
            redissonLock.releaseLock("demo");
        }
    }
}

根据执行效果可知,多线程并发获取所时,当一个线程获取到锁,其他线程则获取不到,并且其内部会不断尝试获
取锁,当持有锁的线程将锁释放后,其他线程则会继续去竞争锁。

源码分析

lock()源码分析

当获取到RLock对象后,调用其内部的lock()执行加锁操作。

  • 根据源码描述,当线程获取锁时,如果没有获取到
    锁,则会让其进入自旋,直到获取到锁。
  • 如果获取到锁,则会一直保留到调用unLock()手动释放或根据传入的
    leaseTime时间自动释放。
  • 当前传入两个参数值:锁超时时间,时间单位。主要用于避免死锁的出现,假设持有锁的redis节点宕机,到期后锁可以自动释放。

lock()方法中还会调用lock()的另外一个重载方法,需要传入三个参数:过期时间、时间单位、是否中断。

在三个参数的lock()重载方法中,

  • 首先会获取当前线程id,接着调用tryAcquire()方法尝试获取锁,

  • 如果返回值为
    null,代表获取到锁。

  • 如果返回值不是null,则根据当前线程id创建异步任务并放入线程池中,接着进入自旋,在
    自旋过程中,尝试调用tryAcquire()获取锁,如果获取到则退出自旋。否则会不断的尝试获取锁。

  • 还是看 源文档吧

锁续期

对于锁续期问题,用于防止业务执行超时或宕机而引起的业务被重复
执行。

根据对lock方法的解析,可以发现,当设置完过期时间后,当前锁的过期时间就已经被设定了,不会发生改变,锁
到期后则会被自动释放,因此在业务执行中,通过lock()方法加锁会造成隐患。

红锁

当在单点redis中实现redis锁时,一旦redis服务器宕机,则无法进行锁操作。

  • 因此会考虑将redis配置为主从结
    构,但在主从结构中,数据复制是异步实现的。
  • 假设在主从结构中,master会异步将数据复制到slave中,一旦某
    个线程持有了锁,在还没有将数据复制到slave时,master宕机。
  • 则slave会被提升为master,但被提升为slave的
    master中并没有之前线程的锁信息,那么其他线程则又可以重新加锁

redlock算法

redlock是一种基于多节点redis实现分布式锁的算法,可以有效解决redis单点故障的问题。官方建议搭建五台
redis服务器对redlock算法进行实现。
在redis官网中,对于redlock算法的实现思想也做了详细的介绍。地址:https://redis.io/topics/distlock

整个实现过程分为五步:

1)记录获取锁前的当前时间
2)使用相同的key,value获取所有redis实例中的锁,并且设置获取锁的时间要远远小于锁自动释放的时间。假设
锁自动释放时间是10秒,则获取时间应在5-50毫秒之间。通过这种方式避免客户端长时间等待一个已经关闭的实
例,如果一个实例不可用了,则尝试获取下一个实例。
3)客户端通过获取所有实例的锁后的时间减去第一步的时间,得到的差值要小于锁自动释放时间,避免拿到一个
已经过期的锁。

  • 并且要有超过半数的redis实例成功获取到锁,才算最终获取锁成功。如果不是超过半数,有可能
    出现多个客户端重复获取到锁,导致锁失效。

4)当已经获取到锁,那么它的真正失效时间应该为:过期时间-第三步的差值。
5)如果客户端获取锁失败,则在所有redis实例中释放掉锁。为了保证更高效的获取锁,还可以设置重试策略,在
一定时间后重新尝试获取锁,但不能是无休止的,要设置重试次数。

虽然通过redlock能够更加有效的防止redis单点问题,但是仍然是存在隐患的。假设redis没有开启持久化,
clientA获取锁后,所有redis故障重启,则会导致clientA锁记录消失,clientB仍然能够获取到锁。这种情况虽然发生几率极低,但并不能保证肯定不会发生。

保证的方案就是开始AOF持久化,但是要注意同步的策略,使用每秒同步,如果在一秒内重启,仍然数据丢失。使用always又会造成性能急剧下降。

官方推荐使用默认的AOF策略即每秒同步,且在redis停掉后,要在ttl时间后再重启。 缺点就是ttl时间内redis无法
对外提供服务。

实现

redisson对于红锁的实现已经非常完善,通过其内部提供的api既可以完成红锁的操作。

@Configuration
public class RedissonRedLockConfig {

    public RedissonRedLock initRedissonClient(String lockKey){

        Config config1 = new Config();
        config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0);
        RedissonClient redissonClient1 = Redisson.create(config1);

        Config config2 = new Config();
        config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0);
        RedissonClient redissonClient2 = Redisson.create(config2);

        Config config3 = new Config();
        config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0);
        RedissonClient redissonClient3 = Redisson.create(config3);

        Config config4 = new Config();
        config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0);
        RedissonClient redissonClient4 = Redisson.create(config4);

        Config config5 = new Config();
        config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0);
        RedissonClient redissonClient5 = Redisson.create(config5);

        RLock rLock1 = redissonClient1.getLock(lockKey);
        RLock rLock2 = redissonClient2.getLock(lockKey);
        RLock rLock3 = redissonClient3.getLock(lockKey);
        RLock rLock4 = redissonClient4.getLock(lockKey);
        RLock rLock5 = redissonClient5.getLock(lockKey);

        RedissonRedLock redissonRedLock = new 
RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);

        return redissonRedLock;
    }
}

测试类

@SpringBootTest
@RunWith(SpringRunner.class)
public class RedLockTest {

    @Autowired
    private RedissonRedLockConfig redissonRedLockConfig;

    @Test
    public void easyLock(){
        //模拟多个10个客户端
        for (int i=0;i<10;i++) {
            Thread thread = new Thread(new RedLockTest.RedLockRunnable());
            thread.start();
        }

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private class RedLockRunnable implements Runnable {
        @Override
        public void run() {
            //初始化 客户端
            RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");

            try {
                //尝试 锁
                boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);

                if (lockResult){
                    System.out.println("获取锁成功");
                    TimeUnit.SECONDS.sleep(3);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                redissonRedLock.unlock();
                System.out.println("释放锁");
            }
        }
    }
}

redissonRedLock加锁源码分析

// 等待时间 过期时间
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException 
{
    long newLeaseTime =1;
    if (leaseTime !=1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }

    long time = System.currentTimeMillis();
    long remainTime =1;
    if (waitTime !=1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1\. 允许加锁失败节点个数限制(N‐(N/2+1)),当前假设五个节点,则允许失败节点数为2
    */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2\. 遍历所有节点执行lua加锁,用于保证原子性
    */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.对节点尝试加锁
        */
        try {
            if (waitTime ==1 && leaseTime ==1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 抛出异常表示获取锁失败
            lockAcquired = false;
        }

        if (lockAcquired) {
            /**
             *4\. 如果获取到锁则添加到已获取锁集合中
            */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5\. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N‐(N/2+1))
             * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
             * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
4)消息幂等
             */
            if (locks.size() ‐ acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime ==1 && leaseTime ==1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit‐‐;
            }
        }

        /**
        * 6.计算从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则申请锁失败,返回false
        */
        if (remainTime !=1) {
            remainTime ‐= System.currentTimeMillis() ‐ time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime !=1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) 
rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }

        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }

    /**
     * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
    */
    return true;
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐