什么是Pipeline

Pipeline简单来说就是和redis建立一个connection,类似于一个管道,用于数据连接。

使用场景

现在有个项目大批量刷新缓存的场景,redis数据类型为hash,而redisTemplate中封装的都是单个hashkey的插入,这样每次插入一个hash值就会建立一次连接,非常耗时。
redis的hash结构如下
Key HashKey HashValue
假设有10个Key,每个Key里面有10个HashKey,则需要建立100次连接,即便多线程情况下也会耗时很多。
经过测试,通过pipeline可以提升至少十倍的插入效率。
经对比,Key的数据量较大时通过一次pipeline连接插入比按key分多线程去建立多个pipeline连接要快,而pipeline中每次插入时间可以忽略不计,大部分是0ms,偶尔出现几十ms的情况。

代码示例

话不多说上代码,这是一个项目启动初始化缓存的案例。

@Resource(name = "redisTemplate")
private RedisOperations redisOperations;

private void doRefresh() {
        RedisSerializer keySerializer = redisOperations.getKeySerializer();
        RedisSerializer hashKeySerializer = redisOperations.getHashKeySerializer();
        RedisSerializer hashValueSerializer = redisOperations.getHashValueSerializer();
        log.info("开始刷新初始化指标参数缓存");
        long start = System.currentTimeMillis();
        //清空缓存
        long startDelete = System.currentTimeMillis();
        Set members = redisOperations.keys(appId + StrUtil.COLON + POINT_PREFIX.concat("*"));
        if (CollUtil.isNotEmpty(members)) {
            redisOperations.delete(members);
        }
        long endDelete = System.currentTimeMillis();
        //查询数据库
        long startMysql = System.currentTimeMillis();
        List<IndexParamCachePO> indexParamCacheInfo = indexParamCacheMapper.getIndexParamCacheInfo();
        long endMysql = System.currentTimeMillis();
        //重新加载缓存
        long startCache = System.currentTimeMillis();
        Map<String, List<IndexParamCacheInfoBO>> indexParamMap = indexParamCacheInfo.stream().map(k -> {
            IndexParamCacheInfoBO indexMeasureInfo = new IndexParamCacheInfoBO();
            BeanUtil.copyProperties(k, indexMeasureInfo);
            return indexMeasureInfo;
        }).collect(Collectors.groupingBy(IndexParamCacheInfoBO::getCombinationIndexCode));

        redisOperations.executePipelined((RedisCallback) connection -> {
            indexParamMap.forEach((k, v) -> {
                Map<byte[], byte[]> byteMap = v.stream().collect(Collectors.toMap(x -> hashKeySerializer.serialize(x.getParamCode()), y -> hashValueSerializer.serialize(y), (v1, v2) -> v1));
                connection.hMSet(keySerializer.serialize(appId + StrUtil.COLON + POINT_PREFIX + StrUtil.COLON + k), byteMap);
            });
            return null;
        });
        long endCache = System.currentTimeMillis();
        long end = System.currentTimeMillis();
//        log.info("指标参数缓存初始化结束,清空缓存耗时{}ms,查询数据库耗时{}ms,插入缓存耗时{}ms,共{}条,耗时{}ms", endDelete - startDelete, endMysql - startMysql, endCache - startCache, indexParamMap.size(), end - start);
        log.info("指标参数缓存初始化结束,耗时:{}ms", end - start);
    }
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