本文基于nacos-2.0.3版本

nacos在集群模式下,使用内置数据库时,当发布新的配置或者更新配置时,nacos使用raft算法将配置分发给集群中的其他节点。在raft算法里面,需要分发的数据被称作日志。
本文以配置分发为例介绍raft算法的日志复制原理。

nacos在集群模式使用内置数据库的方法是:
虚拟机启动参数配置为:
-Dnacos.standalone=false
-DembeddedStorage=true

1、分发原理

正常工作时,raft算法将集群中的机器分为Follower和Leader,客户端发布新配置或者更新配置必须将配置数据上传到Leader,如果客户端连接的是Follower,那么数据由该Follower负责转发。
Leader收到请求后,将数据转换为日志对象(LogEntry)并记录到本地,之后将日志分发给各个Follower。为了加快分发速度,分发各个Follower是并行进行的,且每次都是批量分发。

2、源码解析

(1)Leader分发日志

保存日志

下面我们以Leader的视角来看一下如何将日志分发给Follower。
当需要将数据发送到集群中时,nacos调用JRaftProtocol.update()方法,该方法将要数据组装成WriteRequest对象,之后调用JraftServer.commit()方法,commit()里面判断当前节点是否是Leader,如果是则直接将数据应用到本地,然后再通知其他节点,如果不是,则发送给Leader。

        final Node node = tuple.node;//node表示当前节点
        if (node.isLeader()) {
        	//当前节点为Leader,则直接将数据应用到本地,然后再通知其他节点
            // The leader node directly applies this request
            //data就是WriteRequest对象
            applyOperation(node, data, closure);
        } else {
        	//当前节点不是Leader,则将数据发送给Leader
            // Forward to Leader for request processing
            invokeToLeader(group, data, rpcRequestTimeoutMs, closure);
        }

nacos使用raft算法是SOFA的JRaft。
JRaftProtocol是JRaft的启动类,也是收发集群消息的入口;JraftServer是另外一个非常重要的类,JRaftProtocol的大部分工作都是由JraftServer完成的,可以这么说,JRaftProtocol是JRaft的入口,而JraftServer负责入口后面的具体工作,包括创建与集群的网络连接,触发选举,创建节点信息等。

上面代码里面的参数node的实现类是NodeImpl,一个NodeImpl对象表示raft集群中的一个节点。对于Leader节点,数据首先要应用在本地,所以在applyOperation()方法里面,调用了NodeImpl.apply()方法处理参数data。
为了加快处理速度,NodeImpl将多个WriteRequest组成一个批次,默认32个组成一个批次,然后将该批次存入到RingBuffer中,经过中间一系列的转发,NodeImpl将数据转发给LogManagerImpl。

在JRaft里面,所有需要分发到集群中的数据都被作为日志写入文件,LogManagerImpl便是负责批量写入文件,以及管理日志文件的类。

LogManagerImpl也是将数据存放到RingBuffer,之后由StableClosureEventHandler处理器从队列里读出数据调用LogManagerImpl.appendToStorage()方法将数据写入文件。写入文件成功之后,还会调用BallotBox.commitAt(),该方法的作用主要是记录日志已经提交成功,并通知状态机。
这里有三点需要说明:

  1. 在写入日志文件前,所有的转发的都是通过队列完成的,这可以保证一点,日志文件的写入是有序的;
  2. 参数data在NodeImpl.apply()方法里面组装成LogEntry对象,写入日志的文件的便是该对象,LogEntry里面记录了通知给Follower的数据、raft的term等信息;
  3. nacos底层使用RocksDB存储日志。

到这里,数据被作为日志存储到了Leader本地,下面开始介绍Leader如何将数据分发出去。

创建Replicator

节点变成Leader后,需要维持与各个Follower之间的连接、发送心跳和分发数据等,这些工作全部是由Replicator完成的。一个Replicator对象可以认为代表了一个Follower节点。
下面从Replicator的创建开始介绍。
当一个节点通过选举变成Leader后,会调用NodeImpl.becomeLeader()方法,该方法内部遍历各个Follower节点,并对每个节点调用ReplicatorGroupImpl.addReplicator()方法,代码如下:

        // Start follower's replicators
        //遍历每个Follower节点,PeerId存储了节点的IP
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
            if (!this.replicatorGroup.addReplicator(peer)) {
                LOG.error("Fail to add a replicator, peer={}.", peer);
            }
        }

