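// Template with String keys and JSON-serialized values, used by all examples below.
@Bean(name = "customRedisTemplate")
public RedisTemplate<String, Object> customRedisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(connectionFactory);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
    redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
    return redisTemplate;
}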

1 Standalone mode

1.1 Add the dependencies

<properties>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
	<spring-boot.version>2.1.0.RELEASE</spring-boot.version>
	<jedis.version>2.9.0</jedis.version>
</properties>
<dependencyManagement>
	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-dependencies -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>${spring-boot.version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${jedis.version}</version>
</dependency>

# application.properties
spring.redis.host=121.36.33.154
spring.redis.port=6379
spring.redis.password=
spring.redis.database=1

1.2 Running a pipeline with RedisTemplate

@Test
public void testRedisPipeline() {
    List<Object> resultList = customRedisTemplate.executePipelined(
            new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    // 1. Open the pipeline on the connection.
                    //    executePipelined opens the pipeline itself, so this call is redundant.
                    connection.openPipeline();
                    // 2. Queue the commands that this pipeline sends in one go.

                    // 2.1 A plain SET key value
                    byte[] key = "name".getBytes();
                    byte[] value = "qinyi".getBytes();
                    connection.set(key, value);

                    // 2.2 A command that yields no value: LPOP on a key that does not exist
                    connection.lPop("xyzabc".getBytes());

                    // 2.3 An MSET
                    Map<byte[], byte[]> tuple = new HashMap<>();
                    tuple.put("id".getBytes(), "1".getBytes());
                    tuple.put("age".getBytes(), "19".getBytes());
                    connection.mSet(tuple);

                    /**
                     * 1. Do not close the pipeline here; executePipelined closes it
                     *    and collects the results.
                     * 2. The callback must return null.
                     */
                    // 3. Closing the pipeline (left commented out on purpose)
                    // connection.closePipeline();
                    return null;
                }
            }
    );
    // One entry per queued command, in the order the commands were queued.
    resultList.forEach(System.out::println);
}

Output:

true
null
true

In cluster mode, the same call fails with the following error:
java.lang.UnsupportedOperationException: Pipeline is currently not supported for JedisClusterConnection.

1.3 Two ways to fetch values in bulk with RedisTemplate

import org.springframework.util.StopWatch;

@Autowired
private RedisTemplate customRedisTemplate;

@Test
public void redisPipeline() {
    StopWatch stopWatch = new StopWatch("redis test");

    // Write 100 test entries with a single multiSet (MSET).
    stopWatch.start("init keys");
    RedisSerializer keySerializer = customRedisTemplate.getKeySerializer();
    List<String> redisKey = new ArrayList<>();
    HashMap<String, RedisUser> redisData = new HashMap<>();
    for (int i = 0; i < 100; i++) {
        redisData.put("xxx-yyy-zzz:pipLine:" + i, RedisUser.builder().userId(i).name(i + "").build());
        redisKey.add("xxx-yyy-zzz:pipLine:" + i);
    }
    customRedisTemplate.opsForValue().multiSet(redisData);
    stopWatch.stop();

    // Approach 1: multiGet (MGET) on all keys at once.
    stopWatch.start("multiGet");
    List multiGetResult = customRedisTemplate.opsForValue().multiGet(redisKey);
    stopWatch.stop();

    // Approach 2: one GET per key, all sent through a single pipeline.
    stopWatch.start("Pipeline");
    List<RedisUser> pipeLineResult = customRedisTemplate.executePipelined(
            new RedisCallback<RedisUser>() {
                @Override
                public RedisUser doInRedis(RedisConnection connection) throws DataAccessException {
                    for (String key : redisKey) {
                        connection.get(keySerializer.serialize(key));
                    }
                    // The callback must return null; executePipelined collects the results.
                    return null;
                }
            }
    );
    stopWatch.stop();
    System.out.println(stopWatch.prettyPrint());
    System.out.println("pipeLineResult => " + pipeLineResult.size());
    System.out.println("multiGetResult => " + multiGetResult.size());
}

Output:

StopWatch 'redis test': running time (millis) = 2240
-----------------------------------------
ms     %     Task name
-----------------------------------------
01210  054%  init keys
00503  022%  multiGet
00527  024%  Pipeline

pipeLineResult => 100
multiGetResult => 100
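
The pipelined call in 1.3 returns a plain list with one entry per queued command, in the order the commands were queued, so pairing values with their keys means walking both lists together. A minimal sketch of that pairing, assuming one result per key; `PipelineResults` and `zip` are hypothetical names and not part of Spring Data Redis:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Data Redis.
class PipelineResults {

    // Pairs each key with the result at the same position.
    // A key without a value keeps its entry, mapped to null.
    static <V> Map<String, V> zip(List<String> keys, List<V> results) {
        if (keys.size() != results.size()) {
            throw new IllegalArgumentException("expected one result per key");
        }
        Map<String, V> byKey = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            byKey.put(keys.get(i), results.get(i));
        }
        return byKey;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k:0", "k:1", "k:2");
        List<String> results = Arrays.asList("a", null, "c");
        System.out.println(zip(keys, results)); // {k:0=a, k:1=null, k:2=c}
    }
}
```

In the test above this would be called as `zip(redisKey, pipeLineResult)`. A LinkedHashMap is used so that iteration follows the order of the key list.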

1.4 Running a pipeline with Jedis directly

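import com.alibaba.fastjson.JSON; // assumed: the JSON.toJSONString calls match the fastjson API
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Test
public void redisPipeline() throws IOException {
    RedisConnectionFactory factory = customRedisTemplate.getConnectionFactory();
    RedisConnection conn = RedisConnectionUtils.getConnection(factory);
    try {
        // With the Jedis client in standalone mode, the native connection is a Jedis instance.
        Jedis jedis = (Jedis) conn.getNativeConnection();

        // Bulk insert. Values are written as plain JSON strings here,
        // not through the template's value serializer.
        HashMap<String, RedisUser> redisData = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            redisData.put("xxx-yyy-zzz:pipLine:" + i, RedisUser.builder().userId(i).build());
        }
        Pipeline writePipeline = jedis.pipelined();
        for (Map.Entry<String, RedisUser> entry : redisData.entrySet()) {
            writePipeline.set(entry.getKey(), JSON.toJSONString(entry.getValue()));
        }
        // Send the queued commands and read all replies.
        writePipeline.sync();

        // Bulk read
        List<Response<byte[]>> responses = new ArrayList<>();
        Pipeline readPipeline = jedis.pipelined();
        for (String key : redisData.keySet()) {
            responses.add(readPipeline.get(key.getBytes(StandardCharsets.UTF_8)));
        }
        // The Response objects can only be read once the pipeline has been closed or synced.
        readPipeline.close();
        for (Response<byte[]> response : responses) {
            byte[] data = response.get();
            System.out.println(new String(data, StandardCharsets.UTF_8));
        }
    } finally {
        // Hand the connection back so it is not leaked.
        RedisConnectionUtils.releaseConnection(conn, factory);
    }
}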

2 Cluster mode

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RedisUser {

    private Integer userId;

    private String name;
}

2.1 Running a pipeline in cluster mode

@Test
public void testClusterPipLineGet() {
    HashMap<String, RedisUser> redisData = new HashMap<>();
    for (int i = 0; i < 1000; i++) {
        redisData.put("xxx-yyy-zzz:pipLine:" + i, RedisUser.builder().userId(i).build());
    }
    customRedisTemplate.opsForValue().multiSet(redisData);
    List<RedisUser> valueList = (List<RedisUser>) clusterPipLineGet(Lists.newArrayList(redisData.keySet()));
    System.out.println("valueList =>\n " + JSON.toJSONString(valueList, true));
}

/**
 * Fetches the given keys with one Jedis pipeline per cluster node.
 * Values come back grouped by node, not in the order of redisKeyList.
 */
public List<?> clusterPipLineGet(List<String> redisKeyList) {
    RedisSerializer<String> keySerializer = customRedisTemplate.getKeySerializer();
    RedisSerializer<?> valueSerializer = customRedisTemplate.getValueSerializer();
    // Map<cluster node, keys whose hash slot belongs to that node>
    HashMap<RedisClusterNode, List<String>> nodeKeyMap = new HashMap<>();
    List<Object> resultList = new ArrayList<>();
    RedisClusterConnection redisClusterConnection = customRedisTemplate.getConnectionFactory().getClusterConnection();
    try {
        // Compute the hash slot of every key and find the node that owns it.
        Iterable<RedisClusterNode> redisClusterNodes = redisClusterConnection.clusterGetNodes();
        for (RedisClusterNode redisClusterNode : redisClusterNodes) {
            RedisClusterNode.SlotRange slotRange = redisClusterNode.getSlotRange();
            for (String redisKey : redisKeyList) {
                int slot = JedisClusterCRC16.getSlot(redisKey);
                if (slotRange.contains(slot)) {
                    nodeKeyMap.computeIfAbsent(redisClusterNode, node -> new ArrayList<>()).add(redisKey);
                }
            }
        }
        // Run one pipeline against each node.
        for (Map.Entry<RedisClusterNode, List<String>> clusterNodeEntry : nodeKeyMap.entrySet()) {
            RedisClusterNode redisClusterNode = clusterNodeEntry.getKey();
            List<String> nodeKeyList = clusterNodeEntry.getValue();
            // Look up the JedisPool of this node by its "host:port" string.
            HostAndPort hostAndPort = new HostAndPort(redisClusterNode.getHost(), redisClusterNode.getPort());
            JedisCluster jedisCluster = (JedisCluster) redisClusterConnection.getNativeConnection();
            JedisPool jedisPool = jedisCluster.getClusterNodes().get(hostAndPort.toString());
            Jedis jedis = jedisPool.getResource();
            List<Response<byte[]>> responses = new ArrayList<>();
            try {
                Pipeline pipeline = jedis.pipelined();
                for (String nodeKey : nodeKeyList) {
                    responses.add(pipeline.get(keySerializer.serialize(nodeKey)));
                }
                // The Response objects can only be read once the pipeline has been closed.
                pipeline.close();
            } catch (IOException e) {
                // Fail here instead of continuing with responses that were never filled.
                throw new UncheckedIOException(e);
            } finally {
                // Return the Jedis instance to its pool.
                jedis.close();
            }
            for (Response<byte[]> response : responses) {
                byte[] data = response.get();
                resultList.add(valueSerializer.deserialize(data));
            }
        }
    } finally {
        RedisConnectionUtils.releaseConnection(redisClusterConnection, customRedisTemplate.getConnectionFactory());
    }
    return resultList;
}
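
The node lookup in clusterPipLineGet depends on JedisClusterCRC16.getSlot, which maps a key to one of 16384 hash slots. As a rough illustration of what that lookup computes according to the Redis Cluster specification (CRC16 of the key modulo 16384, hashing only a non-empty {hash tag} when the key has one), here is a minimal sketch. `KeySlotSketch` and its methods are hypothetical names, and this is not the Jedis implementation:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for JedisClusterCRC16, for illustration only.
class KeySlotSketch {

    private static final int SLOT_COUNT = 16384;

    // CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection.
    static int crc16(byte[] data, int start, int end) {
        int crc = 0;
        for (int i = start; i < end; i++) {
            crc ^= (data[i] & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // If the key contains a non-empty {hash tag}, only the tag is hashed.
    static int slotOf(String key) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int start = 0;
        int end = bytes.length;
        int open = indexOf(bytes, (byte) '{', 0);
        if (open >= 0) {
            int close = indexOf(bytes, (byte) '}', open + 1);
            if (close > open + 1) { // at least one byte between the braces
                start = open + 1;
                end = close;
            }
        }
        return crc16(bytes, start, end) % SLOT_COUNT;
    }

    private static int indexOf(byte[] bytes, byte target, int from) {
        for (int i = from; i < bytes.length; i++) {
            if (bytes[i] == target) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(slotOf("123456789")); // 12739 (CRC16 check value 0x31C3)
        // Keys that share a hash tag land in the same slot, hence on the same node.
        System.out.println(slotOf("{user1000}.following") == slotOf("{user1000}.followers")); // true
    }
}
```

Because every key with the same hash tag maps to the same slot, tagging related keys is one way to make sure they end up in the same per-node pipeline.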

2.2 Performance test in cluster mode

@Test
public void redisPipeline() throws InterruptedException {
    StopWatch stopWatch = new StopWatch("redis test");
    int testSize = 10000;
    /**
     * Build the keys. The first key is not written by this test,
     * and the last one does not exist.
     */
    stopWatch.start("build keys");
    ArrayList<String> redisKey = new ArrayList<>();
    redisKey.add("xxx-yyy-zzz:pipLine:0");
    HashMap<String, RedisUser> redisData = new HashMap<>();
    for (int i = 1; i <= testSize; i++) {
        redisKey.add("xxx-yyy-zzz:pipLine:" + i);
        redisData.put("xxx-yyy-zzz:pipLine:" + i, RedisUser.builder().userId(i).build());
    }
    // String concatenation, not arithmetic: the suffix is "20001" + "10000" + "1".
    redisKey.add("xxx-yyy-zzz:pipLine:20001" + testSize + 1);
    stopWatch.stop();
    /**
     * Bulk insert with multiSet
     */
    stopWatch.start("multiSet");
    customRedisTemplate.opsForValue().multiSet(redisData);
    stopWatch.stop();
    /**
     * Bulk read with multiGet
     */
    stopWatch.start("multiGet");
    List<RedisUser> multiGetResult = (List<RedisUser>) customRedisTemplate.opsForValue().multiGet(redisKey);
    System.out.println("multiGetResult => " + multiGetResult.size());
    System.out.println(multiGetResult.get(testSize));
    System.out.println(multiGetResult.get(testSize + 1));
    stopWatch.stop();
    /**
     * Bulk read with the cluster-mode pipeline from 2.1
     */
    stopWatch.start("clusterPipLineGet");
    List<?> pipeLineResult = clusterPipLineGet(redisKey);
    System.out.println("clusterPipLineGet => " + pipeLineResult.size());
    System.out.println(pipeLineResult.get(testSize));
    System.out.println(pipeLineResult.get(testSize + 1));
    stopWatch.stop();

    System.out.println(stopWatch.prettyPrint());
    // Keeps the test JVM alive for 10 minutes after the report is printed.
    TimeUnit.MINUTES.sleep(10);
}

Output:

multiGetResult => 10002
RedisUser(userId=10000, name=null)
null
clusterPipLineGet => 10002
RedisUser(userId=9998, name=null)
RedisUser(userId=10000, name=null)
StopWatch 'redis test': running time (millis) = 27644
-----------------------------------------
ms     %     Task name
-----------------------------------------
00006  000%  build keys
03951  014%  multiSet
23610  085%  multiGet
00077  000%  clusterPipLineGet

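multiGet

  1. The returned values line up one-to-one with the input keys; a key that has no value yields null at its position.
  2. In cluster mode, multiGet is not sent as a single batch, which matches the timings above: 23610 ms for multiGet against 77 ms for clusterPipLineGet.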
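
Point 1 does not hold for clusterPipLineGet in 2.1: it appends values node by node, which is why index testSize held userId=9998 in the output above instead of userId=10000. One way to get multiGet-style ordering back is to remember each key's input position while grouping and write every value back to that position. A minimal sketch under that assumption; `OrderedGroupFetch`, `groupOf` and `fetchGroup` are hypothetical names, with `fetchGroup` standing in for the per-node pipeline:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: groups keys, fetches each group, restores the input order.
class OrderedGroupFetch {

    static <G, V> List<V> fetch(List<String> keys,
                                Function<String, G> groupOf,
                                Function<List<String>, List<V>> fetchGroup) {
        // Remember the input position of every key, per group (per node in the real code).
        Map<G, List<Integer>> positionsByGroup = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            positionsByGroup.computeIfAbsent(groupOf.apply(keys.get(i)), g -> new ArrayList<>()).add(i);
        }
        // Pre-fill with nulls so each value can be written back by position.
        List<V> ordered = new ArrayList<>(Collections.<V>nCopies(keys.size(), null));
        for (List<Integer> positions : positionsByGroup.values()) {
            List<String> groupKeys = new ArrayList<>();
            for (int position : positions) {
                groupKeys.add(keys.get(position));
            }
            // Assumed to return one value per key, in the order of groupKeys.
            List<V> values = fetchGroup.apply(groupKeys);
            for (int j = 0; j < positions.size(); j++) {
                ordered.set(positions.get(j), values.get(j));
            }
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a1", "b1", "a2", "b2");
        // Stand-ins: group by first character, "fetch" by upper-casing the key.
        Function<String, Character> groupOf = key -> key.charAt(0);
        Function<List<String>, List<String>> fetchGroup = group -> {
            List<String> values = new ArrayList<>();
            for (String key : group) {
                values.add(key.toUpperCase());
            }
            return values;
        };
        System.out.println(fetch(keys, groupOf, fetchGroup)); // [A1, B1, A2, B2]
    }
}
```

In the cluster code, `groupOf` would resolve a key to its owning node and `fetchGroup` would run the Jedis pipeline against that node and deserialize the responses.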
