分布式


分布式综合中间件Redisson – 多连接方式、集合数据分片、分布式对象、分布式集合、分布式锁、分布式服务等多种功能


不管是基于Redis、还是Zookeeper实现分布式锁,为了保证不死锁,需要释放锁,finally中释放锁时需要特别注意释放当前线程自己的锁, 也就是释放之前加上判断,不然将其他线程的锁释放了造成不安全的情况

基于MinIO可以实现分布式对象存储,基于Redis可以实现分布式缓存,将应用的Session使用Redis管理、同时对于不经常变动的,高热点的数据使用redis进行缓存; 基于rabbitMQ可以进行异步通信、解耦、延迟通信、接口限流(令牌桶 Guava的RateLimiter也可), 基于Zookeeper可以实现分布式服务的协调、分布式锁等,基于Dubbo可以实现… 除此之外,还有综合的分布式中间件 Redisson

【声明一下,文章都是写完就提交,可能出现笔误,望理解】

Redisson是架设在Redis基础上实现的Java驻内存数据网格的综合中间件 , 提供的功能远比redis多,其内置了一系列的分布式对象、分布式集合、分布式锁、分布式服务等功能特性,是优秀的缓存中间件

传统方式的相关缺陷

需求场景和传统工具的不足就会让很多工具有了诞生的前提,目前的各种全家桶、各种分布式服务中间件都是为了更好的给出解决方案,Redisson的诞生也是如此,其可以作为一些典型方案的缺陷作为替补

RabbitMQ 死信队列严格FIFO — 不适合TTL不同场景

rabbitMQ是严格FIFO的,也就是按照队列的顺序先进先出,当然一般情况下性能都很优秀,但是对于一些特殊的场景可能出现一些问题: 延迟队列中的消息TTL不同

这个时候rabbitMQ给出的结果还是按照入队的顺序,并不会根据TTL,这显然是不正确的,为了解决这种缺陷,可以考虑使用Redisson的延迟队列,下面会介绍

Redis的SETNX分布式锁 TTL不好把控

redis实现SETNX分布式锁的方式在一定程度上可以解决并发问题,但是很多时候也存在问题,其中最主要的问题就是lock的ttl设置的时长问题,如果时间设置过短,业务还没有执行完就释放了锁,导致下一个线程又获取到锁进入,当前线程还可能将后面线程的锁🔒给释放了

如果时间设置过长,对效率有影响,还可能死锁, 这个时候就可以使用redisson来进行控制,redisson的看门狗可以很好解决问题