ReplicatorGroupImpl.addReplicator()在ReplicatorOptions对象中保存PeerId,并将其作为入参调用Replicator.start()来创建Replicator对象。

	//该方法有两个入参,第一个入参ReplicatorOptions表示当前节点的配置信息,
	//第二个入参RaftOptions表示raft集群的配置信息,比如上面提到了每32个WriteRequest组成一个批次,32就配置在RaftOptions中,
	//ReplicatorOptions里面保存了心跳超时时间、Follower节点的IP、节点类型(是Follower还是Learner)等信息
    public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raftOptions) {
        if (opts.getLogManager() == null || opts.getBallotBox() == null || opts.getNode() == null) {
            throw new IllegalArgumentException("Invalid ReplicatorOptions.");
        }
        //创建Replicator对象
        final Replicator r = new Replicator(opts, raftOptions);
        //创建与节点的连接,它向Follower节点发送了一个PingRequest请求
        if (!r.rpcService.connect(opts.getPeerId().getEndpoint())) {
            LOG.error("Fail to init sending channel to {}.", opts.getPeerId());
            // Return and it will be retried later.
            return null;
        }

        // Register replicator metric set.
        final MetricRegistry metricRegistry = opts.getNode().getNodeMetrics().getMetricRegistry();
        if (metricRegistry != null) {
            try {
                final String replicatorMetricName = getReplicatorMetricName(opts);
                if (!metricRegistry.getNames().contains(replicatorMetricName)) {
                    metricRegistry.register(replicatorMetricName, new ReplicatorMetricSet(opts, r));
                }
            } catch (final IllegalArgumentException e) {
                // ignore
            }
        }

        // Start replication
        //ThreadId主要为Replicator提供加解锁功能
        r.id = new ThreadId(r, r);
        r.id.lock();//加锁
        notifyReplicatorStatusListener(r, ReplicatorEvent.CREATED);
        LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId());
        r.catchUpClosure = null;
        r.lastRpcSendTimestamp = Utils.monotonicMs();
        r.startHeartbeatTimer(Utils.nowMs());//设置心跳超时时间,并启动心跳定时任务
        // id.unlock in sendEmptyEntries
        r.sendEmptyEntries(false);//向Follower节点发送一个prob信息
        return r.id;
    }

发送prob请求

在start()方法里面,我们看到Replicator建立连接后,紧接着向Follower节点发送了一个prob信息,prob信息的作用是获知Follower已经拥有的的日志位置,以便于向 Follower 发送后续的日志。
下面是sendEmptyEntries()方法发送prob信息的代码:

	// No entries and has empty data means a probe request.
	// TODO(boyan) refactor, adds a new flag field?
	rb.setData(ByteString.EMPTY);//发送的是空消息
	final AppendEntriesRequest request = rb.build();
	// Sending a probe request.
	this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
	this.statInfo.firstLogIndex = this.nextIndex;
	this.statInfo.lastLogIndex = this.nextIndex - 1;
	this.appendEntriesCounter++;
	this.state = State.Probe;//当前Replicator的状态为探测状态
	final int stateVersion = this.version;
	final int seq = getAndIncrementReqSeq();
	//发送prob信息
	final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
	    request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {
	    	//prob信息发送成功后,回调run()方法,run()方法是非常关键的方法,Leader分发消息是run()触发的
	        @Override
	        public void run(final Status status) {
	        	//收到prob请求的响应后,触发该方法的调用
	            onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request,
	                getResponse(), seq, stateVersion, monotonicSendTimeMs);
	        }
	    });
	 //记录已经发出,但是还没有收到响应的请求,当这种请求过多时,nacos控制不再发出新请求
	addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);

这里首先展示addInflight()方法:

    private void addInflight(final RequestType reqType, final long startIndex, final int count, final int size,
                             final int seq, final Future<Message> rpcInfly) {
        this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq, rpcInfly);
        this.inflights.add(this.rpcInFly);//保存双向队列中,队列里面的都是未收到响应的请求
        this.nodeMetrics.recordSize("replicate-inflights-count", this.inflights.size());
    }

