SpringBoot利用Pipeline批量插入redis数据
Pipeline的使用
·
什么是Pipeline
Pipeline简单来说就是和redis建立一个connection,类似于一个管道,用于数据连接。
使用场景
现在有个项目大批量刷新缓存的场景,redis数据类型为hash,而redisTemplate中封装的都是单个hashkey的插入,这样每次插入一个hash值就会建立一次连接,非常耗时。
redis的hash结构如下
Key HashKey HashValue
假设有10个Key,每个Key里面有10个HashKey,则需要建立100次连接,即便多线程情况下也会耗时很多。
经过测试,通过pipeline可以提升至少十倍的插入效率。
经对比,Key的数据量较大时通过一次pipeline连接插入比按key分多线程去建立多个pipeline连接要快,而pipeline中每次插入时间可以忽略不计,大部分是0ms,偶尔出现几十ms的情况。
代码示例
话不多说上代码,这是一个项目启动初始化缓存的案例。
@Resource(name = "redisTemplate")
private RedisOperations redisOperations;
private void doRefresh() {
RedisSerializer keySerializer = redisOperations.getKeySerializer();
RedisSerializer hashKeySerializer = redisOperations.getHashKeySerializer();
RedisSerializer hashValueSerializer = redisOperations.getHashValueSerializer();
log.info("开始刷新初始化指标参数缓存");
long start = System.currentTimeMillis();
//清空缓存
long startDelete = System.currentTimeMillis();
Set members = redisOperations.keys(appId + StrUtil.COLON + POINT_PREFIX.concat("*"));
if (CollUtil.isNotEmpty(members)) {
redisOperations.delete(members);
}
long endDelete = System.currentTimeMillis();
//查询数据库
long startMysql = System.currentTimeMillis();
List<IndexParamCachePO> indexParamCacheInfo = indexParamCacheMapper.getIndexParamCacheInfo();
long endMysql = System.currentTimeMillis();
//重新加载缓存
long startCache = System.currentTimeMillis();
Map<String, List<IndexParamCacheInfoBO>> indexParamMap = indexParamCacheInfo.stream().map(k -> {
IndexParamCacheInfoBO indexMeasureInfo = new IndexParamCacheInfoBO();
BeanUtil.copyProperties(k, indexMeasureInfo);
return indexMeasureInfo;
}).collect(Collectors.groupingBy(IndexParamCacheInfoBO::getCombinationIndexCode));
redisOperations.executePipelined((RedisCallback) connection -> {
indexParamMap.forEach((k, v) -> {
Map<byte[], byte[]> byteMap = v.stream().collect(Collectors.toMap(x -> hashKeySerializer.serialize(x.getParamCode()), y -> hashValueSerializer.serialize(y), (v1, v2) -> v1));
connection.hMSet(keySerializer.serialize(appId + StrUtil.COLON + POINT_PREFIX + StrUtil.COLON + k), byteMap);
});
return null;
});
long endCache = System.currentTimeMillis();
long end = System.currentTimeMillis();
// log.info("指标参数缓存初始化结束,清空缓存耗时{}ms,查询数据库耗时{}ms,插入缓存耗时{}ms,共{}条,耗时{}ms", endDelete - startDelete, endMysql - startMysql, endCache - startCache, indexParamMap.size(), end - start);
log.info("指标参数缓存初始化结束,耗时:{}ms", end - start);
}
更多推荐
所有评论(0)