但是主从复制的问题,不管是原生的redis,还是redisson都不能避免(选择master加锁,主从复制之后,master挂掉,重新产生master; 另外一个线程在新的master加锁,之前的线程仍然觉得加锁成功,导致脏数据】 — 只能尽量保证不挂

Redisson intro

Redisson , Redis + son,虽然不是redis的son,但是和redis关系密切,充分利用Redis的Key- Value 数据结构 和基于内存数据的优势,同时具备多机多线程并发系统处理的优势

Redisson底层数据结构的设计采用动态类方式,类似java对应的数据结构,设计者类比java的List、Map、Set,让Redssion的操作更方便

Redis的部署可以单一节点模式、集群模式、主从模式、哨兵模式,业务应用在使用Redssion的架构时都是像ZooKeeper以节点的方式进行服务的管理

redis节点1                          redis节点2						Redis节点3
 RNode								RNode						    RNode
  |______________________________________|_________________________________|
  									  |
  							    redis服务(单一、集群、主从、哨兵部署)
  							          |
  							          |
  |--------------------------------------|-----------------------------------|
JVM业务微服务1                         JVM业务微服务2					JVM业务微服务3
  redisson中间件						redisson中间件					redisson中间件

Redisson底层采用基于NIO (Non-Blocking IO 非阻塞方式的输入输出)的Netty框架实现数据传输,具有高效的传输性能【NIO和BIO都是网络通信模型,BIO面向流、阻塞、没有IO多路复用; NIO面向缓冲区、非阻塞、IO多路复用 ------- BIO适用连接数目少,一次性发送大量数据的; NIO适用大量的长时间连接,不会频繁太多数据】同时可以实现以Redis命令同步、异步、异步流、管道流发送消息; Redisson覆盖Redis的所有的功能

redis提供了很多的功能特性,包括多种连接方式、数据序列化、集合数据分片、分布式对象、分布式集合、分布式锁、分布式服务

要使用Redisson驻内存数据网格中间件,首先需要引入依赖,直接在pom中添加

 <!-- redisson整合-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.17.5</version>
        </dependency>

需要注意版本的适配问题,Cfeng的springBoot版本为2.7.X,所以就直接使用的redisson-spring-data-27, 也就是3.17.5中包含的

接下来虽然已经自动配置了,【因为Redisson是基于Redis来提供服务的,所以Redis该有的配置还是都要配置,参见cfeng之前的redis配置】,同时redisson支持两种配置方式

  • 完全兼容redis和redisson,全部都在application.yml中配置
  • redisson单独配置,在总yml中通过spring.redis.redisson.config = classpath:redisson.yml指定
//单一节点配置
singleServerConfig:
//连接空闲时间超时 ms
  idleConnectionTimeout: 10000
  //连接超时ms
  connectTimeout: 10000
  //命令等待超时
  timeout: 3000
  //命令失败重试次数
  retryAttempts: 3
  //重试时间间隔
  retryInterval: 1500
  //单个连接最大订阅数量
  subscriptionsPerConnection: 5
  //启动SSL终端识别
  sslEnableEndpointIdentification: true
  //ssl提供方式
  sslProvider: "JDK"
  //ping模式连接时间间隔
  pingConnectionInterval: 30000
  //是否保持连接活跃状态
  keepAlive: true
  //tcp无延迟
  tcpNoDelay: true
  nameMapper: !<org.redisson.api.DefaultNameMapper> {}
  //连接地址,Redis服务地址,使用的是redis的协议
  address: "redis://127.0.0.1:6379"
  //从节点发布和订阅连接的最小空闲数
  subscriptionConnectionMinimumIdleSize: 1
  //从节点发布订阅的连接池大小
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 24
  connectionPoolSize: 64
  database: 0
  //dns监控时间间隔
  dnsMonitoringInterval: 5000
threads: 16
//netty的线程数
nettyThreads: 32
codec: !<org.redisson.codec.MarshallingCodec> {}
referenceEnabled: true
//传输模式NIO 非阻塞IO
transportMode: "NIO"
//监控锁看门狗的超时时间
lockWatchdogTimeout: 30000
checkLockSyncedSlaves: true
reliableTopicWatchdogTimeout: 600000
//保持订阅发布的顺序
keepPubSubOrder: true
useScriptCache: false
minCleanUpDelay: 5
maxCleanUpDelay: 1800
cleanUpKeysAmount: 100
nettyHook: !<org.redisson.client.DefaultNettyHook> {}
useThreadClassLoader: true
addressResolverGroupFactory: !<org.redisson.connection.DnsAddressResolverGroupFactory> {}

布隆过滤器RBloomfitter(bitmap) 去重业务场景— 缓存穿透方案

在业务中可能会遇到去重业务场景,传统的解决方式就是HashSet,contains方法,就可以判断,但是 缺陷很明显: 调用方法前,加载数据到内存,其功能基于内存实现,所以高并发场景下,就存在一些问题【内存本就有限,大量的内存占用可能导致GC被迫启动,而回收的对象过多,回收过程导致进程长时间暂停 ---- 程序卡住】(内存管理Cfeng之后分享JVM bg会给出方案 --as 宠用、对象池化、降低一次性对象使用)

  输入元素  ---> 大数据量中进行查询 ---> 存在?--yes---> 告知存在
                      |                 |_no--> 不存在
                      |________大数据量Set----存储--|           

而redisson的布隆过滤器可以很好解决这个困境,Bloom过滤器就是一个过滤器,可以过滤已经存在的元素,保证数据唯一,但是,其性能提升就是 不需要将数据加载进入内存

设计构造K个hash函数: f(x)…

初始化一个长度为N的bit数组,每个数组元素初值为0

Bloom filter步骤:

  • 将当前存在的元素 使用设计好的K 个 hash函数计算K 个hash值,将事先构造的位数组的相关下表置为1
  • 判重时, 使用上面的K个hash函数计算K个hash值,判断对应的下标是否为1,如果是,则大概率存在 — 可能误判,概率低, 否则,一定不存在

查看源图像

Bloom过滤器去重算法的核心就是 使用 K个hash函数计算hash值,和bit数组对应的bit位比较

但是在极少数情况下也是存在误判的,比如上面的数组a、b、c,使用hash函数计算后,将对应的bit位置1, 而d经过同样的hash计算,恰好位置都是1 ------ 出现误判

布隆过滤器的优点就是不加载数据进入内存,节省了内存空间, 但是缺点就是 存在一定的误判概率, 同时Bloom 过滤器的元素不能删除 ----- 因为计算阶段所有的下标覆盖共享,所以删除一个元素可能会误删

上述算法的实现的关键就是选择合适的hash函数 — 降低误判概率, 如果自我实现可能不是那么完美,而Redisson就提供了高度封装的组件布隆过滤器,可以开箱即用

使用方式就是直接创建布隆过滤器组件RBloomFilter<T即可, 指定key, 初始化时指定元素的容量和期望误差率; 其实际上也是一个set,可以使用add方法添加元素,contains进行判断

需要注意这些组件都是基于redisson的,所以都是需要使用key指定名称

这里可以简单实验Redisson的布隆过滤器, 首先需要来一个大数据量10万个数【100万也可】,检查指定的数字是否存在:【对象类型也是一样】

@SpringBootTest
@Slf4j
public class BloomFilterTests {

    @Resource
    private RedissonClient redissonClient;

    @Test
    public void testBloom() {
        final String key = "blogBloomFilter";

        Long total = 100000L;
        //这里查重的类型为Integer,创建bloom过滤器实例
        RBloomFilter<Integer> bloomFilter = redissonClient.getBloomFilter(key);
        //初始化过滤器,期望误差率0.01
        bloomFilter.tryInit(total,0.01);
        //初始化遍历向其中加入元素
        for(int i = 1; i <= total; i++) {
            bloomFilter.add(i);
        }
        //使用contains检查特定
        log.info("该布隆过滤是否包含数据:0: {}",bloomFilter.contains(0));
    }
}
2022-09-22 21:14:14.781  INFO 6360 --- [           main] cfengMiddleware.server.BloomFilterTests  : 该布隆过滤是否包含数据:0: false

在大部分情况下,使用布隆过滤器的体验都很efficient(布隆过滤器存储量大,查询速度快),当然其实Guava也提供了布隆过滤器的实现(还有令牌桶算法eg)

其可以作为缓存穿透的解决方案: 缓存穿透就是请求在缓存中都查询不到,数据库中也查询不到,从而导致再次请求还是会直接查询数据库,缓存就像没有起作用一样(可能会被恶意攻击)

缓存null值解决方案不能应对恶意攻击; 缓存null只能解决正常的错误请求,而如果是黑客攻击,每次都发起不存在的不同的请求: 比如存在1,2,3, 攻击者直接-1,-2 ---- -1000000, 发起洪流式查询,缓存直接没用,gameOver

所以可以使用让请求先进入布隆过滤器查找,再进入缓存,再进入数据库; 布隆过滤器的数据【因为查询Cache就像查询查询元素是否在集合中存在,所以可以使用】

发布-订阅主题RTopic 消息通信场景

基于Redisson也可以实现类似RabbitMQ的消息通信,依靠的就是发布订阅主题这个组件,当Producer需要发布消息时,会将消息数据发布到Topic, 而Consumer只要订阅了相关的channel就可以监听消费消息 【STOMP类型】,相比rabbitMQ简化许多: 主题、生产者、消费者

Redssion的消息通信的消费者不会像RabbitMQ一样自动监听,所以需要定时轮询

 生产者 ---msg--->  基于发布-订阅的topic <----- 订阅---- 消费者

因为这个过程实在没什么特别的,这里就简单延时RTopic如何实现(不需要像RabbitMQ配置组件)

直接new 一个 RTopic组件,使用其publishAsync就可以发布消息,Consumer直接创建即可

 * 演示基于Redission实现的 异步消息通信, 这里和rabbitMQ不同,关系简单,但是消费者没有监听机制,需要轮询
 */

@Component
@RequiredArgsConstructor
@Slf4j
public class RedissonTopicPublisher {

    private final RedissonClient redissonClient;

    /**
     * 发送消息,直接使用redissonClient创建RTopic对象,使用该对象就可以进行消息的发布,当然redis的key还是要的
     * @param messageEntity
     */
    public void sendMsg(MessageEntity messageEntity) {
        final String key = "redissonMsgTopicKey";
        try {
            if(!Objects.isNull(messageEntity)) {
                //基于主题的,所以要使用redisson的主题
                RTopic rTopic = redissonClient.getTopic(key);
                //发送消息,这里采用异步的方式发送
                rTopic.publishAsync(messageEntity);
            } 
        } catch (Exception e) {
            log.error("Redisson生产者发生异常: {}", e.fillInStackTrace());
        }
    }
}

这里发送消息的方式可以选择同步或者异步,异步就是publishAsync,可以看到发布消息很简单,其实就是调用主题RTopic对象进行publish,不需要组件注册,Topic主题就可以实现message的转发

Redisson的基于发布-订阅的主题不具备自动监听的功能,需要让消费者继承Spring的ApplicationRunner和Ordered接口,使得消费者可以在项目启动后不断监听业务逻辑

 *  redisson基于主题方式的发布订阅没有自动监听的功能,只能通过轮询的方式
 */

@Component
@RequiredArgsConstructor
@Slf4j
public class RedissonTopicConsumer implements ApplicationRunner, Ordered {

    private final RedissonClient redissonClient;

    //run方法会在容器启动后执行其中的代码,需要在方法里实现不断监听主题中的消息的动态,间接实现自动监听消费
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final String key = "redissonMsgTopicKey";
        try {
            RTopic rTopic = redissonClient.getTopic(key);
            rTopic.addListener(MessageEntity.class, (charSequence, messageEntity) -> {
                log.info("redisson消费者监听到消息: {}",messageEntity);
                if(!Objects.isNull(messageEntity)) {
                    //执行业务逻辑
                    log.info("处理用户: {}",messageEntity.getId());
                }
            });
        } catch (Exception e) {
            log.error("消费者发生异常:{}",e);
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

之后简单测试一下,调用生产者发布消息

redissonTopicPublisher.sendMsg(new MessageEntity(10012766));


2022-09-22 22:38:41.857  INFO 10184 --- [   redisson-3-2] c.s.r.consumer.RedissonTopicConsumer     : redisson消费者监听到消息: MessageEntity(id=10012766)
2022-09-22 22:38:41.857  INFO 10184 --- [   redisson-3-2] c.s.r.consumer.RedissonTopicConsumer     : 处理用户: 10012766

这里存在的问题redisson服务加载慢,响应慢,不管是布隆过滤器还是当前组件还是之前的测试输出config都很缓慢… 检查和解决方案后续给出

数据映射Map — Eviction元素淘汰、本地缓存、数据分片

127.0.0.1:6379> keys *
 ......
 7) "redissonrRMap"
......
 9) "blogBloomFilter"
.......
13) "{blogBloomFilter}:config"

