当并发量大的时候,如果我们有对redis大量进行数据读写操作时,可能会因为大量线程的读写而造成部分线程的阻塞,同时我们的服务还是游刃有余的。只时,我们应该怎么处理呢!
我们可以使用redis-reactive来进行“背压”,来减少阻塞,提交效率。

现有十万的数据需要存储在redis服务上,普通的redis写操作,堆内存图如下:请添加图片描述
reactive的redis写操作,堆内存图如下:
请添加图片描述
从两张图可发现,reactive-redis的写操作使用的时间不到普通redis的写操作的一半不到,可见其性能的卓越性。

代码案例如下:
首先,引入jar

<dependency>
   	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

注意:因为spring-boot-starter-data-redis和spring-boot-starter-data-redis-reactive,两个引入其本身主要引入了都是spring-data-redis,所以引入spring-boot-starter-data-redis-reactive即可
在这里插入图片描述
配置文件:
ReactiveRedisConfig

/**
 * 反应redis配置
 */
@Configuration
public class ReactiveRedisConfig {

    @Bean
    public RedisSerializationContext<String, Object> redisSerializationContext() {
        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =
                RedisSerializationContext.newSerializationContext();
        Jackson2JsonRedisSerializer<Object> redisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        redisSerializer.setObjectMapper(om);

        builder.key(new StringRedisSerializer());
        builder.value(redisSerializer);
        builder.hashKey(new StringRedisSerializer());
        builder.hashValue(redisSerializer);

        return builder.build();
    }

    @Bean
    public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
        return new ReactiveRedisTemplate<>(connectionFactory,redisSerializationContext());
    }
}

RedisConfig

@Configuration
public class RedisConfig {

    /**
     * retemplate相关配置
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置连接工厂
        template.setConnectionFactory(factory);

        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
        Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSeial.setObjectMapper(om);

        // 值采用json序列化
        template.setValueSerializer(jacksonSeial);
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());

        // 设置hash key 和value序列化模式
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jacksonSeial);
        template.afterPropertiesSet();

        return template;
    }

}

redis写操作代码

@Autowired
private RedisUtil redisUtil;

@PostMapping("/api/redis/put/more")
public void putMore(){
    for(int a = 1;a <= 100000;a++){
        System.out.println(a);
        redisUtil.set(a+"","test"+a,35);
    }
}
//------------------------

@Autowired
private ReactiveRedisTemplate<String,Object> reactiveRedisTemplate;

@PostMapping("/api/redis/reactive/put/more")
public void putMoreForReactive(){
    for(int a = 1;a <= 100000;a++){
        System.out.println(a);
        reactiveRedisTemplate.opsForValue().set(a+"","testreactive"+a, Duration.ofSeconds(35)).subscribe(
                b -> System.out.println(StrUtil.format("set result:{}", b)),
                e -> System.out.println(StrUtil.format("set data error:{}", e)));
    }
}

//-------------------------------华丽的分割线-------------------------
有细心的朋友,可能会发现redis-reactive 是采用观察者模式(即Publisher-Subscriber),通过Subscriber进行数据流的运行,如果redis写操作还未处理或执行,那此时如果去读取该值,redis-reactive会对该读取的流进行阻塞,直到能写操作完成订阅后,才能获取到值。

以上代码,为作者个人观点,如有不足处,请指教;

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