下面我们深入看一下onRpcReturned()方法,该方法是收到prob响应后的回调方法。

    //代码有删减
    static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request,
                              final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
        //代码删减
        
        final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
        //将Follower返回的响应信息暂存到优先级队列中
        //优先级队列按照seq排序,seq是Replicator内部属性,每发送一次请求,seq加1
        holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
		//当收到的暂未处理的响应超过256时,nacos认为当前系统出现问题,
		//将Replicator状态更改为prob,重新发送prob消息,并且清空inflights队列
        if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
            LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
                holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
            r.resetInflights();
            r.state = State.Probe;
            r.sendEmptyEntries(false);
            return;
        }
        boolean continueSendEntries = false;
        
		//代码删减
		
        try {
            int processed = 0;
            while (!holdingQueue.isEmpty()) {
                final RpcResponse queuedPipelinedResponse = holdingQueue.peek();

                // Sequence mismatch, waiting for next response.
                //requiredNextSeq表示等待响应的下一个请求的序号
                //如果当前响应的序号与等待的序号不相等,说明后发请求的响应先到了,那么退出当前方法,继续等待响应
                if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
                    if (processed > 0) {
                        //代码删减
                        //下面的else解锁,而这里不解锁,
                        //原因是processed不等于0,说明已经处理一些响应,
                        //那么接下来可能要继续发送新请求出去,在sendEntries()里面解锁
                        break;
                    } else {
                        // Do not processed any responses, UNLOCK id and return.
                        continueSendEntries = false;
                        id.unlock();
                        return;
                    }
                }
                holdingQueue.remove();
                processed++;
                //取出未确认的请求
                final Inflight inflight = r.pollInflight();
                if (inflight == null) {
                    // The previous in-flight requests were cleared.
                    if (isLogDebugEnabled) {
                        sb.append("ignore response because request not found: ") //
                            .append(queuedPipelinedResponse) //
                            .append(",\n");
                    }
                    continue;
                }
                if (inflight.seq != queuedPipelinedResponse.seq) {
                    //序号对不上,说明存在问题,那么重新发起prob消息
                    LOG.warn(
                        "Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
                        r, inflight.seq, queuedPipelinedResponse.seq);
                    r.resetInflights();
                    r.state = State.Probe;
                    continueSendEntries = false;
                    r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
                    return;
                }
                try {
                    switch (queuedPipelinedResponse.requestType) {
                        case AppendEntries: //正常请求是AppendEntries类型的
                        	//onAppendEntriesReturned()方法检查response的状态是否成功,
                        	//检查Follower的term是否和Leader一致,
                        	//如果没有问题,则修改Replicator的状态,并返回true
                            continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
                                (AppendEntriesRequest) queuedPipelinedResponse.request,
                                (AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
                            break;
                        case Snapshot:
                            //代码删减
                            break;
                    }
                } finally {
                    if (continueSendEntries) {
                        // Success, increase the response sequence.
                        r.getAndIncrementRequiredNextSeq();//requiredNextSeq加1
                    } else {
                        break;
                    }
                }
            }
        } finally {
            //代码删减
            if (continueSendEntries) {
                // unlock in sendEntries.
                r.sendEntries();//发送下一个请求
            }
        }
    }

发送日志