可以看到每个组件都是存储在redis数据库中,key就是定义的key

基于redis的分布式集合的数据结构映射Map,在Redisson中为RMap【其中的各种组件都是RXXX】,RMap功能组件实现了java.util.concurrent.ConcurrentMap和java.util.Map,拥有这两个接口的各种方法,同时还有独特的Redssion的操作方法

//向put添加方法有很多方式: 异步、添加前判断存在、快速添加.....
//定义 存储缓存中间件的redis的Key
        final String key = "redissonrRMap";
        //创建2个对象用于插入RMap
        MessageEntity message1 = new MessageEntity(12);
        MessageEntity message2 = new MessageEntity(13);
        //获取RMap组件
        RMap<Integer,MessageEntity> rMap = redissonClient.getMap(key);
        //正常添加元素
//        rMap.put()
        //异步方式添加元素
//        rMap.putAsync()
        //添加元素前判断是否存在
//        rMap.putIfAbsent()
        rMap.putIfAbsentAsync(1,message1); //异步方式
        //正常的快速方式,各个都有异步,还有fastputIfAbsent等多种方法
        rMap.fastPut(2,message2);
    }

除了Put方法之外,还有getAll、remove、fastRemove;可以查询接口文档查看相关的方法

RMap不同的功能特性的数据结构:

  • Eviction元素淘汰: 允许对一个映射中每个元素单独设置有效时间和最长闲置时间
  • LocalCache本地缓存: 部分数据保存本地内存,同名的缓存公用一个发布Topic
  • Sharding数据分片: redis集群环境,利用分库的原理,将单一的映射切换为若干个个小的映射结构,均匀分布于集群,读写能力增强

