目录

1.分布式锁

2.springboot集成redis

3.使用setnx命令实现分布式锁

4.使用Redission实现分布式锁

5.redission分布式锁的类型


1.分布式锁

分布式锁,即分布式系统中的锁。随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。

下面主要介绍springboot集成redis实现分布式锁。

需要注意的是,分布式锁可以保证数据的一致性,但同时访问的速度也会受到影响。

2.springboot集成redis

在springboot项目中引入redis相关依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.0</version>
        </dependency>

 编写application.yml文件:

spring:
    redis:
      host: 127.0.0.1   #服务器地址
      port: 6379    #端口号
      database: 0   #数据库索引(默认为0)
      timeout: 180000   #连接超时时间
      lettuce:
        pool:
          max-active: 20    #最大连接数
          max-wait: -1    #最大阻塞等待时间,-1即无限制
          max-idle: 8   #最大空闲连接数
          min-idle: 0   #最小空闲连接数

此处使用的是lettuce客户端而不是jedis客户端。Lettuce是基于Netty框架的事件驱动的Redis客户端,其方法调用是异步的,Lettuce的API也是线程安全的,所以多个线程可以操作单个Lettuce连接来完成各种操作,同时Lettuce也支持连接池。

编写redis配置类,实现序列化:

package com.seven.redis.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@EnableCaching
@Configuration
public class RedisConfig {

  @Bean
  @SuppressWarnings("all")
  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
      RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
      template.setConnectionFactory(factory);
      Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
      ObjectMapper om = new ObjectMapper();
      om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
      om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);     //目前已弃用
      jackson2JsonRedisSerializer.setObjectMapper(om);
      StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

      // key采用String的序列化方式
      template.setKeySerializer(stringRedisSerializer);
      // hash的key也采用String的序列化方式
      template.setHashKeySerializer(stringRedisSerializer);
      // value序列化方式采用jackson
      template.setValueSerializer(jackson2JsonRedisSerializer);
      // hash的value序列化方式采用jackson
      template.setHashValueSerializer(jackson2JsonRedisSerializer);
      template.afterPropertiesSet();

      return template;
  }


}

 然后,我们可以通过直接导入RedisTemplate或来使用redis,

    @Resource
    private RedisTemplate redisTemplate;

或是自定义一个redisUtil工具类,重写RedisTemplate里的部分方法:

package com.seven.redis.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================
    /**
     * 指定缓存失效时间
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }

    /**
     * set nx,上锁
     * @param key 一般设为lock
     *@param value 一般使用uuid
     *@param time 缓存时间,单位为s
     */
    public boolean setNx(String key, String value, int time){
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS));
    }
    //未指定过期时间
    public boolean setNx(String key, String value){
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value));
    }

    // ============================String=============================

    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 递减
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }



    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


}

因本次实现分布式锁主要只使用String数据类型,固只实现了String数据类型的代码。

3.使用setnx命令实现分布式锁

在Redis中我们通常可以使用redis命令(setnx)实现分布式锁。

setnx key value 命令可以给key上锁,而解锁一般可以通过两种方法:

  • 通过命令 del key 删除key
  • 通过 set key value nx ex time 设置key的过期时间

对应RedisUtil工具类中的以下代码:

    /**
     * set nx,上锁
     * @param key 一般设为lock
     *@param value 一般使用uuid
     *@param time 缓存时间,单位为s
     */
    public boolean setNx(String key, String value, int time){
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS));
    }
    //未指定过期时间
    public boolean setNx(String key, String value){
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value));
    }

在controller中编写模拟代码,代码逻辑如下:

  • 设定锁lock,设置成功则对数据库、redis缓存等相关数据进行操作(下述代码中对redis中缓存的key:num进行+1操作)。锁期间,其他client无法对其进行操作。操作完成后,删除锁,其他客户端即可进行操作。
  • 锁失败,对其0.1秒进行重试,重新进行上锁操作。
package com.seven.redis.controller;

import com.seven.redis.utils.RedisUtil;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

@RestController
public class RedisController {

    @Resource
    private RedisUtil redisUtil;