prob请求发送并成功处理响应后,在onRpcReturned()方法的最后调用了Replicator.sendEntries()方法:

    void sendEntries() {
        boolean doUnlock = true;
        try {
            long prevSendIndex = -1;
            while (true) {
                final long nextSendingIndex = getNextSendIndex();//获取下一个发送日志的索引位置
                if (nextSendingIndex > prevSendIndex) {
                    if (sendEntries(nextSendingIndex)) {//根据索引从RocksDB中取出日志并发送给Follower
                        prevSendIndex = nextSendingIndex;
                    } else {
                        doUnlock = false;
                        // id already unlock in sendEntries when it returns false.
                        break;
                    }
                } else {
                    break;
                }
            }
        } finally {
            if (doUnlock) {
                this.id.unlock();
            }
        }
    }
    //在成功处理prob响应之前,该方法始终返回-1
    long getNextSendIndex() {
        // Fast path
        if (this.inflights.isEmpty()) {
            return this.nextIndex;
        }
        // Too many in-flight requests.
        if (this.inflights.size() > this.raftOptions.getMaxReplicatorInflightMsgs()) {
            return -1L;
        }
        // Last request should be a AppendEntries request and has some entries.
        //发送prob请求时,rpcInFly.count为0,所以在成功处理prob响应之前,下面的if都为false
        if (this.rpcInFly != null && this.rpcInFly.isSendingLogEntries()) {
            return this.rpcInFly.startIndex + this.rpcInFly.count;
        }
        return -1L;
    }
    private boolean sendEntries(final long nextSendingIndex) {
     	final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
        //这里删减了获取日志的代码
        //删减的代码主要作用是根据nextSendingIndex从RocksDB中取出日志放到rb中,然后使用下面的方法构建出AppendEntriesRequest对象
        final AppendEntriesRequest request = rb.build();
        //代码删减
        this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
        this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1;
        this.statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();

        final Recyclable recyclable = dataBuf;
        final int v = this.version;
        final long monotonicSendTimeMs = Utils.monotonicMs();
        final int seq = getAndIncrementReqSeq();

        Future<Message> rpcFuture = null;
        try {
        	//向Follower发送请求消息,以同步数据
            rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1,
                new RpcResponseClosureAdapter<AppendEntriesResponse>() {

                    @Override
                    public void run(final Status status) {
                        RecycleUtil.recycle(recyclable); // TODO: recycle on send success, not response received.
                        onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(),
                            seq, v, monotonicSendTimeMs);
                    }
                });
        } catch (final Throwable t) {
            RecycleUtil.recycle(recyclable);
            ThrowUtil.throwException(t);
        }
        //记录未收到响应的请求
        addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(),
            seq, rpcFuture);
        return true;
    }

在最后的sendEntries()方法里面,要将日志发送给Follower,这里发送的日志是批量的,从RocksDB中读取日志是循环读取,最多读取1024个(ReaftOptions.maxEntriesSize参数指定),然后将这些日志全部放入到AppendEntriesRequest中一起发送给Follower。
到这里,Leader发送请求到Follower的原理已经介绍完毕,总起来说一共分为如下几步:

  1. 根据Follower的IP地址创建Replicator;
  2. 发送ping请求,以与Follower建立连接;
  3. 发送prob请求,获取Follower当前已经同步到的日志位置;
  4. 得到Follower的日志位置后,开始同步日志直到nacos停止运行。

(2)Follower同步数据

从上一部分看到,Leader将AppendEntriesRequest请求发送给Follower,Follower调用
AppendEntriesRequestProcessor处理器处理该请求。

pipeline模式:下面代码涉及到pipeline模式,pipeline模式的作用是使Follower顺序处理Leader发送的请求。在pipeline模式下,Leader使用单线程按顺序发送日志,Follower直接使用IO线程(负责连接Leader的线程)处理收到的日志,而且Follower对收到的日志也会做判断,如果不是需要处理的下一个序号的日志,Follower直接返回失败。通过这样的处理,保证了Follower按照顺序处理接收到的日志。

    public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
                                   final RpcRequestClosure done) {

        final Node node = (Node) service;
		//isReplicatorPipeline()检查是否启用pipeline模式,默认是启用的
        if (node.getRaftOptions().isReplicatorPipeline()) {
            final String groupId = request.getGroupId();
            final PeerPair pair = pairOf(request.getPeerId(), request.getServerId());

            boolean isHeartbeat = isHeartbeatRequest(request);
            int reqSequence = -1;
            if (!isHeartbeat) {
                reqSequence = getAndIncrementSequence(groupId, pair, done.getRpcCtx().getConnection());
            }
            final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
                defaultResp(), groupId, pair, reqSequence, isHeartbeat));
            if (response != null) {
                if (isHeartbeat) {
                    done.getRpcCtx().sendResponse(response);
                } else {
                    sendSequenceResponse(groupId, pair, reqSequence, done.getRpcCtx(), response);
                }
            }
            return null;
        } else {
            return service.handleAppendEntriesRequest(request, done);
        }
    }