Redisson提供的RMap和原生Redis提供的Hash其实就是一回事,只是操作方法不同,性能更好,Hash不具备让某单个元素失效的功能,RMap具有,不需要因为所有的数据公用key导致数据都丢失

@Test
    public void testRMapEviction() throws InterruptedException {
        //这里和RMap不同,使用元素淘汰机制,就使用的更加精细化的组件RMapCache
        final String key = "redissRMapCache";

        //映射缓存功能组件 ---- 元素淘汰
        RMapCache<Integer,MessageEntity> rMapCache = redissonClient.getMapCache(key);
        //创建2个对象用于插入RMap
        MessageEntity message1 = new MessageEntity(12);
        MessageEntity message2 = new MessageEntity(13);

        //添加对象进入组件
        rMapCache.putIfAbsent(12,message1);
        //对象2设置过期时间50s
        rMapCache.put(13,message2,50L, TimeUnit.SECONDS);

        //获取MapCache中所有的key,放入Set
        Set<Integer> set = rMapCache.keySet();
        //获取所有
        Map<Integer,MessageEntity> messageEntityMap = rMapCache.getAll(set);
        //元素列表
        System.out.println(messageEntityMap);
        //过51s再看
        Thread.sleep(51000);
        System.out.println(messageEntityMap);
    }