    @GetMapping("/test")
    public String test(){
        //配置锁,设置随机uuid进行验证防止误删
        String uuid = UUID.randomUUID().toString();
        //设置过期时间为10s
        boolean lock = redisUtil.setNx("lock",uuid,10);
        if(lock){
            //若已经上锁
            Object value =redisUtil.get("num");
            //2.1判断num为空return
            if(StringUtils.isEmpty(value)){
                return "key is null";
            }
            //2.2有值就转成成int
            int num = Integer.parseInt(value+"");
            //2.3把redis的num加1
            redisUtil.set("num", ++num);
            //2.4释放锁,del,保证锁必须被释放-->当业务执行时间小与过期时间时需要释放锁
            if(uuid.equals((String)redisUtil.get("lock"))){
                redisUtil.del("lock");
                return "success";
            }else {
                return "fail";
            }
        }else {
            //上锁失败
            try {
                Thread.sleep(100);
                test();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "done";
    }
}

 上述代码中,为防止误删(即客户端a在进行操作,服务器发生卡顿,达到了key设定的过期时间,解开了锁,客户端b开始进行操作;然后在b进行操作期间,a卡顿结束,继续删锁操作,会导致误删了b的锁),设置了uuid值进行验证:

 if(uuid.equals((String)redisUtil.get("lock"))){
       redisUtil.del("lock");
       return "success";
  }

uuid一致,才可删除锁,否则,无法删除。

注意:此处删除操作缺乏原子性,可以通过lua脚本加强分布式锁的安全性。可参考以下代码,此处不进行详细叙述:

 /*使用lua脚本解锁*/
        // 定义lua 脚本
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis执行lua执行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为Long
        // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
        // 那么返回字符串与0 会有发生错误。
        redisScript.setResultType(Long.class);
        // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);

4.使用Redission实现分布式锁

实现Redis的分布式锁,除了自己基于redis client原生api来实现之外,还可以使用开源框架:Redission。

使用redission只需要通过他的api中的lock和unlock即可完成分布式锁,对比于setnx,他的优势在于:

  • redisson所有指令都通过lua脚本执行,redis支持lua脚本原子性执行
  • redisson设置一个key的默认过期时间为30s,redisson中有一个watchdog看门狗的概念,它会在你获取锁之后,每隔30s/3 的时间就会执行一次定时任务,帮你把key的超时时间设为30s进行续期,知道任务执行完毕

下面对springboot使用Redission进行一次演示:

导入相关依赖:

        <!--redission相关依赖-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.0</version>
        </dependency>

编写Redission设置类:

package com.seven.redis.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Bean
    public RedissonClient getRedisson(){

        Config config = new Config();
        //单机模式  依次设置redis地址和密码
        config.useSingleServer().
                setAddress("redis://" + host + ":" + port);
        return Redisson.create(config);
    }
}

Redission还支持多种连接模式,以下仅作参考:

//主从
Config config = new Config();
config.useMasterSlaveServers()
    .setMasterAddress("127.0.0.1:6379")
    .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
    .addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
 
 
//哨兵
Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
    .addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
 
 
//集群
Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // cluster state scan interval in milliseconds
    .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
    .addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

 然后我们就可以通过导入Redission使用其分布式锁:

    @Resource
    private RedissonClient redisson;

下面在controller中进行一次库存扣减使用分布式锁的演示:

    @PostMapping("/lock/test")
    public void test() {

        String lockKey = "test_lock";
        RLock lock = redisson.getLock(lockKey);       //获取锁 
        try {
            lock.lock();    //上锁
            log.info("锁已开启");
            synchronized (this){
                if(redisUtil.get("product")==null){
                    log.error("商品不存在!");
                }else{
                    //获取当前库存
                    int stock = Integer.parseInt(redisUtil.get("product").toString()); 
                    if (stock > 0){
                        int realStock = stock - 1;
                        //更新库存
                        redisUtil.set("product", realStock + "");
                        log.info("库存当前为:" + realStock);
                    }else {
                        log.warn("扣减失败,库存不足!");
                    }
                }
            }
        }catch (Exception e){
            log.warn("系统错误,稍后重试");
        }
        finally {
            lock.unlock();    //删除锁
            log.info("锁已关闭");
        }
    }

此处还使用了 synchronized 对线程加锁,若只是启用redission的分布式锁,可不使用。

其运行过程和java多线程下的锁类似,其运行逻辑如下:

注意:锁的范围不易过大,在业务过程中应避免死锁的发生。

5.redission分布式锁的类型

此处注意的是redission分布式锁分为很多种,上文使用的是抢占式的分布式锁。即当锁释放后,其他请求会再次对锁进行抢占,而不是根据请求先后顺序进行。

如果需要公平的分配锁,即按照请求的先后顺序分配锁,可以使用公平锁:

RLock lock = redisson.getFairLock("myLock");

锁的使用方式和抢占式锁相同。

根据业务的需要,还可以使用读写锁:

//读写锁
RReadWriteLock lock = redisson.getReadWriteLock("myLock");


//写锁
lock.writeLock();

//读锁
lock.readLock();

注意,lock.readLock() 和 lock.writeLock() 两个锁用于两个不同的方法中,对应于lock.lock()方法。

读写锁可以在写方法未完成时,保证读方法无法进行;或是两个写方法进行时,保存先后顺序,保证数据的一致性。

只有当两个读方法时,才会不发生冲突。

更多的锁的使用,可以参考redission官网,进行选择:Redisson: Redis Java client with features of In-Memory Data Grid

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