processRequest0()里面调用NodeImpl.handleAppendEntriesRequest()处理收到的请求:

    public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
        boolean doUnlock = true;
        final long startMs = Utils.monotonicMs();
        this.writeLock.lock();
        final int entriesCount = request.getEntriesCount();
        try {
            //代码删减
            //删减的代码主要是对request里面的一些参数检查
			//prevLogIndex表示Leader发送的当前日志的前一个日志的索引
            final long prevLogIndex = request.getPrevLogIndex();
            final long prevLogTerm = request.getPrevLogTerm();
            //getTerm()根据索引从RocksDB查找日志并返回日志的term,如果没有找到日志,返回0
            //所以如果当前收到日志是乱序的话,localPrevLogTerm便是0
            final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
            if (localPrevLogTerm != prevLogTerm) {
            	//如果收到的日志是乱序的,那么向Leader返回处理失败,并且返回当前Follower的最大索引
                final long lastLogIndex = this.logManager.getLastLogIndex();

                //代码删减

                return AppendEntriesResponse.newBuilder() //
                    .setSuccess(false) //
                    .setTerm(this.currTerm) //
                    .setLastLogIndex(lastLogIndex) //
                    .build();
            }

            //代码删减

            // Parse request
            long index = prevLogIndex;
            final List<LogEntry> entries = new ArrayList<>(entriesCount);
            ByteBuffer allData = null;
            if (request.hasData()) {
                allData = request.getData().asReadOnlyByteBuffer();
            }

            final List<RaftOutter.EntryMeta> entriesList = request.getEntriesList();
            //处理日志
            for (int i = 0; i < entriesCount; i++) {
                index++;
                final RaftOutter.EntryMeta entry = entriesList.get(i);

                final LogEntry logEntry = logEntryFromMeta(index, allData, entry);

                if (logEntry != null) {
                    // Validate checksum
                    if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
                        long realChecksum = logEntry.checksum();
                        //代码删减
                        return RpcFactoryHelper //
                            .responseFactory() //
                            .newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
                                "The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
                                logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
                                realChecksum);
                    }
                    entries.add(logEntry);
                }
            }

            final FollowerStableClosure closure = new FollowerStableClosure(request, AppendEntriesResponse.newBuilder()
                .setTerm(this.currTerm), this, done, this.currTerm);
            //将日志保存到本地的RocksDB中
            this.logManager.appendEntries(entries, closure);
            // update configuration after _log_manager updated its memory status
            checkAndSetConfiguration(true);
            return null;
        } finally {
            //代码删减
        }
    }

handleAppendEntriesRequest()方法处理完日志会创建出响应对象,sendSequenceResponse()方法将响应对象返回到Leader:

    void sendSequenceResponse(final String groupId, final PeerPair pair, final int seq, final RpcContext rpcCtx,
                              final Message msg) {
       //找到当前节点下groupId对应的上下文对象
        final PeerRequestContext ctx = getPeerRequestContext(groupId, pair);
        //代码删减
        final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
        synchronized (Utils.withLockObject(respQueue)) {
            respQueue.add(new SequenceMessage(rpcCtx, msg, seq));
			//判断是否有过多的未发出去的响应,默认最多是256个
			//如果未发出去的响应过多,说明当前链接出现问题,那么nacos选择的策略是关闭链接
			//不过下面这个判断在pipeline模式下不起作用,因为pipeline模式是单线程处理收到的请求
            if (!ctx.hasTooManyPendingResponses()) {
                while (!respQueue.isEmpty()) {
                    final SequenceMessage queuedPipelinedResponse = respQueue.peek();
					//在pipeline模式下,if判断永远为true
                    if (queuedPipelinedResponse.sequence != ctx.getNextRequiredSequence()) {
                        // sequence mismatch, waiting for next response.
                        break;
                    }
                    respQueue.remove();
                    try {
                        queuedPipelinedResponse.sendResponse();//发出响应
                    } finally {
                        ctx.getAndIncrementNextRequiredSequence();
                    }
                }
            } else {
                final Connection connection = rpcCtx.getConnection();
                //代码删减
                connection.close();//关闭链接
                removePeerRequestContext(groupId, pair);//销毁上下文对象
            }
        }
    }

Follower同步数据总的来说也比较简单,在pipeline模式下,Follower单线程处理收到的请求,对请求对象校验无误,将请求写入本地。

参考文章

https://www.sofastack.tech/blog/sofa-jraft-pipeline-principle/
https://zhuanlan.zhihu.com/p/312024022

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