之所以RMapCache可以如此精细化淘汰,是因为底层进行put时,如果指定了TTL,那么Redisson会额外开启一个定时任务,定时扫描数据结构是否到了存活时间, 相当于会轮询,可能会比较吃性能

数据集合Set — 有序集合、计分排序集合、字典排序集合

Redisson中的集合组件RSet实现了java中的java.util.Set接口,该组件可以保证唯一性,同时还有SortedSet功能组件、计分排序综合组件ScoredSortedSet嗨哟字典排序集合功能组件LexSortedSet; 这些数据结构再redis中就已经实现了,redisson只是做了进一步的优化和综合

class RsetComparator implements Comparator<MessageEntity> {

        @Override
        public int compare(MessageEntity o1, MessageEntity o2) {
            return o2.getId().compareTo(o1.getId());
        }
    }

    @Test
    public void testRedissonRSet() {
        //定义存储缓存中间件的key
        final String key = "redissonRset";

        MessageEntity message1 = new MessageEntity(12);
        MessageEntity message2 = new MessageEntity(13);
        //定义有序集合实例
        RSortedSet<MessageEntity> rSortedSet = redissonClient.getSortedSet(key);
        //设置有序集合的元素比较器
        rSortedSet.trySetComparator(new RsetComparator());
        //添加元素到集合
        rSortedSet.add(message1);
        rSortedSet.addAsync(message2);

        //查看此时有序Set的列表
        Collection<MessageEntity> result = rSortedSet.readAll();
        System.out.println(result);
    }
}

除此之外还有Sored的功能组件

@Test
    public void testRedissonRSetScore() {
        final String key = "RedissonScoredSortedSet";

        RScoredSortedSet<MessageEntity> rScoredSortedSet = redissonClient.getScoredSortedSet(key);

        //添加元素,add方法即可
        //Redisson底层默认是采用正序的方式进行排序,可以指定SortedOrder.DESC, ASC为正序
    }

