Jedis源码解析(一):Jedis简介、Jedis模块源码解析
一、Jedis简介1、Jedis对应Redis的四种工作模式对应关系如下:Jedis主要模块Redis工作模式JedisRedis Standalone(单节点模式)JedisClusterRedis Cluster(集群模式)JedisSentinelRedis Sentinel(哨兵模式)ShardedJedisRedis Sharding(分片模式)2、Jedis三种请求模式Jedis实例有
一、Jedis简介
1、Jedis对应Redis的四种工作模式
对应关系如下:
Jedis主要模块 | Redis工作模式 |
---|---|
Jedis | Redis Standalone(单节点模式) |
JedisCluster | Redis Cluster(集群模式) |
JedisSentinel | Redis Sentinel(哨兵模式) |
ShardedJedis | Redis Sharding(分片模式) |
2、Jedis三种请求模式
Jedis实例有3种请求模式:Client、Pipeline和Transaction
Jedis实例通过Socket建立客户端与服务端的长连接,往outputStream发送命令,从inputStream读取回复
1)、Client模式
客户端发一个命令,阻塞等待服务端执行,然后读取返回结果
2)、Pipeline模式
一次性发送多个命令,最后一次取回所有的返回结果
这种模式通过减少网络的往返时间和IO的读写次数,大幅度提高通信性能
原生批命令(mset、mget)与Pipeline对比
- 原生批量命令是原子性,Pipeline是非原子性的
- 原生批量命令是一个命令对应多个key,Pipeline支持多个命令
- 原生批量命令是Redis服务端支持实现的,而Pipeline需要服务端与客户端的共同实现
3)、Transaction模式
Transaction模式即开启Redis的事务管理
Redis事务可以一次执行多个命令, 并且带有以下三个重要的保证:
- 批量操作在发送EXEC命令前被放入队列缓存
- 收到EXEC命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行
- 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中
一个事务从开始到执行会经历以下三个阶段:
- 开始事务
- 命令入队
- 执行事务
Redis事务相关命令:
multi
:标记一个事务块的开始exec
:执行所有事务块的命令(一旦执行exec后,之前加的监控锁都会被取消掉)discard
:取消事务,放弃事务块中的所有命令watch key1 key2 ...
:监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断(类似乐观锁)unwatch
:取消watch对所有key的监控
二、Jedis模块源码解析
1、Jedis类结构
上图中,标为橘色的类为核心类
- Jedis以输入的命令参数是否为二进制,将处理请求的具体实现分为两个类中,例如Jedis和BinaryJedis、Client和BinaryClient
- 与Redis服务器的连接信息封装在Client的基类Connection中
- BinaryJedis类中包含了Client、Pipeline和Transaction变量,对应3种请求模式
2、Jedis的初始化流程
Jedis jedis = new Jedis("localhost", 6379, 10000);
Transaction transaction = jedis.multi();
Pipeline pipeline = jedis.pipelined();
Jedis通过传入Redis服务器地址(host、port)开始初始化,然后在BinaryJedis里实例化Client。Client通过Socket维持客户端与Redis服务器的连接
Transaction和Pipeline继承同一个基类MultiKeyPipelineBase。Transaction在实例化的时候,就自动发送MULTI命令,开启事务模式;Pipeline按情况手动开启。它们均依靠Client发送命令
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
protected final Client client;
protected Transaction transaction = null;
protected Pipeline pipeline = null;
public BinaryJedis(final String host, final int port) {
client = new Client(host, port);
}
public BinaryJedis(final HostAndPort hostPort, final JedisClientConfig config) {
client = new Client(hostPort, config);
initializeFromClientConfig(config);
}
public Transaction multi() {
client.multi();
client.getOne(); // expected OK
transaction = new Transaction(client);
return transaction;
}
public Pipeline pipelined() {
pipeline = new Pipeline();
pipeline.setClient(client);
return pipeline;
}
3、Jedis调用流程
1)、Client模式
以get(key)
为例,Jedis代码如下:
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands,
ModuleCommands {
@Override
public String get(final String key) {
// 校验是否是transaction模式或者pipeline模式
checkIsInMultiOrPipeline();
// 1)调用client发送命令
client.get(key);
// 2)从inputStream里读取回复
return client.getBulkReply();
}
代码1)处会调用Connection的sendCommand()
方法,再调用Protocol的sendCommand()
方法按照Redis的同一请求协议组织Redis命令,写入outputStream
public class Connection implements Closeable {
public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
// 连接socket,如果已连接,跳过
connect();
// 按照redis的同一请求协议组织redis命令,写入outputStream
Protocol.sendCommand(outputStream, cmd, args);
} catch (JedisConnectionException ex) {
/*
* When client send request which formed by invalid protocol, Redis send back error message
* before close connection. We try to read it to provide reason of failure.
*/
try {
String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
if (errorMessage != null && errorMessage.length() > 0) {
ex = new JedisConnectionException(errorMessage, ex.getCause());
}
} catch (Exception e) {
/*
* Catch any IOException or JedisConnectionException occurred from InputStream#read and just
* ignore. This approach is safe because reading error message is optional and connection
* will eventually be closed.
*/
}
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
代码2)处调用Connection的getBulkReply()
方法,再调用Protocol的read()
方法从inputStream中读取服务器的回复,此处会阻塞等待
public class Connection implements Closeable {
public String getBulkReply() {
final byte[] result = getBinaryBulkReply();
if (null != result) {
return SafeEncoder.encode(result);
} else {
return null;
}
}
public byte[] getBinaryBulkReply() {
flush();
return (byte[]) readProtocolWithCheckingBroken();
}
protected Object readProtocolWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection");
}
try {
// 从inputStream中读取服务器的回复,此处阻塞等待
return Protocol.read(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}
Protocol是一个通讯工具类,将Redis的各类执行关键字存储为静态变量,可以直观调用命令,例如Protocol.Command.GET
。同时,将命令包装成符合Redis的统一请求协议,回复消息的处理也是在这个类进行,先通过通讯协提取出当次请求的回复消息,将Object类型的消息,格式化为String、List等具体类型,如果回复消息有Error则以异常的形式抛出
Protocol核心方法如下:
public final class Protocol {
// 按照redis的同一请求协议组织redis命令,写入outputStream
public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,
final byte[]... args)
// 从inputStream中读取服务器的回复,此处阻塞等待,调用process()方法处理消息
public static Object read(final RedisInputStream is)
// 通过检查服务器发回数据的第一个字节,确定这个回复是什么类型,分别交给下面5个函数处理
private static Object process(final RedisInputStream is)
// 处理状态回复
private static byte[] processStatusCodeReply(final RedisInputStream is)
// 处理批量回复
private static byte[] processBulkReply(final RedisInputStream is)
// 处理多条批量回复
private static List<Object> processMultiBulkReply(final RedisInputStream is)
// 处理整数回复
private static Long processInteger(final RedisInputStream is)
// 处理错误回复
private static void processError(final RedisInputStream is)
2)、Pipeline模式和Transaction模式
- Pipeline和Transaction都继承自MultiKeyPipelineBase
- MultiKeyPipelineBase和PipelineBase的区别在于处理的命令不同,内部均调用Client发送命令
- Pipeline有一个内部类对象MultiResponseBuilder,当调用Pipeline的
sync()
之前,存储所有返回结果
Pipeline的使用方法:
Pipeline pipeline = jedis.pipelined();
Response<String> key1 = pipeline.get("key1");
Response<String> key2 = pipeline.get("key2");
Response<String> key3 = pipeline.get("key3");
pipeline.sync();
System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());
Transaction的使用方法:
Transaction transaction = jedis.multi();
Response<String> key1 = transaction.get("key1");
Response<String> key2 = transaction.get("key2");
Response<String> key3 = transaction.get("key3");
transaction.exec();
System.out.println("value1:" + key1.get() + ",value2:" + key2.get() + ",value3:" + key3.get());
以get(key)
为例,Pipeline代码如下:
public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
@Override
public Response<String> get(final String key) {
// 调用client发送命令
getClient(key).get(key);
// 新建response,放入消息队列queue,此时response没有数据
return getResponse(BuilderFactory.STRING);
}
public class Queable {
private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
protected <T> Response<T> getResponse(Builder<T> builder) {
Response<T> lr = new Response<>(builder);
pipelinedResponses.add(lr);
return lr;
}
Pipeline的sync()
方法代码如下:
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
public void sync() {
// 判断消息队列是否为空,是否发出请求
if (getPipelinedResponseLength() > 0) {
// 1)从inputStream中获取回复消息,消息塞入消息队列的response中
List<Object> unformatted = client.getMany(getPipelinedResponseLength());
for (Object o : unformatted) {
generateResponse(o);
}
}
}
代码1)会调用Connection的getMany()
方法,代码如下:
public class Connection implements Closeable {
public List<Object> getMany(final int count) {
flush();
final List<Object> responses = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
try {
responses.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
responses.add(e);
}
}
return responses;
}
protected Object readProtocolWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection");
}
try {
// 从inputStream中读取服务器的回复,此处阻塞等待
return Protocol.read(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}
Pipeline的sync()
方法中调用Queable的方法如下:
public class Queable {
private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
protected Response<?> generateResponse(Object data) {
Response<?> response = pipelinedResponses.poll();
if (response != null) {
response.set(data);
}
return response;
}
protected int getPipelinedResponseLength() {
return pipelinedResponses.size();
}
Transaction的exec()
方法和Pipeline的sync()
很相似,代码如下:
public class Transaction extends MultiKeyPipelineBase implements Closeable {
public List<Object> exec() {
// 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
client.getMany(getPipelinedResponseLength());
// 发送EXEC指令,让服务端执行所有命令
client.exec();
// 事务结束
inTransaction = false;
// 从inputStream中读取所有回复
List<Object> unformatted = client.getObjectMultiBulkReply();
if (unformatted == null) {
return null;
}
// 处理响应结果
List<Object> formatted = new ArrayList<>();
for (Object o : unformatted) {
try {
formatted.add(generateResponse(o).get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
}
参考:
更多推荐
所有评论(0)