Lab 2A: Leader Election

Tips

2A主要要完成的任务:

  • 实现初始化Raft实例,核心就在于完成超时选举以及心跳通知。ticker函数用于选举计时,一旦超时就会开始选举。然后关于定时发送心跳的函数都需要自己写。
  • 完成选举函数,需要发送RequestVote RPC,对这个RPC进行处理:根据自己实现的投票条件决定是否投票。若candidate竞选失败且未收到新leader的RPC,就会随机等待时间再次选举;若收到新leader的RPC则会变回follower,停止选举;选举成功则要发送新leader的RPC
  • 实现AppendEntries RPC,2A部分主要是关于心跳的部分,用于选举成功通信以及平常的时候定期通信。
  1. 创建若干个Raft实例,初始状态都为follower
  2. 每个实例后台开启发送心跳(如果是leader)协程和ticker协程(记录选举超时)(也可以采用go特性的select case,管道写入信息的方式触发心跳超时等事件)
  3. 最初每个follower都会选举超时,因为无leader,同时初始的日志都为{”initial“, 0, 0},开始选举。
  4. 每个candidate都会并行发送请求投票RPC,同时对投票计数;如果票数过半则不用再等待后面的投票,直接变为leader;否则持续收集,收集完还没获得过半票数,并且还没收到心跳RPC(收到认同的leader的心跳,可以让自己退回成follower),则会随机等待一段时间再次选举。

投票条件:

  1. 比较term
  2. 比较日志新的程度:先看log最后的term,一样就再比较log最后的index

**这两个条件是要同时满足的,voter的term更小不一定就可以直接投票,因为candidate的日志可能更旧。所以要同时判断这两个条件。**但是如果voter的term更大那就可以直接拒绝了。

实际上2A不涉及日志的更改,所以2A的选举只要比较term即可。

Tester

2A里面有三个测试部分,分别是TestInitialElection2A, TestReElection2A, TestManyElections2A,有时要多测几遍才可以测出错误。

  • TestInitialElection2A是raft实例初始化之后建立的第一次选举,这往往不会有什么问题。
  • TestReElection2A是在网络故障后进行的选举测试,有以下几个部分:
    • leader会被设置为网络故障,检测新的选举,并且要求在旧leader加入之后也不会扰乱选举。
    • 如果集群人数少于一半(Quorum),要求无leader被选举出来。
    • 集群人数达到条件,则可以选举。
    • 新节点的重新加入不应阻止leader的存在。
  • TestManyElections2A会开启7个节点,选出leader,再断连3个节点,要求要么现有leader还存活,要么剩下的4个选出新的leader。然后再加入3个节点,检测是不是还是只有一个leader。

批量测试脚本地址:https://gist.github.com/jonhoo/f686cacb4b9fe716d5aa,使用go-test-many.sh 100(测试次数) 4(同一时刻测试的数量) 2A(lab名称)进行大量测试。也可以直接go test -run <测试函数名>运行单个测试。

Lab 2B: Log

Tips

追加日志的过程:

  1. Leader正常发送心跳,发送心跳时根据自己的rf.nextIndex[server]决定仅发送心跳还是携带日志。发送的日志要从rf.nextIndex[server]开始的所有日志。

  2. Peer收到AppendEntriesRPC,先检查term是不是满足条件(大于等于自己的Term),如果满足,则心跳触发成功(Follower刷新时间),接下来就是处理日志:

    1. 自身的日志太少了(根本没有那么多index),那么就要更新Leader记录的nextIndex[server]为自己的lastLogIndex+1
    2. 日志index满足,但是term不一致,说明日志冲突,那么nextIndex[server]需要自减,比对上一条日志(也可以直接越过冲突日志相同Term的一批日志)
    3. prev日志不冲突,将追加的日志覆盖到PrevLog之后。Follower append log之后,需要检查自己的commitIndex是不是小于LeaderCommit,小于的话就需要更新自己的commitIndex,apply log,
  3. 日志成功追加,leader检查该日志是否生效(过半服务器成功写入):

    1. 更新matchIndex和nextIndex

      日志写入leader —》 leader追加日志 —》过半服务器追加成功 —》leader更新commitIndex —》leader将命令写入applyCh,更新lastApplied —》Follower下次收到心跳就会发现LeaderCommit比自己的更大了,自己也会apply log。

    2. Follower的日志提交成功,需要更新lastApplied。Follower无法统计到其它peer是否提交,所以它需要等到下一个来自Leader的RPC的leaderCommit(本条log如果成功提交(指的是Leader apply log),那下一条的leaderCommit就会更新),然后更新自己的commitIndex,lastApplied,log写入applyCh