redisson的各种组件开箱即用,凡是需要对缓存中的数据列表进行排序的,都可以使用Redisson的ScoredSortedSet

消息队列 — 基于主题的基础上封装,推荐MQ

基于Redisson的消息队列实际上还是基于主题的订阅发布,Producer将消息发布到队列,Consumer就可以从队列中监听接收,只是没有RabbitMQ那种自动监听@RabbitListener,所以需要不断的轮询,可以使用@EnableSheulding快捷开启定时器定时扫描监听

@Component
@RequiredArgsConstructor
@Slf4j
public class RedissonQueuePublisher {

    private final RedissonClient redissonClient;

    public void sendMsg(MessageEntity messageEntity) {
        //定义该队列组件的名称
        final String queueName = "redissonBasicQueue";

        //声明组件
        RQueue<MessageEntity> rQueue = redissonClient.getQueue(queueName);
        //使用组件向队列组件中发送消息
        try {
            rQueue.add(messageEntity);
            log.info("Redisson消息发送成功,RQueue: {}",messageEntity);
        } catch (Exception e) {
            log.error("redisson消息发送失败");
        }
        //
    }
}

如果使用redisson的队列进行通信,直接使用RQueue组件,调用add方法就可以发送消息,就可以将消息发送到key的队列; 监听处理消息的QueueConsumer类,需要注意,RQueue和主题组件一样,当消息到来是,是不会主动将消息推送到消费者的,需要comsumer不断监听队列中是否有消息到来

@Component
@RequiredArgsConstructor
@Slf4j
public class RedissonTopicConsumer implements ApplicationRunner, Ordered {

    private final RedissonClient redissonClient;

    //run方法会在容器启动后执行其中的代码,需要在方法里实现不断监听主题中的消息的动态,间接实现自动监听消费
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final String key = "redissonMsgTopicKey";
        try {
            RTopic rTopic = redissonClient.getTopic(key);
            rTopic.addListener(MessageEntity.class, (charSequence, messageEntity) -> {
                log.info("redisson消费者监听到消息: {}",messageEntity);
                if(!Objects.isNull(messageEntity)) {
                    //执行业务逻辑
                    log.info("处理用户: {}",messageEntity.getId());
                }
            });
        } catch (Exception e) {
            log.error("消费者发生异常:{}",e);
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

从分布式队列中监听、消费消息,需要先获取组件实例RQueue,不断调用poll()方法,弹出队列消息,如果消息不为空,代表有消息来了,如果没有消息就不执行

但是这样看是很消耗性能的,while循环没有结束

延迟队列 — 使用TTL不同的msg

上面已经提供,RabbitMQ的延迟队列在有些特殊的情景下可能会出现问题,因为RabbitMQ是严格按照基本的队列的FIFO,而忽视的TTL, 所以当所有的Mesage的TTL相同,当然还是直接使用RabbitMQ即可,但是当Message的TTL不同时(TTL存活时间的单位为ms),使用Redisson的delayQueue就可以 RBlockingQueue

开始 ---->  生产者 ---> 生产消息 ---ttl---> 阻塞式队列---> TTL到?
													 |->  消息进入下一个中转站--> Queue <---- 用户持续监听

Redisson的延迟队列需要借助阻塞式队列作为中转站, 充当消息的暂存区(相当于死信队列),TTL存活时间移到,消息进入真正的队列

延迟队列的实现主要就是借助RBlockingQueue 和延迟队列RDelayQueue, RDelayQueue需要使用RBlockingQueue作为参数

@Component
@RequiredArgsConstructor
@Slf4j
@EnableScheduling
public class RedissonDelayQueuePublisher {

    private final RedissonClient redissonClient;

    //发布消息进入延迟队列
    public void sendDelyMsg(MessageEntity messageEntity,Long ttl) {
        //定义延迟队列的名称
        final String delayQueueName = "redissonDelayQueueV3";

        //定义阻塞队列组件
        RBlockingDeque<MessageEntity> rBlockingDeque = redissonClient.getBlockingDeque(delayQueueName);
        //定义获取延迟队列实例
        RDelayedQueue<MessageEntity> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingDeque);
        //延迟队列以阻塞队列为参数,先将消息放入阻塞队列阻塞住
        //将消息放入延迟队列,设置ttl,会按照TTL排序
        try {
            rDelayedQueue.offer(messageEntity,ttl, TimeUnit.SECONDS);
            log.info("延迟队列Redisson 生产者 {}",messageEntity);
        } catch (Exception e) {
            log.error("延迟队列Redisson 生产者Error, {}",e.fillInStackTrace());
        }
    }
}

而对于消费者,就可以直接使用sheduled定时器轮询

@Component
@Slf4j
@EnableScheduling
@RequiredArgsConstructor
public class RedissonDelayQueueConsumer {

