Redis JedisCluster 中使用PipeLine
Redis、Java
·
Jedis的pipeline的原理
通过pipeline对redis的所有操作命令,都会先放到一个List中,当pipeline直接执行或者通过jedis.close()调用sync()的时候,所有的命令都会一次性地发送到客户端,并且每个操作命令返回一个response,通过get来获取操作结果。
Jedis对Redis Cluster提供了JedisCluster客户端,但是没有Pipeline模式,那么JedisCluster为什么不支持Pipeline?
在redis中一共有16384个Slot,每个节点负责一部分Slot,当对Key进行操作时,redis会通过CRC16计算出key对应的Slot,将Key映射到Slot所在节点上执行操作。因为不同Key映射的节点不同,所以JedisCluster需要持有Redis Cluster每个节点的连接才能执行操作,而Pipeline是面向于一个redis连接的执行模式,如果在集群中使用Pipeline,批量操作的数据可能会在不同redis的各个节点上,所以JedisCluster并没有支持Pipeline。
思考与实现
1.翻看源码发现JedisCluster本质上是基于Jedis实现的,而Jedis是支持Pipeline模式的,这一点为JedisCluster实现Pipeline操作提供了可能性;
2.在JedisCluster为什么不支持Pipeline的解释中我们得知是因为集群的批量处理可能会要操作不同的Redis,那如果我们可以将操作的数据根据对应的Redis进行归类,再分类使用Pipeline提交数据,这样虽然不能通过Pipeline一次性将所有数据提交,但属于同一Redis节点的数据使用Pipeline也会提高效率;
3.基本思路确定:通过key获取对应的JedisPool,从JedisPool中拿取一个Jedis开启Pipeline;
redis通过key最终确定redis节点的原理:
redis会通过CRC16计算出key对应的Slot,每个Slot都会映射在对应的Redis节点上,每个节点链接都会对应一个JedisPool
4.我们看一下JedisCluster类关系图,是否能找到获取JedisPool的方法
JedisClusterInfoCache中发现两个关键Map,nodes和slots,并且提供了getSlotPool(int slot)方法,根据slot获取对应的JedisPool
在JedisCluster中发现,JedisCluster继承了BinaryJedisCluster,在BinaryJedisCluster中有一个JedisClusterConnectionHandler对象connectionHandler,而JedisClusterInfoCache对象就存在于connectionHandler中,但他们都被定义成protected的属性,没有办法直接访问,所以要想拿到JedisClusterInfoCache对象cache,只能通过反射实现。
下面开始自己实现Pipeline,步骤如下:
1.反射获取
JedisClusterInfoCache和
JedisSlotBasedConnectionHandler
2.根据由key计算得来Slot获取到JedisPool,并在一次批量处理中存储下来,便于相同节点链接重复使用
3.获取Jedis并开启Pipeline,将Jedis也存储下来,在批量处理完后统一释放
4.提供一个基于Pipeline的计数器,设置一个最大值,达到最大值时提交Pipeline中
注意:由于集群随时可能会发生变化,在获取JedisPool对象时,如果获取不到,调用
JedisSlotBasedConnectionHandler.renewSlotCache()刷新后再获取一次
利用JedisCluster去封装一个具有Pipeline模式的客户端
由于项目集成了mybatis包,这里使用MeteObject.getValue()来获取JedisClusterInfoCache和JedisSlotBasedConnectionHandler
package com.qx.it.util.redis;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
import redis.clients.jedis.util.JedisClusterCRC16;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* 基于JedisCluster实现管道的使用
* 核心对象:JedisClusterInfoCache和JedisSlotBasedConnectionHandler
* 使用构造方法将JedisCluster对象传递进来
* @author CK
* @date 2022/07/05
*/
public class JedisClusterPipeline {
/**
* 构造方法
* 通过JedisCluster获取JedisClusterInfoCache和JedisSlotBasedConnectionHandler
* @param jedisCluster
*/
public JedisClusterPipeline(JedisCluster jedisCluster){
this.jedisCluster = jedisCluster;
MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
clusterInfoCache = (JedisClusterInfoCache)metaObject.getValue("connectionHandler.cache");
connectionHandler = (JedisSlotBasedConnectionHandler)metaObject.getValue("connectionHandler");
}
/** 管道命令提交阈值 */
private final int MAX_COUNT = 10000;
/** Redis集群缓存信息对象 Jedis提供*/
private JedisClusterInfoCache clusterInfoCache;
/** Redis链接处理对象 继承于JedisClusterConnectionHandler,对其提供友好的调用方法 Jedis提供 */
private JedisSlotBasedConnectionHandler connectionHandler;
/** Redis集群操作对象 Jedis提供 */
private JedisCluster jedisCluster;
/** 存储获取的Jedis对象,用于统一释放对象 */
private CopyOnWriteArrayList<Jedis> jedisList = new CopyOnWriteArrayList();
/** 存储获取的Jedis连接池对象与其对应开启的管道,用于保证slot(哈希槽)对应的节点链接的管道只被开启一次 */
private ConcurrentHashMap<JedisPool, Pipeline> pipelines = new ConcurrentHashMap<>();
/** 存储每个开启的管道需要处理的命令(数据)数,当计数达到提交阈值时进行提交 */
private ConcurrentHashMap<Pipeline, Integer> nums = new ConcurrentHashMap<>();
public void hsetByPipeline(String key, String field, String value){
Pipeline pipeline = getPipeline(key);
pipeline.hset(key, field, value);
nums.put(pipeline, nums.get(pipeline) + 1);
this.maxSync(pipeline);
}
/**
* 释放获取的Jedis链接
* 释放的过程中会强制执行PipeLine sync
*/
public void releaseConnection() {
jedisList.forEach(jedis -> jedis.close());
}
/**
* 获取JedisPool
* 第一次获取不到尝试刷新缓存的SlotPool再获取一次
* @param key
* @return
*/
private JedisPool getJedisPool(String key){
/** 通过key计算出slot */
int slot = JedisClusterCRC16.getSlot(key);
/** 通过slot获取到对应的Jedis连接池 */
JedisPool jedisPool = clusterInfoCache.getSlotPool(slot);
if(null != jedisPool){
return jedisPool;
}else{
/** 刷新缓存的SlotPool */
connectionHandler.renewSlotCache();
jedisPool = clusterInfoCache.getSlotPool(slot);
if (jedisPool != null) {
return jedisPool;
} else {
throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
}
}
}
/**
* 获取Pipeline对象
* 缓存在pipelines中,保证集群中同一节点的Pipeline只被开启一次
* 管道第一次开启,jedisList,pipelines,nums存入与该管道相关信息
* @param key
* @return
*/
private Pipeline getPipeline(String key){
JedisPool jedisPool = getJedisPool(key);
/** 检查管道是否已经开启 */
Pipeline pipeline = pipelines.get(jedisPool);
if(null == pipeline){
Jedis jedis = jedisPool.getResource();
pipeline = jedis.pipelined();
jedisList.add(jedis);
pipelines.put(jedisPool, pipeline);
nums.put(pipeline, 0);
}
return pipeline;
}
/**
* 管道对应的命令计数,并在达到阈值时触发提交
* 提交后计数归零
* @param pipeline
* @return
*/
private void maxSync(Pipeline pipeline){
Integer num = nums.get(pipeline);
if(null != num){
if(num % MAX_COUNT == 0){
pipeline.sync();
nums.put(pipeline, 0);
}
}
}
}
测试:一共条数据,数据格式使用HASH,分别使用JedisCluster模式和自定的PipeLine模式执行插入,看最终的耗时
方式一:使用JedisCluster一条一条的写入redis
Long currentTime = Calendar.getInstance().getTime().getTime();
LOGGER.info("开始写入redis,开始时间:" +
DateUtil.getDate(currentTime));
userFriendMap.forEach((key, value) -> {
value.forEach(item -> {
item.setWriteTime(new Date(currentTime));
jedisCluster.hset(GlobalCacheKeyConstants.H_USER_FRIEND_KEY + key, item.getFriendId(), JSON.toJSONString(item));
});
});
Long endTime = Calendar.getInstance().getTime().getTime();
LOGGER.info("数据写入结束,共" + userFriendDTOList.size() + "条数据,开始时间:" +
DateUtil.getDate(endTime) + ",总耗时:" + (endTime - currentTime) / 1000 + "S");
结果:
方式二:使用自己封装的Pipeline模式
Long currentTime = Calendar.getInstance().getTime().getTime();
LOGGER.info("开始写入redis,开始时间:" + DateUtil.getDate(currentTime));
JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(jedisCluster);
userFriendMap.forEach((key, value) -> {
String redisKey = GlobalCacheKeyConstants.H_USER_FRIEND_KEY + key;
value.forEach(item -> {
item.setWriteTime(new Date(currentTime));
jedisClusterPipeline.hsetByPipeline(redisKey, item.getFriendId(), JSON.toJSONString(item));
});
});
jedisClusterPipeline.releaseConnection();
Long endTime = Calendar.getInstance().getTime().getTime();
LOGGER.info("数据写入结束,共" + userFriendDTOList.size() + "条数据,开始时间:" +
DateUtil.getDate(endTime) + ",总耗时:" + (endTime - currentTime) + "ms");
结果:
结论:
可以看到测试结果,如果大量的数据写入缓存,使用JedisCluster便利插入效率真是很低下,3000+数据足足用了200+秒
而使用Pipeline的方式,有很明显的差距,486ms,可以接受!
更多推荐
已为社区贡献1条内容
所有评论(0)