Leader每一轮发送心跳就是追加日志,只是根据rf.nextIndex[server]决定是仅心跳还是要携带日志。

Follower apply log的速度一定慢于Leader apply log的速度:

  1. Leader追加日志给Follower
  2. Follower先比对PrevLog,假设无误,就append至自己的log中,返回Success为True
  3. Leader检测到追加成功,则统计Entries中全部append成功(过半节点数追加成功,则视为成功,这里是对matchIndex统计)的日志。
  4. Leader统计到日志追加成功,则会apply log,刷新Leader的commitIndex
  5. 下次Leader再发心跳(追加日志)RPC,Follower就会比对自己的commitIndex和LeaderCommit,若比其小,说明应该更上Leader的步伐apply日志了。在LeaderCommit和自己的LastLogIndex中选出最小值为commitIndex,进行apply log。

apply log必须写入Make()参数内的applyCh:

func (rf *Raft) applyToService(applyCh chan ApplyMsg) {
	// 监听rf.chanApplyLog
	for range rf.chanApplyLog {
		// rf.mu.Lock()
		DPrintf("[%d](term %d, state %d) lastApplied is %d, commitIndex is %d", rf.me, rf.currentTerm, rf.state, rf.lastApplied, rf.commitIndex)
		applyLogs := rf.log[rf.lastApplied+1-rf.log[0].Index : rf.commitIndex+1-rf.log[0].Index]
		for i := 0; i < len(applyLogs); i++ {
			applyMsg := ApplyMsg{
				Command:      applyLogs[i].Command,
				CommandValid: true,
				CommandIndex: applyLogs[i].Index,
			}
			DPrintf("[%d](term %d, state %d) apply command(index: %d)", rf.me, rf.currentTerm, rf.state, applyLogs[i].Index)
			rf.lastApplied = applyLogs[i].Index
			DPrintf("[%d](term %d, state %d) lastApplied is %d", rf.me, rf.currentTerm, rf.state, rf.lastApplied)
			applyCh <- applyMsg
		}
		// rf.lastApplied = rf.commitIndex
		// rf.mu.Unlock()
	}
}

Index的性质

来自:https://zhuanlan.zhihu.com/p/258989392

  • **nextIndex:**该字段供Leader使用,标记对于每个Follower发起AppendEntries时会尝试replicated log的位置(请注意,这里说的是尝试,这个值需要设置的比该follower对应的matchIndex大)。更新nextIndex的三个时机:
  1. 刚被选为Leader,更新所有follower nextIndex=len(rf.logs) + 1
  2. AppendEntries成功的时候,说明已成功在follower上replicated了log
  3. AppendEntries失败的时候,说明现在的nextIndex太大,nextIndex = nextIndex - 1再次尝试AppendEntries
  • **matchIndex:**该字段供Leader使用,用于标记每个Follower已replicated Leader的位置。可以看出来,matchIndex表示的是确定的已和Leader同步的最高位置。更新matchIndex的2个时机:
  1. 当一个Leader AppendEntries成功以后,更新matchIndex到最新位置
  2. 在接受一个新命令的时候,重设Leader的自己matchIndex。即rf.logs = append(rf.logs, command), matchIndex[me] = len(rf.logs)。需要注意,这里设置Leader自己的matchIndex的目的主要是方便统计是否有多于一半的peers和Leader的log entry一致,然后进行commit(参考commitIndex)
  • commitIndex:已commit的最高index。何时commit?当每次AppendEntries成功以后,检测某个index位置是否有多于一半的peers,如果是,标记为commit。注意,这里我们需要使用matchIndex来作为检测的手段,因为matchIndex代表着牢靠的,和Leader没有分歧的位置。
  • **lastApplied:**已applied的最高index。lastApplied和commitIndex类似,在标记完commitIndex以后,我们要做的就是通过applyCh同步apply消息,然后将lastApplied更新lastApplied。因此从某种程度来说,在我们Lab2中,两者几乎等效。在更新完commitIndex以后,通知applyCh,再更新lastApplied

