一、Jedis简介

1、Jedis对应Redis的四种工作模式

对应关系如下:

Jedis主要模块Redis工作模式
JedisRedis Standalone(单节点模式)
JedisClusterRedis Cluster(集群模式)
JedisSentinelRedis Sentinel(哨兵模式)
ShardedJedisRedis Sharding(分片模式)

2、Jedis三种请求模式

Jedis实例有3种请求模式:Client、Pipeline和Transaction

Jedis实例通过Socket建立客户端与服务端的长连接,往outputStream发送命令,从inputStream读取回复

在这里插入图片描述

1)、Client模式

客户端发一个命令,阻塞等待服务端执行,然后读取返回结果

2)、Pipeline模式

一次性发送多个命令,最后一次取回所有的返回结果

这种模式通过减少网络的往返时间和IO的读写次数,大幅度提高通信性能

原生批命令(mset、mget)与Pipeline对比

  • 原生批量命令是原子性,Pipeline是非原子性的
  • 原生批量命令是一个命令对应多个key,Pipeline支持多个命令
  • 原生批量命令是Redis服务端支持实现的,而Pipeline需要服务端与客户端的共同实现
3)、Transaction模式

Transaction模式即开启Redis的事务管理

Redis事务可以一次执行多个命令, 并且带有以下三个重要的保证:

  • 批量操作在发送EXEC命令前被放入队列缓存
  • 收到EXEC命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行
  • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中

一个事务从开始到执行会经历以下三个阶段:

  • 开始事务
  • 命令入队
  • 执行事务

Redis事务相关命令:

  • multi:标记一个事务块的开始
  • exec:执行所有事务块的命令(一旦执行exec后,之前加的监控锁都会被取消掉)
  • discard:取消事务,放弃事务块中的所有命令
  • watch key1 key2 ...:监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断(类似乐观锁)
  • unwatch:取消watch对所有key的监控

二、Jedis模块源码解析

1、Jedis类结构

在这里插入图片描述

上图中,标为橘色的类为核心类

  1. Jedis以输入的命令参数是否为二进制,将处理请求的具体实现分为两个类中,例如Jedis和BinaryJedis、Client和BinaryClient
  2. 与Redis服务器的连接信息封装在Client的基类Connection中
  3. BinaryJedis类中包含了Client、Pipeline和Transaction变量,对应3种请求模式

2、Jedis的初始化流程

        Jedis jedis = new Jedis("localhost", 6379, 10000);

        Transaction transaction = jedis.multi();

        Pipeline pipeline = jedis.pipelined();

Jedis通过传入Redis服务器地址(host、port)开始初始化,然后在BinaryJedis里实例化Client。Client通过Socket维持客户端与Redis服务器的连接
Transaction和Pipeline继承同一个基类MultiKeyPipelineBase。Transaction在实例化的时候,就自动发送MULTI命令,开启事务模式;Pipeline按情况手动开启。它们均依靠Client发送命令

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {

  protected final Client client;
  protected Transaction transaction = null;
  protected Pipeline pipeline = null;

  public BinaryJedis(final String host, final int port) {
    client = new Client(host, port);
  }

  public BinaryJedis(final HostAndPort hostPort, final JedisClientConfig config) {
    client = new Client(hostPort, config);
    initializeFromClientConfig(config);
  }

  public Transaction multi() {
    client.multi();
    client.getOne(); // expected OK
    transaction = new Transaction(client);
    return transaction;
  }

  public Pipeline pipelined() {
    pipeline = new Pipeline();
    pipeline.setClient(client);
    return pipeline;
  }

3、Jedis调用流程

1)、Client模式

get(key)为例,Jedis代码如下:

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands,
    ModuleCommands {

	@Override
  public String get(final String key) {
    // 校验是否是transaction模式或者pipeline模式
    checkIsInMultiOrPipeline();
    // 1)调用client发送命令
    client.get(key);
    // 2)从inputStream里读取回复
    return client.getBulkReply();
  }

代码1)处会调用Connection的sendCommand()方法,再调用Protocol的sendCommand()方法按照Redis的同一请求协议组织Redis命令,写入outputStream

public class Connection implements Closeable {
  
  public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      // 连接socket,如果已连接,跳过
      connect();
      // 按照redis的同一请求协议组织redis命令,写入outputStream
      Protocol.sendCommand(outputStream, cmd, args);
    } catch (JedisConnectionException ex) {
      /*
       * When client send request which formed by invalid protocol, Redis send back error message
       * before close connection. We try to read it to provide reason of failure.
       */
      try {
        String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
        if (errorMessage != null && errorMessage.length() > 0) {
          ex = new JedisConnectionException(errorMessage, ex.getCause());
        }
      } catch (Exception e) {
        /*
         * Catch any IOException or JedisConnectionException occurred from InputStream#read and just
         * ignore. This approach is safe because reading error message is optional and connection
         * will eventually be closed.
         */
      }
      // Any other exceptions related to connection?
      broken = true;
      throw ex;
    }
  }

代码2)处调用Connection的getBulkReply()方法,再调用Protocol的read()方法从inputStream中读取服务器的回复,此处会阻塞等待

public class Connection implements Closeable {
  
  public String getBulkReply() {
    final byte[] result = getBinaryBulkReply();
    if (null != result) {
      return SafeEncoder.encode(result);
    } else {
      return null;
    }
  }

  public byte[] getBinaryBulkReply() {
    flush();
    return (byte[]) readProtocolWithCheckingBroken();
  }