    private final RedissonClient redissonClient;

    @Scheduled(cron = "*/1 * * * *")
    public void consumeMsg() throws Exception {
        //定义延迟队列
        final String key = "redissonDelayQueueV3";
        RBlockingQueue<MessageEntity> rBlockingQueue = redissonClient.getBlockingQueue(key);
        //从队列中取出消息
        MessageEntity messageEntity = rBlockingQueue.take();
        if(!Objects.isNull(messageEntity)) {
            log.info("Redisson延迟消息 消费者: 监听消息: {}",messageEntity);
        }
    }
}

这里就是使用的定时器任务轮询队列,如果有消息就从队列中取出

分布式锁可重入 – lock()

现在使用redis做锁,会出现TTL不好把控的问题,Redisson的看门狗可以解决这个问题,默认情况下,看门狗检查锁的超时时间为30s,实际生产中可以修改Config.lockWatchdogTimeout进行设置

Redisson设计了多种不同应用场景下的分布式锁,主要包含可重入锁(Reentrant lock)、公平锁(Fair Lock)、 联锁(MultiLock)、红锁(RedLock)、读写锁(ReadWriteLock)、信号量(Semaphore)、闭锁(CountDownLatch)

Redisson提供的可重入锁有lock和trylock两种方式,lock方式返回值为void,没有获取到锁的线程一直等待,trylock方式类似之前的redis的锁,返回值为boolean,代表加锁成功或者失败

也就是说,获取锁失败的线程不会阻塞等待获取锁,而是像乐观锁简单处理方式一样直接失败