Tester

  • TestBasicAgree2B():最基础的追加日志测试。先使用nCommitted()检查有多少的server认为日志已经提交(在执行Start()函数之前,所有的服务器都不应该提交日志),若满足条件则调用cfg.one(),其通过调用rf.Start(cmd)来追加日志。rf.Start(cmd)用于模拟Raft实例从Client接收实例的情况。

  • TestRPCBytes2B:基于RPC的字节数检查保证每个cmd都只对每个peer发送一次。

    bytes1 := cfg.bytesTotal()
    got := bytes1 - bytes0
    expected := int64(servers) * sent
    if got > expected+50000 {
        t.Fatalf("too many RPC bytes; got %v, expected %v", got, expected)
    }
    
  • TestFailAgree2B:断连小部分,不影响整体Raft集群的情况检测追加日志。

  • TestFailNoAgree2B:断连过半数节点,保证无日志可以正常追加。然后又重新恢复节点,检测追加日志情况。

  • TestConcurrentStarts2B:模拟客户端并发发送多个命令

  • TestRejoin2B:Leader 1断连,再让旧leader 1接受日志,再给新Leader 2发送日志,2断连,再重连旧Leader 1,提交日志,再让2重连,再提交日志。

  • TestBackup2B:先给Leader 1发送日志,然后断连3个Follower(总共1Ledaer 4Follower),网络分区。提交大量命令给1。然后让leader 1和其Follower下线,之前的3个Follower上线,向它们发送日志。然后在对剩下的仅有3个节点的Raft集群重复上面网络分区的过程。https://segmentfault.com/a/1190000023822541

  • TestCount2B:检查无效的RPC个数,不能过多。

Lab 2C: Persistence

Tips

需要持久化的状态:currentTerm, votedFor, log[]

Make() —> readPersist() —> persist() in correct time

持久化的时机是需要持久化的状态发生改变的时候。如果顺利完成了前面两个lab,再做这个lab时,只要你参考注释实现了persist()readPersist()两个方法,然后在合适的时候调用persist()方法,基本就可以正常通过(需要实现对冲突日志处理的优化),感觉还是比2B简单许多。

log[]是唯一能用来重建应用程序状态的信息,所以Log必须要被持久化存储votedFor是为了避免在一个term内,节点对其它两个节点投票,造成脑裂的情况。currentTerm被持久化,是为了防止网络分区的情况下(Leader写入了某个term的日志,但是该日志还没有追加给其它节点),一个分区内的节点宕机再恢复,如果只看日志决定term,会导致选择的term过旧,一个term内写的日志在两个分区内不一致。

关于图8的问题:可以看看这个:https://www.codedump.info/post/20211011-raft-propose-prev-term/。主要在解释raft为什么不能直接commit old term replicate的log,我的理解是d和e都是因为c的操作而可能导致的情况,如果c直接提交了term2的log(而没有提交currentTerm的日志),在d中会导致已提交的日志被重写。如果c没有提交term2 log,而是先提交currentTerm log再提交之前的日志,这就是e的情况。

Tester

  • TestPersist12C():basic persistence,将节点宕机恢复验证持久化正确性
  • TestPersist22C():more persistence,验证网络分区故障的情况下持久化数据的正确性
  • TestPersist32C():partitioned leader and one follower crash, leader restarts,验证Leader宕机能否正确回复日志(除了所有类型节点都要在append、vote那里要持久化,Leader还有其它的地方需要持久化)
  • TestFigure82C():Figure 8,测试paper中图8的错误情况,避免直接提交以前term的日志,其实只要知道怎么做就行了,在apply前加个判断term是否为最新的条件。
  • TestUnreliableAgree2C():模拟不可靠网络的情况
  • TestFigure8Unreliable2C():基于不可靠网络的图8测试
  • TestReliableChurn2C():感觉像测试并发啥的,这里没怎么出错就没看了
  • TestUnreliableChurn2C():同上

Part 2D: log compaction

Tips

快照是日志压缩的一个方式,上层服务告知下层raft现在的状态,下层日志就可以删除该状态之前的日志。需要实现以下三个方法:

  • Snapshot(index int, snapshot []byte) s e r v i c e → r a f t service \rightarrow raft serviceraft,上层服务调用Snapshot()将其状态的快照传递给Raft。上层一旦调用下层的该方法,就会让下层修剪日志。

  • InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) L e a d e r → F o l l o w e r Leader \rightarrow Follower LeaderFollower,同步快照

  • CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool s e r v i c e → r a f t service \rightarrow raft serviceraft