  protected Object readProtocolWithCheckingBroken() {
    if (broken) {
      throw new JedisConnectionException("Attempting to read from a broken connection");
    }

    try {
      // 从inputStream中读取服务器的回复,此处阻塞等待
      return Protocol.read(inputStream);
    } catch (JedisConnectionException exc) {
      broken = true;
      throw exc;
    }
  }

在这里插入图片描述

Protocol是一个通讯工具类,将Redis的各类执行关键字存储为静态变量,可以直观调用命令,例如Protocol.Command.GET。同时,将命令包装成符合Redis的统一请求协议,回复消息的处理也是在这个类进行,先通过通讯协提取出当次请求的回复消息,将Object类型的消息,格式化为String、List等具体类型,如果回复消息有Error则以异常的形式抛出

Protocol核心方法如下

public final class Protocol {
  
  // 按照redis的同一请求协议组织redis命令,写入outputStream
  public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,
      final byte[]... args)  
    
  // 从inputStream中读取服务器的回复,此处阻塞等待,调用process()方法处理消息
  public static Object read(final RedisInputStream is)
    
  // 通过检查服务器发回数据的第一个字节,确定这个回复是什么类型,分别交给下面5个函数处理  
  private static Object process(final RedisInputStream is) 
    
  // 处理状态回复  
  private static byte[] processStatusCodeReply(final RedisInputStream is)
   
  // 处理批量回复  
  private static byte[] processBulkReply(final RedisInputStream is)
    
  // 处理多条批量回复  
  private static List<Object> processMultiBulkReply(final RedisInputStream is)
  
  // 处理整数回复
  private static Long processInteger(final RedisInputStream is)  
    
  // 处理错误回复  
  private static void processError(final RedisInputStream is)    
2)、Pipeline模式和Transaction模式

在这里插入图片描述

  1. Pipeline和Transaction都继承自MultiKeyPipelineBase
  2. MultiKeyPipelineBase和PipelineBase的区别在于处理的命令不同,内部均调用Client发送命令
  3. Pipeline有一个内部类对象MultiResponseBuilder,当调用Pipeline的sync()之前,存储所有返回结果

Pipeline的使用方法:

        Pipeline pipeline = jedis.pipelined();
        Response<String> key1 = pipeline.get("key1");
        Response<String> key2 = pipeline.get("key2");
        Response<String> key3 = pipeline.get("key3");
        pipeline.sync();
        System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());

Transaction的使用方法:

        Transaction transaction = jedis.multi();
        Response<String> key1 = transaction.get("key1");
        Response<String> key2 = transaction.get("key2");
        Response<String> key3 = transaction.get("key3");
        transaction.exec();
        System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());

get(key)为例,Pipeline代码如下:

public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
  
  @Override
  public Response<String> get(final String key) {
    // 调用client发送命令
    getClient(key).get(key);
    // 新建response,放入消息队列queue,此时response没有数据
    return getResponse(BuilderFactory.STRING);
  }
public class Queable {
  private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
  
  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<>(builder);
    pipelinedResponses.add(lr);
    return lr;
  }

Pipeline的sync()方法代码如下:

public class Pipeline extends MultiKeyPipelineBase implements Closeable {
  
  public void sync() {
    // 判断消息队列是否为空,是否发出请求
    if (getPipelinedResponseLength() > 0) {
      // 1)从inputStream中获取回复消息,消息塞入消息队列的response中
      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
      for (Object o : unformatted) {
        generateResponse(o);
      }
    }
  }

代码1)会调用Connection的getMany()方法,代码如下:

public class Connection implements Closeable {
  
	public List<Object> getMany(final int count) {
    flush();
    final List<Object> responses = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      try {
        responses.add(readProtocolWithCheckingBroken());
      } catch (JedisDataException e) {
        responses.add(e);
      }
    }
    return responses;
  }

  protected Object readProtocolWithCheckingBroken() {
    if (broken) {
      throw new JedisConnectionException("Attempting to read from a broken connection");
    }

    try {
      // 从inputStream中读取服务器的回复,此处阻塞等待
      return Protocol.read(inputStream);
    } catch (JedisConnectionException exc) {
      broken = true;
      throw exc;
    }
  }

Pipeline的sync()方法中调用Queable的方法如下:

public class Queable {
  private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
  
  protected Response<?> generateResponse(Object data) {
    Response<?> response = pipelinedResponses.poll();
    if (response != null) {
      response.set(data);
    }
    return response;
  }
  
  protected int getPipelinedResponseLength() {
    return pipelinedResponses.size();
  }  

在这里插入图片描述

Transaction的exec()方法和Pipeline的sync()很相似,代码如下:

public class Transaction extends MultiKeyPipelineBase implements Closeable {
  
  public List<Object> exec() {
    // 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
    client.getMany(getPipelinedResponseLength());
    // 发送EXEC指令,让服务端执行所有命令
    client.exec();
    // 事务结束
    inTransaction = false;

    // 从inputStream中读取所有回复
    List<Object> unformatted = client.getObjectMultiBulkReply();
    if (unformatted == null) {
      return null;
    }
    // 处理响应结果
    List<Object> formatted = new ArrayList<>();
    for (Object o : unformatted) {
      try {
        formatted.add(generateResponse(o).get());
      } catch (JedisDataException e) {
        formatted.add(e);
      }
    }
    return formatted;
  }

参考

Jedis源码分析(一)-Jedis介绍

Jedis源码分析(二)-Jedis的内部实现(Client,Pipeline,Transaction)

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