    			多线程
   				 |
    			需要操作共享资源?
    			|
    			调用Redisson的tryLock和lock方法尝试获取锁
    			|
    		共享资源 --->  当前线程获取到锁? --> 是---> 操作共享资源--> 释放锁、结束
                 			  |
                   				否
线程阻塞后重新尝试获取 --可重入 _____|____> 一次性的,获取到锁就直接失败

lock锁的 返回值 void,如果lock没有获取到锁就会一直等待,不能被打断, 当然其是可重入锁,获取到就将count set为1,再次获取就 ++;如果没有获取到锁,当前线程阻塞,休眠一直到获取到锁

/**
     * 基于Redisson 的lock方式,一直阻塞直到获取
     */
    @Override
    public void takeMoneyWithRedisson(UserAccountDto userAccountDto) throws Exception {
        final String key = "redisson:account:" + userAccountDto.getUserId() + "-lock";

        //获取分布式锁实例(可重入锁)
        RLock lock = redissonClient.getLock(key);

        //操作共享资源之前上锁,这里lock.lock方法会一直阻塞直到获取到锁
        try {
            //这里先设置锁的释放时间,10s后自动释放,看门狗可以进行监控
            lock.lock(10L,TimeUnit.SECONDS);
            //当前线程锁住资源
            UserAccount userAccount = userAccountMapper.selectByUserId(userAccountDto.getUserId());

            if(!Objects.isNull(userAccount) && userAccount.getAmount().doubleValue() > userAccountDto.getAccount()) {
                int res = userAccountMapper.updateAmount(userAccountDto.getAccount(),userAccount.getId());
                if(res > 0) {
                    //提现成功
                    UserAccountRecord record = new UserAccountRecord();
                    record.setCreateTime(LocalDateTime.now());
                    record.setAccountId(userAccount.getId());
                    record.setMoney(BigDecimal.valueOf(userAccountDto.getAccount()));

                    accountRecordMapper.insert(record);
                    log.info("Redisson分布式锁lock -当前待提现金额为:{},账户的余额为: {}, 成功", userAccountDto.getAccount(), userAccount.getAmount());
                    //成功后结束
                } else {
                    throw new Exception("更新出现异常");
                }
            }
        } catch (Exception e) {
            log.info("出现异常: {}",e.fillInStackTrace());
        } finally {
            if(lock != null) {
                //释放分布式锁,这里可以使用Lua保证锁是当前锁
                lock.forceUnlock();
            }
        }
    }

分布式锁可重入 — trylock()

redisson做分布式锁的效果很佳,因为其拥有自动延时机制: watch dog, 后台线程,会轮询客户端的lock,并且不断延时锁的时间, 可以解决使用redis的SETNX 做分布式锁,业务还未完成锁却自动释放的问题(lock的expireTime的设置问题)

在Redis中,配合Lua执行命令,Redis保证原子性执行命令,Lua脚本会保证整体一次性执行完成,也就是Automic

可重入锁,主要就是有一个hincrby, 记录的当前线程进入锁,释放锁的时候,无锁就直接返回,有锁但是不是当前线程的锁,也不能释放,只有当前线程加的锁才能返回

当获取到锁,返回值为true,count置为1,**当获取锁失败时,返回false,这个时候线程可以不需要阻塞等待,直接去做其余的事情, tryLock方式获取锁是非公平方式

@Override
    public void takeMoneyWithRedisLock(UserAccountDto userAccountDto) throws Exception {
        ValueOperations valueOperations = redisTemplate.opsForValue();
        //读取操作,修改操作之前需要获取lock锁,也就是redis的一个key
        //首先还是需要先进行读取,读取大家都可以,修改操作就是一个一个来
        UserAccount userAccount = userAccountMapper.selectByUserId(userAccountDto.getUserId());
        //之后进行判断
        if(!Objects.isNull(userAccount) && userAccount.getAmount().doubleValue() - userAccountDto.getAccount() >= 0) {
            //如果余额够,那么就进行提现,写操作,加分布式锁【redis的lock锁】,操作的是元组,所以主要是避免同一个用户的不同线程
            final String key = "redisson:account:" + userAccountDto.getUserId() + "-lock";
           
            RLock lock = redissonClient.getLock(key);
            //尝试获取锁,这里就是trylock,返回值为true代表获取锁成功,失败了不需要像lock一样一直阻塞
            boolean result = lock.tryLock(100,10L,TimeUnit.SECONDES);
            if (result) {
                try {
                    //提现
                    int res = userAccountMapper.updateAmount(userAccountDto.getAccount(), userAccount.getId());
                    //和之前的是相同的,现在IDEA都不提倡重复的代码,最好能够复用
                    if (res > 0) {
                        UserAccountRecord record = new UserAccountRecord();
                        record.setCreateTime(LocalDateTime.now());
                        record.setAccountId(userAccount.getId());
                        record.setMoney(BigDecimal.valueOf(userAccountDto.getAccount()));

                        accountRecordMapper.insert(record);
                        log.info("redisson分布式锁 -当前待提现金额为:{},账户的余额为: {}, 成功", userAccountDto.getAccount(), userAccount.getAmount());
                        //成功后结束
                        return;
                    } else {
                        throw new Exception("redisson分布式锁 -- 余额不足");
                    }
                } catch (Exception e) {
                    throw e;
                } finally {
                    //不管发生什么异常情况,一定要释放锁,避免死锁,当然释放的是当前对象的锁
                    if(lock != null) {
                        lock.forceUnlock();
                    }
                }
            }
        }
        throw new Exception("出现异常");
    }

这里只是简单给出redisson的一点简单intro,可以根据具体的业务场景选用,分布式锁还是推荐使用redisson的方式实现,原生redis的SETNX方式的问题更多,但是像更小的并发量或者修改少还是乐观锁就可了

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