Lab 2C目前感觉过来是最考验debug能力的,因为这个实验中的网络测试环境极其不稳定,十分考验程序的网络容错性。需要处理大量的高延迟的网络包,频繁的出现网络分区,节点崩溃的问题。
因此,这个实验主要的代码部分在于debug找到之前的代码漏洞,而新加的功能部分所需的代码量很少。同时主要需要改进的部分就是Log同步这一步骤,需要我们快速定位leader和follower之间log一致的位置。
此外,本次测试过程中还是发现了节点死锁的问题,找了半天又是发现了有一处地方,锁的申请顺序和前面的申请顺序是相反的,在本次实验中也改过来了,同时在投票机制也进行改进,让leader election更加迅速。
注意:之前Lab2B处的部分代码是有问题的,在本实验中发现并修改了。
介绍
Lab 2C就是让我们实现persist持久化,让节点的必要信息保存下来,确保节点在崩溃后,重启可以恢复。哪些信息需要保存,在paper中都已经明确指出。同样,实验说明中明确指出,这实验需要特别注意的就是leader和follower之间进行log replication时如何进行快速定位一致点,这里不改进的话,测试过程中就会超时。
<p><pre> go test -run 2C
# 这个就是用来进行Lab 2C的测试指令</pre></p>
注意事项
1、为了更加快速的进行log replication,我们需要对ae包的结构体进行改进,让它能够携带更加信息,从而进行优化log复制的效率。
2、在实验测试过程中发现我之前的leader election的投票机制并不完善,导致了在本实验中的这种极端情况下,选举需要大量的轮次。这个极端情况就是,一共有2n+1个节点,发生网络分区,有n+1个节点在一个网络分区内,若需要选举出leader,需要这n+1个节点都一致投给一个节点。
3、协程处理过程中涉及到共享变量的时候,不论是读取还是修改,都需要先检查本协程是否已经过时了,若已经过时则需要立刻终止该线程,否则拿着过时的状态信息去和共享变量进行处理会导致bug。
4、部分协程可能不会造成程序的错误,但是可能会一直存活着,就好比内存泄漏,后续协程越来越多,一样会导致程序崩溃。
本次实验内容
1、完善persist函数
2、完善readPersist函数
3、改进RequestVote函数
4、改进AppendEntries函数以及AppendEntriesReply结构体
5、改进Start函数
6、修复ticker函数中的一个低级错误
代码阶段 完善persist函数
这个函数十分简单,就是把节点必要的信息进行持久化保存。
代码如下:
<p><pre> <code class="hljs">func (rf *Raft) persist() {
// Your code here (2C).
// Example:
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(rf.xxx)
// e.Encode(rf.yyy)
// data := w.Bytes()
// rf.persister.SaveRaftState(data)
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.voteFor)
e.Encode(rf.log)
data := w.Bytes()
rf.persister.SaveRaftState(data)
fmt.Printf("%v raft%d persist its state, term:%d, voteFor:%d, logLength:%d\n", time.Now(), rf.me, rf.currentTerm, rf.voteFor, len(rf.log))
}</code></pre></p>
完善readPersist函数
这个函数也十分简单,就是把之前保存的信息重新读取并加载给节点。
注意:不能仅仅只是把需要持久化的那几次状态给恢复了。例如,我们这里持久化了currentTerm、voteFor、log。但是我们恢复过程中,可以通过log信息来恢复节点的lastLogTerm以及lastLogIndex,这两个信息也需要恢复,不然就会出现问题。
代码如下:
<p><pre> <code class="language-Go">func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (2C).
// Example:
// r := bytes.NewBuffer(data)
// d := labgob.NewDecoder(r)
// var xxx
// var yyy
// if d.Decode(&xxx) != nil ||
// d.Decode(&yyy) != nil {
// error...
// } else {
// rf.xxx = xxx
// rf.yyy = yyy
// }
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var term int
var voteFor int
var log []LogEntry
if d.Decode(&term) != nil || d.Decode(&voteFor) != nil || d.Decode(&log) != nil {
fmt.Printf("Error: raft%d readPersist.", rf.me)
} else {
rf.mu.Lock()
rf.currentTerm = term
rf.voteFor = voteFor
rf.log = log
var logLength = len(rf.log)
fmt.Printf("%v raft%d readPersist, term:%d, voteFor:%d, logLength:%d\n", time.Now(), rf.me, rf.currentTerm, rf.voteFor, logLength)
rf.lastLogTerm = rf.log[logLength-1].Term
rf.lastLogIndex = rf.log[logLength-1].Index
rf.mu.Unlock()
}
}</code></pre></p>
改进RequestVote函数
之前的投票请求函数,我一直都忽略了某个重要前提,就是节点收到Term比自己大的节点的投票请求,即便是拒绝投票,也会将自己的term进行更新,并将自己的状态变为follower,重置timer,并把voteFor改为-1.
代码如下:
<p><pre> <code class="language-Go">func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
fmt.Printf("%v raft%d lastlogterm:%d lastlogindex:%d term:%d refuse raft%d lastlogterm:%d lastlogindex:%d term:%d\n", time.Now(), rf.me, rf.lastLogTerm, rf.lastLogIndex, rf.currentTerm, args.CandidateId, args.LastLogTerm, args.LastLogIndex, args.Term)
reply.VoteGrand = false
}
if args.LastLogTerm < rf.lastLogTerm {
fmt.Printf("%v raft%d lastlogterm:%d lastlogindex:%d term:%d refuse raft%d lastlogterm:%d lastlogindex:%d term:%d\n", time.Now(), rf.me, rf.lastLogTerm, rf.lastLogIndex, rf.currentTerm, args.CandidateId, args.LastLogTerm, args.LastLogIndex, args.Term)
if args.Term > rf.currentTerm {
fmt.Printf("%v raft%d update term from %d to %d\n", time.Now(), rf.me, rf.currentTerm, args.Term)
rf.currentTerm = args.Term
rf.voteFor = -1
rf.changeOpSelect(-1)
rf.state = "follwer"
rf.messageCond.Broadcast()
}
reply.VoteGrand = false
} else if args.LastLogTerm > rf.lastLogTerm {
// grant the candidate
if rf.currentTerm == args.Term && rf.voteFor != -1 {
// the follower has voted for another one
fmt.Printf("%v raft%d lastlogterm:%d lastlogindex:%d term:%d refuse raft%d lastlogterm:%d lastlogindex:%d term:%d\n", time.Now(), rf.me, rf.lastLogTerm, rf.lastLogIndex, rf.currentTerm, args.CandidateId, args.LastLogTerm, args.LastLogIndex, args.Term)
reply.VoteGrand = false
} else {
// rf.currentTerm > args.Term or the follower has not voted for anyone
if rf.currentTerm < args.Term {
fmt.Printf("%v raft%d update term from %d to %d\n", time.Now(), rf.me, rf.currentTerm, args.Term)
}
rf.currentTerm = args.Term
rf.voteFor = args.CandidateId
fmt.Printf("%v raft%d lastlogterm:%d lastlogindex:%d term:%d grand raft%d lastlogterm:%d lastlogindex:%d term:%d\n", time.Now(), rf.me, rf.lastLogTerm, rf.lastLogIndex, rf.currentTerm, args.CandidateId, args.LastLogTerm, args.LastLogIndex, args.Term)
reply.VoteGrand = true
// stop the follower from initiating election, and refresh the timer
rf.changeOpSelect(-1)
rf.state = "follower"
rf.messageCond.Broadcast()
}
} else {
// args.LastLogTerm == rf.lastLogTerm is determined
if args.LastLogIndex >= rf.lastLogIndex {
if rf.currentTerm == args.Term && rf.voteFor != -1 {
fmt.Printf("%v raft%d lastlogterm:%d lastlogindex:%d term:%d refuse raft%d lastlogterm:%d lastlogindex:%d term:%d\n", time.Now(), rf.me, rf.lastLogTerm, rf.lastLogIndex, rf.currentTerm, args.CandidateId, args.LastLogTerm, args.LastLogIndex, args.Term)
reply.VoteGrand = false
} else {
// args.Term > rf.currentTerm is determined
if rf.currentTerm < args.Term {
fmt.Printf("%v raft%d update term from %d to %d\n", time.Now(), rf.me, rf.currentTerm, args.Term)
}
rf.currentTerm = args.Term
rf.voteFor = args.CandidateId
fmt.Printf("%v raft%d lastlogterm:%d lastlogindex:%d term:%d grand raft%d lastlogterm:%d lastlogindex:%d term:%d\n", time.Now(), rf.me, rf.lastLogTerm, rf.lastLogIndex, rf.currentTerm, args.CandidateId, args.LastLogTerm, args.LastLogIndex, args.Term)
reply.VoteGrand = true
// stop the follower from initiating election, and refresh the timer
rf.changeOpSelect(-1)
rf.state = "follower"
rf.messageCond.Broadcast()
}
} else {
// args.LastLogIndex < rf.lastLogIndex
fmt.Printf("%v raft%d lastlogterm:%d lastlogindex:%d term:%d refuse raft%d lastlogterm:%d lastlogindex:%d term:%d\n", time.Now(), rf.me, rf.lastLogTerm, rf.lastLogIndex, rf.currentTerm, args.CandidateId, args.LastLogTerm, args.LastLogIndex, args.Term)
if args.Term > rf.currentTerm {
fmt.Printf("%v raft%d update term from %d to %d\n", time.Now(), rf.me, rf.currentTerm, args.Term)
rf.currentTerm = args.Term
rf.voteFor = -1
rf.changeOpSelect(-1)
rf.state = "follwer"
rf.messageCond.Broadcast()
}
reply.VoteGrand = false
}
}
}</code></pre></p>
改进AppendEntries函数以及AppendEntriesReply结构体
这里改进主要是为了加速log replication,由于测试的网络环境中有网络分区,以及节点长时间崩溃的情况,那么主从节点之间的log可能有上百个Log的差别,如果使用最简单的遍历需要在网络中进行频繁的交互将会导致时间超时。同时传统的AppendEntriesReply结构体中能够携带的信息过少,因此也需要进行改进。
改进后的AppendEntriesReply结构体如下所示:
<p><pre> <code class="language-Go">type AppendEntriesReply struct {
Term int
ConflictIndex int
Success bool
}</code></pre></p>
多了一个ConflictIndex属性,这样回复的信息中就包含index和term这两个信息,可以方便leader进行快速的Log定位。
首先需要让leader和follower查看是否有相同的term的log,因此需要先进行term的匹配。
为什么不先进行index的匹配,显然Index即使匹配上了,term也不一定相同,但是反过来,如果term匹配上了,那么一定能找到能匹配上的index。
第一阶段:follower从log数组中的最后一个log开始依次向前进行遍历,将Term从大到小向Leader进行反馈,如果发现follower和Leader有相同的Term的log,那么第一阶段就完成了。
第二阶段:还需要进行index的匹配,如果匹配上的Term=n,在n号Term的log中,如果follower的logIndex=a,leader的logIndex=b。
如果a > b,表明存在部分log是作废的,仅需将leader发来的logEntries中,从index=b开始的后续log覆盖到本节点的log中即可。
如果a < b,表明leader需要从term=n,index=a的log处开始发给follower,那么follower仅需将ConflictIndex=a反馈给leader即可。
如果a == b,这种情况就是直接成了,直接进行后续步骤即可。
代码如下:
<p><pre> <code class="language-Go">func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
reply.Success = false
} else {
// fmt.Printf("raft%d receive ae from leader%d\n", rf.me, args.LeaderId)
if args.Entries == nil {
// if the args.Entries is empty, it means that the ae message is a heartbeat message.
if args.LeaderCommit > rf.commitIndex {
fmt.Printf("%v raft%d update commitIndex from %d to %d\n", time.Now(), rf.me, rf.commitIndex, args.LeaderCommit)
rf.commitIndex = args.LeaderCommit
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
var applyMsg = ApplyMsg{}
applyMsg.Command = rf.log[rf.lastApplied].Command
applyMsg.CommandIndex = rf.log[rf.lastApplied].Index
applyMsg.CommandValid = true
rf.applyCh <- applyMsg
fmt.Printf("%v raft%d insert the msg%d into applyCh\n", time.Now(), rf.me, rf.lastApplied)
}
}
reply.Success = true
} else {
// if the args.Entries is not empty, it means that we should update our entries to be aligned with leader's.
var match bool = false
if args.PrevLogTerm > rf.lastLogTerm {
reply.Term = rf.lastLogTerm
reply.Success = false
} else if args.PrevLogTerm == rf.lastLogTerm {
if args.PrevLogIndex <= rf.lastLogIndex {
match = true
} else {
reply.Term = rf.lastLogTerm
reply.ConflictIndex = rf.lastLogIndex
reply.Success = false
}
} else if args.PrevLogTerm < rf.lastLogTerm {
var logIndex = len(rf.log) - 1
for logIndex >= 0 {
if rf.log[logIndex].Term > args.PrevLogTerm {
logIndex--
continue
}
if rf.log[logIndex].Term == args.PrevLogTerm {
reply.Term = args.PrevLogTerm
if rf.log[logIndex].Index >= args.PrevLogIndex {
match = true
reply.Success = true
} else {
reply.ConflictIndex = rf.log[logIndex].Index
reply.Success = false
}
break
}
if rf.log[logIndex].Term < args.PrevLogTerm {
reply.Term = rf.log[logIndex].Term
reply.Success = false
break
}
}
}
if match {
// Notice!!
// we need to consider a special situation: followers may receive a older log replication request, and followers should do nothing at that time
// so followers should ignore those out-of-date log replication requests or followers will choose to synchronized and delete lastest logs
var length = len(args.Entries)
var index = args.PrevLogIndex + length
reply.Success = true
if index < rf.lastLogIndex {
// check if the ae is out-of-date
if args.Entries[length-1].Term == rf.log[index].Term {
fmt.Printf("%v raft%d receive a out-of-date ae and do nothing. prevLogIndex:%d, length:%d from leader%d\n", time.Now(), rf.me, args.PrevLogIndex, length, args.LeaderId)
return
}
}
fmt.Printf("%v raft%d receive prevLogIndex:%d, length:%d from leader%d\n", time.Now(), rf.me, args.PrevLogIndex, length, args.LeaderId)
rf.log = rf.log[:args.PrevLogIndex+1]
rf.log = append(rf.log, args.Entries...)
// fmt.Printf("%v raft%d log:%v\n", time.Now(), rf.me, rf.log)
var logLength = len(rf.log)
rf.lastLogIndex = rf.log[logLength-1].Index
rf.lastLogTerm = rf.log[logLength-1].Term
rf.persist()
}
}
if rf.currentTerm < args.Term {
fmt.Printf("%v raft%d update term from %d to %d\n", time.Now(), rf.me, rf.currentTerm, args.Term)
}
rf.currentTerm = args.Term
rf.state = "follower"
rf.changeOpSelect(-1)
rf.messageCond.Broadcast()
}
}</code></pre></p>
改进Start函数
本函数的改进主要是在一些协程上的细节上。
在leader创建协程给follower进行log replication时,那么协程开始时需要访问rf.log这个共享变量,即使是在开始部分也需要先检查本协程是否过时,毕竟在复杂的环境中,可能leaderA一直处于被网络分区的情况下,早就有新的leaderB了,leaderA在收到command,进行log replication时网络恢复,发现了新的leader并更新了自己的状态,log等信息都改变了,那么此时这些过时的log replication的协程就必须立刻退出。
同时就是需要和AppendEntries函数的改进进行呼应,需要添加对conflictIndex的处理。
对ConflictIndex的处理十分简单,就是在follower返回的ae包中,发现有ConflictIndex,直接定位到那个index的log即可。
代码如下:
<p><pre> <code class="language-Go">func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
// Your code here (2B).
_, isLeader = rf.GetState()
if !isLeader {
return index, term, isLeader
}
rf.mu.Lock()
// var length = len(rf.log)
// index = rf.log[length-1].Index + 1
rf.lastLogTerm = rf.currentTerm
rf.lastLogIndex = rf.nextIndex[rf.me]
index = rf.nextIndex[rf.me]
term = rf.lastLogTerm
var peerNum = rf.peerNum
var entry = LogEntry{Index: index, Term: term, Command: command}
rf.log = append(rf.log, entry)
if command == 0 {
fmt.Printf("%v leader%d send a command:%v to update followers' log, index:%d term:%d\n", time.Now(), rf.me, command, index, term)
} else {
fmt.Printf("%v leader%d receive a command:%v, index:%d term:%d\n", time.Now(), rf.me, command, index, term)
}
rf.matchIndex[rf.me] = index
rf.nextIndex[rf.me] = index + 1
rf.persist()
// rf.mu.Unlock()
for i := 0; i < peerNum; i++ {
if i == rf.me {
continue
}
// rf.mu.Lock()
go func(id int, nextIndex int) {
var args = &AppendEntriesArgs{}
rf.mu.Lock()
if rf.currentTerm > term {
rf.mu.Unlock()
return
}
args.Entries = make([]LogEntry, 0)
// if rf.nextIndex[id] < index {
// for j := rf.nextIndex[id] + 1; j <= index; j++ {
// args.Entries = append(args.Entries, rf.log[j])
// }
// }
if nextIndex < index {
for j := nextIndex + 1; j <= index; j++ {
args.Entries = append(args.Entries, rf.log[j])
}
}
args.Term = term
args.LeaderId = rf.me
rf.mu.Unlock()
for {
var reply = &AppendEntriesReply{}
rf.mu.Lock()
if rf.currentTerm > term {
fmt.Printf("%v raft%d is no longer leader and stop sending log to raft%d\n", time.Now(), rf.me, id)
rf.mu.Unlock()
return
}
args.PrevLogIndex = rf.log[nextIndex-1].Index
args.PrevLogTerm = rf.log[nextIndex-1].Term
args.Entries = rf.log[nextIndex : index+1]
// args.Entries = append([]LogEntry{rf.log[nextIndex]}, args.Entries...)
rf.mu.Unlock()
var count = 0
for {
if count == 3 {
return
}
// if sendAE failed, retry util success
if rf.sendAppendEntries(id, args, reply) {
break
}
count++
}
rf.mu.Lock()
if reply.Term > args.Term {
// fmt.Printf("%v when sending log leader%d find a higher term, term:%d\n", time.Now(), rf.me, args.Term)
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = "follower"
rf.voteFor = -1
fmt.Printf("%v raft%d update its term to %d\n", time.Now(), rf.me, reply.Term)
rf.mu.Unlock()
break
}
// fmt.Printf("%v goroutine (term:%d, raft%d send log to raft%d) is out of date. Stop the goroutine.\n", time.Now(), args.Term, rf.me, id)
rf.mu.Unlock()
break
}
if !reply.Success {
if rf.log[nextIndex-1].Term > reply.Term {
for rf.log[nextIndex-1].Term > reply.Term {
nextIndex--
}
} else {
if reply.ConflictIndex != 0 {
nextIndex = reply.ConflictIndex
} else {
nextIndex--
}
}
// fmt.Printf("%v leader%d try sending nextIndex:%d log to follower%d\n", time.Now(), rf.me, nextIndex, id)
// nextIndex--
if nextIndex == 0 {
fmt.Printf("Error:leader%d send log to raft%d, length:%d \n", rf.me, id, len(args.Entries))
rf.mu.Unlock()
break
}
rf.mu.Unlock()
} else {
fmt.Printf("%v leader%d send log from %d to %d to raft%d\n", time.Now(), rf.me, nextIndex, index, id)
if rf.matchIndex[id] < index {
rf.matchIndex[id] = index
}
// we need to check if most of the raft nodes have reach a agreement.
var mp = make(map[int]int)
for _, val := range rf.matchIndex {
mp[val]++
}
var tempArray = make([]num2num, 0)
for k, v := range mp {
tempArray = append(tempArray, num2num{key: k, val: v})
}
// sort.Slice(tempArray, func(i, j int) bool {
// return tempArray.val > tempArray[j].val
// })
sort.Slice(tempArray, func(i, j int) bool {
return tempArray.key > tempArray[j].key
})
var voteAddNum = 0
for j := 0; j < len(tempArray); j++ {
if tempArray[j].val+voteAddNum >= (rf.peerNum/2)+1 {
if rf.commitIndex < tempArray[j].key {
fmt.Printf("%v %d nodes have received msg%d, leader%d update commitIndex from %d to %d\n", time.Now(), tempArray[j].val+voteAddNum, tempArray[j].key, rf.me, rf.commitIndex, tempArray[j].key)
rf.commitIndex = tempArray[j].key
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
var applyMsg = ApplyMsg{}
applyMsg.Command = rf.log[rf.lastApplied].Command
applyMsg.CommandIndex = rf.log[rf.lastApplied].Index
applyMsg.CommandValid = true
rf.applyCh <- applyMsg
fmt.Printf("%v leader%d insert the msg%d into applyCh\n", time.Now(), rf.me, rf.lastApplied)
}
break
}
}
voteAddNum += tempArray[j].val
}
// if tempArray[0].val >= (rf.peerNum/2)+1 {
// if rf.commitIndex < tempArray[0].key {
// fmt.Printf("%v %d nodes have received %d mes, leader%d update commitIndex from %d to %d\n", time.Now(), tempArray[0].val, tempArray[0].key, rf.me, rf.commitIndex, tempArray[0].key)
// rf.commitIndex = tempArray[0].key
// for rf.lastApplied < rf.commitIndex {
// rf.lastApplied++
// var applyMsg = ApplyMsg{}
// applyMsg.Command = rf.log[rf.lastApplied].Command
// applyMsg.CommandIndex = rf.log[rf.lastApplied].Index
// applyMsg.CommandValid = true
// rf.applyCh <- applyMsg
// fmt.Printf("%v leader%d insert the msg%d into applyCh\n", time.Now(), rf.me, rf.lastApplied)
// }
// }
// }
rf.mu.Unlock()
break
}
time.Sleep(10 * time.Millisecond)
}
}(i, rf.nextIndex)
// we update the nextIndex array at first time, even if the follower hasn't received the msg.
if index+1 > rf.nextIndex {
rf.nextIndex = index + 1
}
// rf.mu.Unlock()
}
rf.mu.Unlock()
return index, term, isLeader
}</code></pre></p>
修复ticker函数中的一个低级错误
就是锁的获取顺序改正了,应该统一先获取rf.mu这个lock,后续再获取rf.mesMutex这个lock。
改正区域如下所示:
<p><pre> <code class="language-Go">if voteSuccess {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.mesMutex.Lock()
defer rf.mesMutex.Unlock()
timeMutex.Lock()
defer timeMutex.Unlock()
if rf.opSelect != -1 && rf.opSelect != 4 && term == rf.currentTerm && times == currentTermTimes {
// suceess means the node successfully becomes a leader
rf.opSelect = 2
rf.state = "leader"
var length = len(rf.log)
// reinitialize these two arrays after election
for i := 0; i < rf.peerNum; i++ {
rf.nextIndex = rf.log[length-1].Index + 1
rf.matchIndex = 0
}
fmt.Println("leader's nextIndex array:", rf.nextIndex)
fmt.Println("leader's matchIndex array:", rf.matchIndex)
rf.messageCond.Broadcast()
}
// if a node has collected exactly half of the votes, it will wait util timeout and initiate next election
}</code></pre></p>
实验结果图 |