Spring Boot 整合 Redisson 框架

pom依赖

  <!-- Redisson integration: start -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.12.5</version>
        </dependency>
        <!-- Redisson integration: end -->
        <!-- commons-lang3: the examples import DateUtils and RandomUtils from it; no version is declared in this snippet -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

YML配置

spring:
  # Redisson connection settings: database 0 is used; when Redis has no password, only the host needs to be set
  redis:
    host: 127.0.0.1
    database: 0
    password:

Redisson 锁使用

package com.cyc.redission.n1;

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.Calendar;
import java.util.concurrent.TimeUnit;

/**
 * REST endpoints demonstrating three ways of acquiring the Redisson lock named "lock":
 * blocking acquisition, immediate try, and a bounded wait combined with a lease time.
 *
 * <p>In every endpoint the thread's interrupt flag is restored when an
 * {@link InterruptedException} is caught, and it is restored only after the lock has been
 * released so that the release is not attempted on an already-interrupted thread.
 */
@RestController
@RequestMapping("/lock")
public class RedissonLockTest {

    /** Value returned by every endpoint in this class; none of them modifies it. */
    public static int amount = 5;

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Blocks in {@code lock.lock()} until the lock is acquired, then holds it for two seconds.
     * Concurrent callers therefore wait until the current holder releases the lock.
     *
     * @return the current value of {@link #amount}
     */
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public Integer test() {
        RLock lock = redissonClient.getLock("lock");
        boolean interrupted = false;
        try {
            lock.lock();
            System.out.println(formatDate() + " " + Thread.currentThread().getName() + "获取到锁");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            interrupted = true;
        } finally {
            unlockIfHeld(lock);
            restoreInterrupt(interrupted);
        }
        return amount;
    }

    /**
     * Tries to take the lock without waiting. On success the lock is held for two seconds;
     * otherwise a message is printed and the request returns straight away.
     *
     * @return the current value of {@link #amount}
     */
    @RequestMapping("/test1")
    public Integer test1() {
        RLock lock = redissonClient.getLock("lock");
        boolean interrupted = false;
        try {
            if (lock.tryLock()) {
                System.out.println(formatDate() + " " + Thread.currentThread().getName() + "获取到锁");
                Thread.sleep(2000);
            } else {
                System.out.println(formatDate() + " " + Thread.currentThread().getName() + "已抢光");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            interrupted = true;
        } finally {
            unlockIfHeld(lock);
            restoreInterrupt(interrupted);
        }
        return amount;
    }

    /**
     * Waits up to 2 seconds for the lock and requests a 10 second lease time.
     * On success the lock is held for 4.5 seconds before being released.
     *
     * <p>{@code getLock} returns the non-fair lock; {@code getFairLock("lock")} could be used
     * instead to hand the lock out in request order.
     *
     * @return the current value of {@link #amount}
     */
    @RequestMapping("/test2")
    public Integer test2() {
        RLock lock = redissonClient.getLock("lock");
        boolean interrupted = false;
        try {
            // wait time 2s, lease time 10s
            if (lock.tryLock(2, 10, TimeUnit.SECONDS)) {
                System.out.println(formatDate() + " " + Thread.currentThread().getName() + "获取到锁");
                Thread.sleep(4500);
            } else {
                System.out.println(formatDate() + " " + Thread.currentThread().getName() + "未获取到锁");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            interrupted = true;
        } finally {
            unlockIfHeld(lock);
            restoreInterrupt(interrupted);
        }
        return amount;
    }

    /**
     * Formats the current time as {@code second:millisecond} for the console output.
     *
     * @return the seconds and milliseconds fields of the current time, separated by a colon
     */
    public String formatDate() {
        Calendar c = Calendar.getInstance();
        return c.get(Calendar.SECOND) + ":" + c.get(Calendar.MILLISECOND);
    }

    /** Releases the lock only when the calling thread is the one holding it. */
    private void unlockIfHeld(RLock lock) {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }

    /** Re-sets the interrupt flag that was cleared when InterruptedException was thrown. */
    private void restoreInterrupt(boolean interrupted) {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

限流器

package com.cyc.redission.n1;

import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Rate limiter demo.
 * <ol>
 *   <li>Call {@code /limiter/init} first to configure the limiter named "rateLimiter"
 *       with a rate of 5 permits per 1 second.</li>
 *   <li>Call {@code /limiter/thread} to try to take one permit from that limiter.</li>
 *   <li>Callers that get a permit proceed; the others only get a message printed.
 *       The original author suggests this for flash-sale style scenarios.</li>
 * </ol>
 */
@RestController
@RequestMapping("/limiter")
public class RateLimiterTest {

    @Autowired
    private RedissonClient redissonClient;

    /** Configures the limiter: rate type PER_CLIENT, 5 permits per 1 second interval. */
    @RequestMapping("/init")
    public void init() {
        redissonClient.getRateLimiter("rateLimiter")
                .trySetRate(RateType.PER_CLIENT, 5, 1, RateIntervalUnit.SECONDS);
    }

    /** Tries to acquire a single permit and prints whether the attempt succeeded. */
    @RequestMapping("/thread")
    public void thread() {
        boolean acquired = redissonClient.getRateLimiter("rateLimiter").tryAcquire();
        String outcome = acquired ? "成功获取到令牌" : "未获取到令牌";
        System.out.println(Thread.currentThread().getName() + outcome);
    }
}

List、Queue测试

package com.cyc.redission.n1;

import org.redisson.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * REST endpoints exercising Redisson's list, queue, blocking queue and delayed queue objects.
 */
@RestController
@RequestMapping("/collection")
public class RedisQueueTest {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Appends an element to the list "my_list"; the same value may be added more than once.
     * Example: http://127.0.0.1:8080/collection/list/add?a="亚春2"
     *
     * @param a element to append
     * @return the full list content after the insert
     */
    @RequestMapping("/list/add")
    public List<String> addAndGetList(String a) {
        RList<String> myList = redissonClient.getList("my_list");
        myList.add(a);
        return myList.readAll();
    }

    /**
     * Removes every occurrence of a value from the list "my_list".
     * Example: http://127.0.0.1:8080/collection/list/del?a="亚春2"
     *
     * <p>{@code removeAll} drops all matching elements; {@code remove(a)} would drop only the
     * first match, and {@code removeIf} could be used for a custom condition.
     *
     * @param a value to remove
     * @return the full list content after the removal
     */
    @RequestMapping("/list/del")
    public List<String> removeList(String a) {
        RList<String> myList = redissonClient.getList("my_list");
        List<String> toRemove = Arrays.asList(a);
        myList.removeAll(toRemove);
        return myList.readAll();
    }

    /**
     * Adds an element at the tail of the queue "my_queue".
     *
     * @param a element to enqueue
     * @return the full queue content after the insert
     */
    @RequestMapping("/queue/add")
    public List<String> addQueue(String a) {
        RQueue<String> myQueue = redissonClient.getQueue("my_queue");
        myQueue.add(a);
        return myQueue.readAll();
    }

    /**
     * Removes and returns the head of the queue "my_queue".
     *
     * @return the head element, or null when the queue is empty
     */
    @RequestMapping("/queue/poll")
    public String pollQueue() {
        RQueue<String> myQueue = redissonClient.getQueue("my_queue");
        String head = myQueue.poll();
        return head;
    }

    /**
     * Adds an element to the blocking queue "my_blocking_queue".
     *
     * @param a element to enqueue
     * @return the full queue content after the insert
     */
    @RequestMapping("/blocking/add")
    public List<String> addBlockingQueue(String a) {
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("my_blocking_queue");
        blockingQueue.add(a);
        return blockingQueue.readAll();
    }

    /**
     * Removes and returns the head of "my_blocking_queue", blocking the calling thread
     * while the queue is empty. ({@code poll()} would return null instead of blocking, and
     * {@code peek()} would return the head without removing it.)
     *
     * @return the head element
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @RequestMapping("/blocking/get")
    public String getBlockingQueue() throws InterruptedException {
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("my_blocking_queue");
        String head = blockingQueue.take();
        return head;
    }

    /**
     * Offers an element to a delayed queue whose destination is "my_blocking_queue".
     * The delayed queue is the temporary holding place; per the original author's note the
     * element is removed from it once it has been handed to the destination queue.
     *
     * @param a element to deliver
     * @param b delay in seconds
     * @return the content of the delayed queue after the offer
     */
    @RequestMapping("/delayed/add")
    public List<String> addDelayedQueue(String a, Long b) {
        RQueue<String> destination = redissonClient.getQueue("my_blocking_queue");
        RDelayedQueue<String> delayed = redissonClient.getDelayedQueue(destination);
        delayed.offer(a, b, TimeUnit.SECONDS);
        return delayed.readAll();
    }

    /**
     * Subscribes, once the bean has been constructed, to elements arriving in
     * "my_blocking_queue" and prints each one.
     */
    @PostConstruct
    public void acceptElement() {
        Consumer<String> printer = s -> System.out.println("获取到元素:" + s);
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("my_blocking_queue");
        blockingQueue.subscribeOnElements(printer);
    }
}

RedisRScoredSortedSet

package com.cyc.redission.n1;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.redisson.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.concurrent.TimeUnit;

@Slf4j
@RestController
@RequestMapping("/score")
public class RedisRScoredSortedSet {

    @Autowired
    private RedissonClient redissonClient;

    @RequestMapping("/add")
    public String addScore(String a,Double b){
        //创建Set
        RScoredSortedSet<String> set = redissonClient.getScoredSortedSet("simpleSet1");
        //设置过期时间
        boolean exists=set.isExists();
        set.addListener(new ExpiredObjectListener() {
            public void onExpired(String name) {
                System.out.println("超时事件被触发,name="+name);
                log.info("超时事件被触发,name={}",name);
            }
        });
        //添加元素
        set.addScore(a,b);
        if(!exists) {
            set.expireAt(DateUtils.addMinutes(new Date(), 2));
        }
        //获取元素在集合中的位置
        Integer index=set.revRank(a);
        //获取元素的评分
        Double score=set.getScore(a);
        log.info("size={},a={},index={},score={}",set.size(),a,index,score);

        //可以设置单一元属过期,但是不能触发对应过期事件
        RSetCache<String> map = redissonClient.getSetCache("simpleSet2");
        map.add(a,1, TimeUnit.MINUTES);
        //TODO 可设置,但不会触发监听.
        map.addListener(new ExpiredObjectListener() {
            public void onExpired(String name) {
                log.info("entryExpiredListener超时事件被触发,event={}",name);
            }
        });

        //不能设置单一元属过期
        RSet<String> set1 = redissonClient.getSet("simpleSet3");
        set1.add(a);

        return "SUCCESS";
    }

    @RequestMapping("/show")
    public String showList(String key){
        log.info("排行榜={}", key);
        RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(key);
        set.stream().forEach(a->{
            Integer index=set.revRank(a);//获取元素在集合中的位置
            Double score=set.getScore(a);//获取元素的评分
            log.info("size={},key={},index={},score={}", set.size(), a, index, score);
        });
        return "SUCCESS";
    }

    @RequestMapping("/clear")
    public String clearList(){
        long size=redissonClient.getKeys().deleteByPattern("*impl*");
        log.info("删除数量:{}",size);
        return "SUCCESS";
    }

    @RequestMapping("/deleteAll")
    public String deleteAll(String pattern){
        long amount=redissonClient.getKeys().deleteByPattern(pattern);
        log.info("删除数量:{}",amount);
        return "SUCCESS";
    }
}

RedissonMapTest

package com.cyc.redission.n1;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.api.map.event.EntryEvent;
import org.redisson.api.map.event.EntryExpiredListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

/**
 * REST endpoints demonstrating a Redisson map cache with per-entry expiry and an
 * expiration listener.
 */
@Slf4j
@RestController
@RequestMapping("/map")
public class RedissonMapTest {

    @Autowired
    private RedissonClient redissonClient;
    /** Name of the map cache used by every endpoint in this class. */
    private final static String key = "my_test_map";

    /**
     * Registers the entry-expired listener once, after the bean has been constructed.
     * Per the original author's note, the expiry callback may fire with some delay, and
     * registering the listener twice would run the callback twice.
     */
    @PostConstruct
    public void init() {
        EntryExpiredListener<String, String> onExpiry = event ->
                log.info("{}已过期,原来的值为:{},现在的值为:{}", event.getKey(), event.getOldValue(), event.getValue());
        RMapCache<String, String> cache = redissonClient.getMapCache(key);
        cache.addListener(onExpiry);
        log.info("{}初始化完成", key);
    }

    /**
     * Stores a key/value pair in the map cache.
     * Example: http://127.0.0.1:8080/map/put/?a=myKey&b=myKeyValue22&flag=true
     *
     * @param a    entry key
     * @param b    entry value
     * @param flag when true the entry gets a 2 second time-to-live, which is expected to
     *             trigger the listener registered in {@link #init()} once it expires;
     *             when false the entry is stored without a time-to-live
     * @return the literal "SUCCESS"
     */
    @RequestMapping("/put")
    public String put(String a, String b, boolean flag) {
        RMapCache<String, String> cache = redissonClient.getMapCache(key);
        if (!flag) {
            cache.put(a, b);
        } else {
            cache.put(a, b, 2, TimeUnit.SECONDS);
        }
        log.info("设置{}={}成功", a, b);
        return "SUCCESS";
    }

    /**
     * Logs every key of the map cache together with the value looked up for it.
     *
     * @return the literal "SUCCESS"
     */
    @RequestMapping("/show")
    public String put() {
        RMapCache<String, String> cache = redissonClient.getMapCache(key);
        for (String entryKey : cache.keySet()) {
            log.info("{},{}", entryKey, cache.get(entryKey));
        }
        return "SUCCESS";
    }
}

RedisTopicTest

package com.cyc.redission.n1;

import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

/**
 * Topic demo (publish / subscribe) on the topic named "anyTopic".
 */
@RestController
@RequestMapping("/topic")
public class RedisTopicTest {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Publishes a message to the topic.
     * Example: http://127.0.0.1:8080/topic/produce?a=redis主题
     *
     * @param a message to publish
     * @return a confirmation text containing the message
     */
    @RequestMapping("/produce")
    public String produce(String a) {
        redissonClient.getTopic("anyTopic").publish(a);
        return "发送消息:" + a;
    }

    /**
     * Subscribes to the topic once the bean has been constructed and prints every message
     * received. {@code getPatternTopic("*any*")} could be used instead to subscribe to
     * several topics through a pattern.
     */
    @PostConstruct
    public void consume() {
        MessageListener<String> printer =
                (channel, message) -> System.out.println("接收到消息:" + message);
        RTopic subscription = redissonClient.getTopic("anyTopic");
        subscription.addListener(String.class, printer);
    }
}

RedisTransactionTest

package com.cyc.redission.n1;

import org.apache.commons.lang3.RandomUtils;
import org.redisson.api.RMap;
import org.redisson.api.RTransaction;
import org.redisson.api.RedissonClient;
import org.redisson.api.TransactionOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * Transaction demo.
 *
 * <p>According to the original author's notes, Redisson offers transactions with ACID
 * properties for objects such as RMap, RMapCache, RLocalCachedMap, RSet, RSetCache and
 * RBucket; it relies on distributed locks for atomic writes and on an internal command
 * queue to provide commit and rollback, and reports commit/rollback problems through
 * {@code org.redisson.transaction.TransactionException}.
 */
@RestController
@RequestMapping("/tx")
public class RedisTransactionTest {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Reads and updates "userId" in the transactional map "myMap", prints the value stored
     * under {@code key}, and commits.
     *
     * <p>All work on the transaction happens inside the {@code try} block so that any
     * failure leads to a rollback instead of leaving the transaction open. A missing
     * {@code key} is printed as {@code null} rather than raising a NullPointerException.
     *
     * @param key map key whose value is printed after the update
     */
    @RequestMapping("/test/{key}")
    public void test(@PathVariable String key){
        TransactionOptions options=TransactionOptions.defaults().syncSlavesTimeout(5, TimeUnit.SECONDS)
                .responseTimeout(3,TimeUnit.SECONDS).retryInterval(2,TimeUnit.SECONDS)
                .retryAttempts(3).timeout(5,TimeUnit.SECONDS);
        RTransaction transaction=redissonClient.createTransaction(options);
        try {
            RMap<String, Integer> map=transaction.getMap("myMap");
            System.out.println(map.get("userId"));
            map.put("userId", RandomUtils.nextInt(1,100));
            // println(Object) prints "null" for an absent key instead of dereferencing it.
            System.out.println(map.get(key));
            transaction.commit();
        } catch (Exception e) {
            e.printStackTrace();
            transaction.rollback();
        }
    }
}

CountDownLatchTest

package com.cyc.redission.n1;

import org.redisson.api.RCountDownLatch;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * Count-down latch demo on the latch named "latch": one request waits in {@code /count/await}
 * while other requests count down through {@code /count/thread}.
 */
@RestController
@RequestMapping("/count")
public class CountDownLatchTest {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Tries to set the latch count to 3, prints the current count and then blocks the
     * calling thread until the count reaches zero.
     *
     * <p>If the wait is interrupted, the interrupt flag is restored so callers further up
     * the stack can still observe the interruption.
     */
    @RequestMapping("/await")
    public void await(){
        try {
            RCountDownLatch latch = redissonClient.getCountDownLatch("latch");
            latch.trySetCount(3);
            long count = latch.getCount();
            System.out.println("count = " + count);
            latch.await();
            System.out.println(Thread.currentThread().getName()+"所有子线程已运行完毕");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Simulates a worker: prints an arrival message, sleeps one second and then decrements
     * the latch. When the sleep is interrupted the latch is not decremented and the
     * interrupt flag is restored.
     */
    @RequestMapping("/thread")
    public void thread(){
        try {
            RCountDownLatch latch = redissonClient.getCountDownLatch("latch");
            System.out.println(Thread.currentThread().getName()+"抵达现场");
            TimeUnit.SECONDS.sleep(1);
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