系统刚启动时,我们实现的raft.go一直都是向上层发送日志,但是config.go中的applierSnap()会根据apply的日志条目数,周期性的执行快照请求:

if (m.CommandIndex+1)%SnapShotInterval == 0 {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    v := m.Command
    e.Encode(v)
    cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
}

并且通知raft对日志进行压缩,然后下层raft apply snapshot。当某个peer的日志发生了滞后的情况,Leader向该peer发送InstallSnapshot RPC,将Leader的快照和所有的日志同步到该Follower,日志发生变更,Leader需要修改对应的nextIndex和matchIndex。

$snapshot(service_call)\rightarrow applySnapshot(raft_call) $

快照由上层服务决定保存什么内容,上层服务在它想要保存快照的时候(tester内是apply10条日志就会执行快照)通过调用下层raft中的snapshot()告知下层其想要保存快照的内容,下层就会通过applyCh将快照apply至上层服务。上层apply快照时,会经过CondInsteallSnapshot()进行检测,保证应用的快照状态一定是最新commit log的状态

遇到的问题

panic: runtime error: index out of range [10] with length 5
for i := rf.lastApplied + 1; i < rf.commitIndex+1; i++ {
    applyMsg := ApplyMsg{
        Command:      rf.log[i].Command,
        CommandValid: true,
        CommandIndex: i,
    }
    DPrintf("[%d](term %d, state %d) apply command", rf.me, rf.currentTerm, rf.state)
    applyCh <- applyMsg
}

apply 0~13,apply至9时,就会触发快照,Leader会修剪log,快照完毕之后,会继续apply剩下的日志,此时直接用rf.log[i]显然就不对了,因为修剪日志操作会移动日志。可以在添加这一批日志之前先把这批日志用另一个变量保存再apply

Hint中也说了要独立于日志位置的索引方案,往往越界都是因为这个问题。


关于condInstallSnapshot(),lab设置该方法是为了保证每次snapshot安装的状态都为最新apply的状态,在config.go中的appliersnap()方法安装快照之后,会更新该方法内的lastApplied(不是raft自身保存的lastApplied)为快照的最后的index(相当于回溯了)。所以如果在安装snapshot的过程中,插入apply了其它的log,会导致server %v apply out of order %v的问题,原因就是apply log的index不是连续的。

applying snapshot(19) ----> apply log(20)(lastApplied=20) ---> Installsnapshot successfully(lastApplied=19) ---> apply log(21): apply out of order 21(lost apply 20)

所以需要实现condInstallSnapshot(),在apply了20的时候,停止apply 19的快照,否则就会导致上述问题。


第三个tester,会让节点被kill,持久化信息没有记录自己的lastApplied,而日志是被压缩过的,所以恢复日志之后,节点也不能正确的apply log。可以在恢复持久化日志之后,再重新设置lastApplied(log[0]的Index),而不是粗暴的设置为0。

Test

对Lab2的整个运行过程进行测试:

=== RUN   TestInitialElection2A
Test (2A): initial election ...
检查term
检查term
  ... Passed --   3.1  3   94   24700    0
--- PASS: TestInitialElection2A (3.10s)
=== RUN   TestReElection2A
Test (2A): election after network failure ...
  ... Passed --   4.5  3  194   38774    0
--- PASS: TestReElection2A (4.51s)
=== RUN   TestManyElections2A
Test (2A): multiple elections ...
  ... Passed --   5.4  7  833  162740    0
--- PASS: TestManyElections2A (5.42s)
=== RUN   TestBasicAgree2B
Test (2B): basic agreement ...
  ... Passed --   0.6  3   16    4220    3
--- PASS: TestBasicAgree2B (0.63s)
=== RUN   TestRPCBytes2B
Test (2B): RPC byte count ...
  ... Passed --   1.7  3   48  113288   11
--- PASS: TestRPCBytes2B (1.68s)
=== RUN   TestFailAgree2B
Test (2B): agreement despite follower disconnection ...
  ... Passed --   5.6  3  180   45727    8
