nacos解析-raft算法日志复制原理
本文基于nacos-2.0.3版本nacos在集群模式下,使用内置数据库时,当发布新的配置或者更新配置时,nacos使用raft算法将配置分发给集群中的其他机器。本文将详细分析分发原理。首先说明如何使nacos在集群模式使用内置数据库:虚拟机启动参数配置为:-Dnacos.standalone=false-DembeddedStorage=true...
本文基于nacos-2.0.3版本
nacos在集群模式下,使用内置数据库时,当发布新的配置或者更新配置时,nacos使用raft算法将配置分发给集群中的其他节点。在raft算法里面,需要分发的数据被称作日志。
本文以配置分发为例介绍raft算法的日志复制原理。
nacos在集群模式使用内置数据库的方法是:
虚拟机启动参数配置为:
-Dnacos.standalone=false
-DembeddedStorage=true
1、分发原理
正常工作时,raft算法将集群中的机器分为Follower和Leader,客户端发布新配置或者更新配置必须将配置数据上传到Leader,如果客户端连接的是Follower,那么数据由该Follower负责转发。
Leader收到请求后,将数据转换为日志对象(LogEntry)并记录到本地,之后将日志分发给各个Follower。为了加快分发速度,分发各个Follower是并行进行的,且每次都是批量分发。
2、源码解析
(1)Leader分发日志
保存日志
下面我们以Leader的视角来看一下如何将日志分发给Follower。
当需要将数据发送到集群中时,nacos调用JRaftProtocol.update()方法,该方法将要数据组装成WriteRequest对象,之后调用JraftServer.commit()方法,commit()里面判断当前节点是否是Leader,如果是则直接将数据应用到本地,然后再通知其他节点,如果不是,则发送给Leader。
final Node node = tuple.node;//node表示当前节点
if (node.isLeader()) {
//当前节点为Leader,则直接将数据应用到本地,然后再通知其他节点
// The leader node directly applies this request
//data就是WriteRequest对象
applyOperation(node, data, closure);
} else {
//当前节点不是Leader,则将数据发送给Leader
// Forward to Leader for request processing
invokeToLeader(group, data, rpcRequestTimeoutMs, closure);
}
nacos使用raft算法是SOFA的JRaft。
JRaftProtocol是JRaft的启动类,也是收发集群消息的入口;JraftServer是另外一个非常重要的类,JRaftProtocol的大部分工作都是由JraftServer完成的,可以这么说,JRaftProtocol是JRaft的入口,而JraftServer负责入口后面的具体工作,包括创建与集群的网络连接,触发选举,创建节点信息等。
上面代码里面的参数node的实现类是NodeImpl,一个NodeImpl对象表示raft集群中的一个节点。对于Leader节点,数据首先要应用在本地,所以在applyOperation()方法里面,调用了NodeImpl.apply()方法处理参数data。
为了加快处理速度,NodeImpl将多个WriteRequest组成一个批次,默认32个组成一个批次,然后将该批次存入到RingBuffer中,经过中间一系列的转发,NodeImpl将数据转发给LogManagerImpl。
在JRaft里面,所有需要分发到集群中的数据都被作为日志写入文件,LogManagerImpl便是负责批量写入文件,以及管理日志文件的类。
LogManagerImpl也是将数据存放到RingBuffer,之后由StableClosureEventHandler处理器从队列里读出数据调用LogManagerImpl.appendToStorage()方法将数据写入文件。写入文件成功之后,还会调用BallotBox.commitAt(),该方法的作用主要是记录日志已经提交成功,并通知状态机。
这里有三点需要说明:
- 在写入日志文件前,所有的转发的都是通过队列完成的,这可以保证一点,日志文件的写入是有序的;
- 参数data在NodeImpl.apply()方法里面组装成LogEntry对象,写入日志的文件的便是该对象,LogEntry里面记录了通知给Follower的数据、raft的term等信息;
- nacos底层使用RocksDB存储日志。
到这里,数据被作为日志存储到了Leader本地,下面开始介绍Leader如何将数据分发出去。
创建Replicator
节点变成Leader后,需要维持与各个Follower之间的连接、发送心跳和分发数据等,这些工作全部是由Replicator完成的。一个Replicator对象可以认为代表了一个Follower节点。
下面从Replicator的创建开始介绍。
当一个节点通过选举变成Leader后,会调用NodeImpl.becomeLeader()方法,该方法内部遍历各个Follower节点,并对每个节点调用ReplicatorGroupImpl.addReplicator()方法,代码如下:
// Start follower's replicators
//遍历每个Follower节点,PeerId存储了节点的IP
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
if (!this.replicatorGroup.addReplicator(peer)) {
LOG.error("Fail to add a replicator, peer={}.", peer);
}
}
ReplicatorGroupImpl.addReplicator()在ReplicatorOptions对象中保存PeerId,并将其作为入参调用Replicator.start()来创建Replicator对象。
//该方法有两个入参,第一个入参ReplicatorOptions表示当前节点的配置信息,
//第二个入参RaftOptions表示raft集群的配置信息,比如上面提到了每32个WriteRequest组成一个批次,32就配置在RaftOptions中,
//ReplicatorOptions里面保存了心跳超时时间、Follower节点的IP、节点类型(是Follower还是Learner)等信息
public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raftOptions) {
if (opts.getLogManager() == null || opts.getBallotBox() == null || opts.getNode() == null) {
throw new IllegalArgumentException("Invalid ReplicatorOptions.");
}
//创建Replicator对象
final Replicator r = new Replicator(opts, raftOptions);
//创建与节点的连接,它向Follower节点发送了一个PingRequest请求
if (!r.rpcService.connect(opts.getPeerId().getEndpoint())) {
LOG.error("Fail to init sending channel to {}.", opts.getPeerId());
// Return and it will be retried later.
return null;
}
// Register replicator metric set.
final MetricRegistry metricRegistry = opts.getNode().getNodeMetrics().getMetricRegistry();
if (metricRegistry != null) {
try {
final String replicatorMetricName = getReplicatorMetricName(opts);
if (!metricRegistry.getNames().contains(replicatorMetricName)) {
metricRegistry.register(replicatorMetricName, new ReplicatorMetricSet(opts, r));
}
} catch (final IllegalArgumentException e) {
// ignore
}
}
// Start replication
//ThreadId主要为Replicator提供加解锁功能
r.id = new ThreadId(r, r);
r.id.lock();//加锁
notifyReplicatorStatusListener(r, ReplicatorEvent.CREATED);
LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId());
r.catchUpClosure = null;
r.lastRpcSendTimestamp = Utils.monotonicMs();
r.startHeartbeatTimer(Utils.nowMs());//设置心跳超时时间,并启动心跳定时任务
// id.unlock in sendEmptyEntries
r.sendEmptyEntries(false);//向Follower节点发送一个prob信息
return r.id;
}
发送prob请求
在start()方法里面,我们看到Replicator建立连接后,紧接着向Follower节点发送了一个prob信息,prob信息的作用是获知Follower已经拥有的的日志位置,以便于向 Follower 发送后续的日志。
下面是sendEmptyEntries()方法发送prob信息的代码:
// No entries and has empty data means a probe request.
// TODO(boyan) refactor, adds a new flag field?
rb.setData(ByteString.EMPTY);//发送的是空消息
final AppendEntriesRequest request = rb.build();
// Sending a probe request.
this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
this.statInfo.firstLogIndex = this.nextIndex;
this.statInfo.lastLogIndex = this.nextIndex - 1;
this.appendEntriesCounter++;
this.state = State.Probe;//当前Replicator的状态为探测状态
final int stateVersion = this.version;
final int seq = getAndIncrementReqSeq();
//发送prob信息
final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {
//prob信息发送成功后,回调run()方法,run()方法是非常关键的方法,Leader分发消息是run()触发的
@Override
public void run(final Status status) {
//收到prob请求的响应后,触发该方法的调用
onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request,
getResponse(), seq, stateVersion, monotonicSendTimeMs);
}
});
//记录已经发出,但是还没有收到响应的请求,当这种请求过多时,nacos控制不再发出新请求
addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);
这里首先展示addInflight()方法:
private void addInflight(final RequestType reqType, final long startIndex, final int count, final int size,
final int seq, final Future<Message> rpcInfly) {
this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq, rpcInfly);
this.inflights.add(this.rpcInFly);//保存双向队列中,队列里面的都是未收到响应的请求
this.nodeMetrics.recordSize("replicate-inflights-count", this.inflights.size());
}
下面我们深入看一下onRpcReturned()方法,该方法是收到prob响应后的回调方法。
//代码有删减
static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request,
final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
//代码删减
final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
//将Follower返回的响应信息暂存到优先级队列中
//优先级队列按照seq排序,seq是Replicator内部属性,每发送一次请求,seq加1
holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
//当收到的暂未处理的响应超过256时,nacos认为当前系统出现问题,
//将Replicator状态更改为prob,重新发送prob消息,并且清空inflights队列
if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
r.state = State.Probe;
r.sendEmptyEntries(false);
return;
}
boolean continueSendEntries = false;
//代码删减
try {
int processed = 0;
while (!holdingQueue.isEmpty()) {
final RpcResponse queuedPipelinedResponse = holdingQueue.peek();
// Sequence mismatch, waiting for next response.
//requiredNextSeq表示等待响应的下一个请求的序号
//如果当前响应的序号与等待的序号不相等,说明后发请求的响应先到了,那么退出当前方法,继续等待响应
if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
if (processed > 0) {
//代码删减
//下面的else解锁,而这里不解锁,
//原因是processed不等于0,说明已经处理一些响应,
//那么接下来可能要继续发送新请求出去,在sendEntries()里面解锁
break;
} else {
// Do not processed any responses, UNLOCK id and return.
continueSendEntries = false;
id.unlock();
return;
}
}
holdingQueue.remove();
processed++;
//取出未确认的请求
final Inflight inflight = r.pollInflight();
if (inflight == null) {
// The previous in-flight requests were cleared.
if (isLogDebugEnabled) {
sb.append("ignore response because request not found: ") //
.append(queuedPipelinedResponse) //
.append(",\n");
}
continue;
}
if (inflight.seq != queuedPipelinedResponse.seq) {
//序号对不上,说明存在问题,那么重新发起prob消息
LOG.warn(
"Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
r, inflight.seq, queuedPipelinedResponse.seq);
r.resetInflights();
r.state = State.Probe;
continueSendEntries = false;
r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
return;
}
try {
switch (queuedPipelinedResponse.requestType) {
case AppendEntries: //正常请求是AppendEntries类型的
//onAppendEntriesReturned()方法检查response的状态是否成功,
//检查Follower的term是否和Leader一致,
//如果没有问题,则修改Replicator的状态,并返回true
continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
(AppendEntriesRequest) queuedPipelinedResponse.request,
(AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
break;
case Snapshot:
//代码删减
break;
}
} finally {
if (continueSendEntries) {
// Success, increase the response sequence.
r.getAndIncrementRequiredNextSeq();//requiredNextSeq加1
} else {
break;
}
}
}
} finally {
//代码删减
if (continueSendEntries) {
// unlock in sendEntries.
r.sendEntries();//发送下一个请求
}
}
}
发送日志
prob请求发送并成功处理响应后,在onRpcReturned()方法的最后调用了Replicator.sendEntries()方法:
void sendEntries() {
boolean doUnlock = true;
try {
long prevSendIndex = -1;
while (true) {
final long nextSendingIndex = getNextSendIndex();//获取下一个发送日志的索引位置
if (nextSendingIndex > prevSendIndex) {
if (sendEntries(nextSendingIndex)) {//根据索引从RocksDB中取出日志并发送给Follower
prevSendIndex = nextSendingIndex;
} else {
doUnlock = false;
// id already unlock in sendEntries when it returns false.
break;
}
} else {
break;
}
}
} finally {
if (doUnlock) {
this.id.unlock();
}
}
}
//在成功处理prob响应之前,该方法始终返回-1
long getNextSendIndex() {
// Fast path
if (this.inflights.isEmpty()) {
return this.nextIndex;
}
// Too many in-flight requests.
if (this.inflights.size() > this.raftOptions.getMaxReplicatorInflightMsgs()) {
return -1L;
}
// Last request should be a AppendEntries request and has some entries.
//发送prob请求时,rpcInFly.count为0,所以在成功处理prob响应之前,下面的if都为false
if (this.rpcInFly != null && this.rpcInFly.isSendingLogEntries()) {
return this.rpcInFly.startIndex + this.rpcInFly.count;
}
return -1L;
}
private boolean sendEntries(final long nextSendingIndex) {
final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
//这里删减了获取日志的代码
//删减的代码主要作用是根据nextSendingIndex从RocksDB中取出日志放到rb中,然后使用下面的方法构建出AppendEntriesRequest对象
final AppendEntriesRequest request = rb.build();
//代码删减
this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1;
this.statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();
final Recyclable recyclable = dataBuf;
final int v = this.version;
final long monotonicSendTimeMs = Utils.monotonicMs();
final int seq = getAndIncrementReqSeq();
Future<Message> rpcFuture = null;
try {
//向Follower发送请求消息,以同步数据
rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1,
new RpcResponseClosureAdapter<AppendEntriesResponse>() {
@Override
public void run(final Status status) {
RecycleUtil.recycle(recyclable); // TODO: recycle on send success, not response received.
onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(),
seq, v, monotonicSendTimeMs);
}
});
} catch (final Throwable t) {
RecycleUtil.recycle(recyclable);
ThrowUtil.throwException(t);
}
//记录未收到响应的请求
addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(),
seq, rpcFuture);
return true;
}
在最后的sendEntries()方法里面,要将日志发送给Follower,这里发送的日志是批量的,从RocksDB中读取日志是循环读取,最多读取1024个(ReaftOptions.maxEntriesSize参数指定),然后将这些日志全部放入到AppendEntriesRequest中一起发送给Follower。
到这里,Leader发送请求到Follower的原理已经介绍完毕,总起来说一共分为如下几步:
- 根据Follower的IP地址创建Replicator;
- 发送ping请求,以与Follower建立连接;
- 发送prob请求,获取Follower当前已经同步到的日志位置;
- 得到Follower的日志位置后,开始同步日志直到nacos停止运行。
(2)Follower同步数据
从上一部分看到,Leader将AppendEntriesRequest请求发送给Follower,Follower调用
AppendEntriesRequestProcessor处理器处理该请求。
pipeline模式:下面代码涉及到pipeline模式,pipeline模式的作用是使Follower顺序处理Leader发送的请求。在pipeline模式下,Leader使用单线程按顺序发送日志,Follower直接使用IO线程(负责连接Leader的线程)处理收到的日志,而且Follower对收到的日志也会做判断,如果不是需要处理的下一个序号的日志,Follower直接返回失败。通过这样的处理,保证了Follower按照顺序处理接收到的日志。
public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
final RpcRequestClosure done) {
final Node node = (Node) service;
//isReplicatorPipeline()检查是否启用pipeline模式,默认是启用的
if (node.getRaftOptions().isReplicatorPipeline()) {
final String groupId = request.getGroupId();
final PeerPair pair = pairOf(request.getPeerId(), request.getServerId());
boolean isHeartbeat = isHeartbeatRequest(request);
int reqSequence = -1;
if (!isHeartbeat) {
reqSequence = getAndIncrementSequence(groupId, pair, done.getRpcCtx().getConnection());
}
final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
defaultResp(), groupId, pair, reqSequence, isHeartbeat));
if (response != null) {
if (isHeartbeat) {
done.getRpcCtx().sendResponse(response);
} else {
sendSequenceResponse(groupId, pair, reqSequence, done.getRpcCtx(), response);
}
}
return null;
} else {
return service.handleAppendEntriesRequest(request, done);
}
}
processRequest0()里面调用NodeImpl.handleAppendEntriesRequest()处理收到的请求:
public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
boolean doUnlock = true;
final long startMs = Utils.monotonicMs();
this.writeLock.lock();
final int entriesCount = request.getEntriesCount();
try {
//代码删减
//删减的代码主要是对request里面的一些参数检查
//prevLogIndex表示Leader发送的当前日志的前一个日志的索引
final long prevLogIndex = request.getPrevLogIndex();
final long prevLogTerm = request.getPrevLogTerm();
//getTerm()根据索引从RocksDB查找日志并返回日志的term,如果没有找到日志,返回0
//所以如果当前收到日志是乱序的话,localPrevLogTerm便是0
final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
if (localPrevLogTerm != prevLogTerm) {
//如果收到的日志是乱序的,那么向Leader返回处理失败,并且返回当前Follower的最大索引
final long lastLogIndex = this.logManager.getLastLogIndex();
//代码删减
return AppendEntriesResponse.newBuilder() //
.setSuccess(false) //
.setTerm(this.currTerm) //
.setLastLogIndex(lastLogIndex) //
.build();
}
//代码删减
// Parse request
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
ByteBuffer allData = null;
if (request.hasData()) {
allData = request.getData().asReadOnlyByteBuffer();
}
final List<RaftOutter.EntryMeta> entriesList = request.getEntriesList();
//处理日志
for (int i = 0; i < entriesCount; i++) {
index++;
final RaftOutter.EntryMeta entry = entriesList.get(i);
final LogEntry logEntry = logEntryFromMeta(index, allData, entry);
if (logEntry != null) {
// Validate checksum
if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
long realChecksum = logEntry.checksum();
//代码删减
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
"The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
realChecksum);
}
entries.add(logEntry);
}
}
final FollowerStableClosure closure = new FollowerStableClosure(request, AppendEntriesResponse.newBuilder()
.setTerm(this.currTerm), this, done, this.currTerm);
//将日志保存到本地的RocksDB中
this.logManager.appendEntries(entries, closure);
// update configuration after _log_manager updated its memory status
checkAndSetConfiguration(true);
return null;
} finally {
//代码删减
}
}
handleAppendEntriesRequest()方法处理完日志会创建出响应对象,sendSequenceResponse()方法将响应对象返回到Leader:
void sendSequenceResponse(final String groupId, final PeerPair pair, final int seq, final RpcContext rpcCtx,
final Message msg) {
//找到当前节点下groupId对应的上下文对象
final PeerRequestContext ctx = getPeerRequestContext(groupId, pair);
//代码删减
final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
synchronized (Utils.withLockObject(respQueue)) {
respQueue.add(new SequenceMessage(rpcCtx, msg, seq));
//判断是否有过多的未发出去的响应,默认最多是256个
//如果未发出去的响应过多,说明当前链接出现问题,那么nacos选择的策略是关闭链接
//不过下面这个判断在pipeline模式下不起作用,因为pipeline模式是单线程处理收到的请求
if (!ctx.hasTooManyPendingResponses()) {
while (!respQueue.isEmpty()) {
final SequenceMessage queuedPipelinedResponse = respQueue.peek();
//在pipeline模式下,if判断永远为true
if (queuedPipelinedResponse.sequence != ctx.getNextRequiredSequence()) {
// sequence mismatch, waiting for next response.
break;
}
respQueue.remove();
try {
queuedPipelinedResponse.sendResponse();//发出响应
} finally {
ctx.getAndIncrementNextRequiredSequence();
}
}
} else {
final Connection connection = rpcCtx.getConnection();
//代码删减
connection.close();//关闭链接
removePeerRequestContext(groupId, pair);//销毁上下文对象
}
}
}
Follower同步数据总的来说也比较简单,在pipeline模式下,Follower单线程处理收到的请求,对请求对象校验无误,将请求写入本地。
参考文章
https://www.sofastack.tech/blog/sofa-jraft-pipeline-principle/
https://zhuanlan.zhihu.com/p/312024022
更多推荐
所有评论(0)