--- PASS: TestFailAgree2B (5.65s)
=== RUN   TestFailNoAgree2B
Test (2B): no agreement if too many followers disconnect ...
  ... Passed --   3.5  5  284   59077    3
--- PASS: TestFailNoAgree2B (3.45s)
=== RUN   TestConcurrentStarts2B
Test (2B): concurrent Start()s ...
  ... Passed --   0.6  3   16    4200    6
--- PASS: TestConcurrentStarts2B (0.65s)
=== RUN   TestRejoin2B
Test (2B): rejoin of partitioned leader ...
  ... Passed --   4.0  3  216   49285    4
--- PASS: TestRejoin2B (4.00s)
=== RUN   TestBackup2B
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed --  19.1  5 2428 1994326  102
--- PASS: TestBackup2B (19.11s)
=== RUN   TestCount2B
Test (2B): RPC counts aren't too high ...
  ... Passed --   2.2  3   68   18396   12
--- PASS: TestCount2B (2.24s)
=== RUN   TestPersist12C
Test (2C): basic persistence ...
  ... Passed --   3.4  3   98   24835    6
--- PASS: TestPersist12C (3.42s)
=== RUN   TestPersist22C
Test (2C): more persistence ...
  ... Passed --  15.2  5 1284  275938   16
--- PASS: TestPersist22C (15.24s)
=== RUN   TestPersist32C
Test (2C): partitioned leader and one follower crash, leader restarts ...
  ... Passed --   1.6  3   40    9895    4
--- PASS: TestPersist32C (1.61s)
=== RUN   TestFigure82C
Test (2C): Figure 8 ...
  ... Passed --  31.9  5 1224  249886   35
--- PASS: TestFigure82C (31.94s)
=== RUN   TestUnreliableAgree2C
Test (2C): unreliable agreement ...
  ... Passed --   3.6  5  220   77871  246
--- PASS: TestUnreliableAgree2C (3.56s)
=== RUN   TestFigure8Unreliable2C
Test (2C): Figure 8 (unreliable) ...
  ... Passed --  37.8  5 5612 19449598  533
--- PASS: TestFigure8Unreliable2C (37.82s)
=== RUN   TestReliableChurn2C
Test (2C): churn ...
  ... Passed --  16.2  5 1032 1224304  556
--- PASS: TestReliableChurn2C (16.21s)
=== RUN   TestUnreliableChurn2C
Test (2C): unreliable churn ...
  ... Passed --  16.2  5  936  441053  126
--- PASS: TestUnreliableChurn2C (16.22s)
=== RUN   TestSnapshotBasic2D
Test (2D): snapshots basic ...
  ... Passed --   4.5  3  142   50832  251
--- PASS: TestSnapshotBasic2D (4.49s)
=== RUN   TestSnapshotInstall2D
Test (2D): install snapshots (disconnect) ...
  ... Passed --  66.9  3 2216  608901  377
--- PASS: TestSnapshotInstall2D (66.90s)
=== RUN   TestSnapshotInstallUnreliable2D
Test (2D): install snapshots (disconnect+unreliable) ...
  ... Passed --  72.9  3 2428  628080  387
--- PASS: TestSnapshotInstallUnreliable2D (72.87s)
=== RUN   TestSnapshotInstallCrash2D
Test (2D): install snapshots (crash) ...
  ... Passed --  35.9  3 1074  319369  413
--- PASS: TestSnapshotInstallCrash2D (35.88s)
=== RUN   TestSnapshotInstallUnCrash2D
Test (2D): install snapshots (unreliable+crash) ...
  ... Passed --  56.3  3 1599  427570  353
--- PASS: TestSnapshotInstallUnCrash2D (56.28s)
PASS

执行了100次,还是失败了10次左右,失败都是集中在2D,大致看了一下基本都是因为日志数组越界的问题,还有很大的优化空间。

个人认为关于难度:2B>2D>2A>2C。2A相当于基石,关于选举超时、状态变更触发相应操作的内容都是之后lab的基础,要保证无误才可以顺利的进行后面的实验;2B感觉是最Raft内最复杂的部分,2B能完成的话,后面就水到渠成了;2C只要对持久化时机能够正确理解,基本就可以实现;2D要理解快照优化日志的原理,以及实验内设置的几个关于快照的初始函数的意义和调用过程,认真理解Hint也差不多了,我因为2D卡了好几天,还是要耐心读日志解决bug。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