blog

[MIT 6.824 分布式系统课程] Lab2 Raft 心得

Author: ninehills
Labels: blog done
Created: 2018-02-28T09:36:26Z
Link and comments: https://github.com/ninehills/blog/issues/62

Raft struct 的成员

1 Raft 节点的角色,可以使用Go常量

const (
	Follower = iota         // 0
	Candidate		// 1
	Leader			// 2
)

2 使用Buffered Channel进行异步通信,比如等待心跳包结果等情况,我定义了如下Channel

	chanHeartbeat chan bool // 收到心跳
	chanWinVote chan bool	// 赢得选举
	chanGrantVote chan bool // 获得选举票

        chanApply chan ApplyMsg // 用来commit的channel

Make()中需要初始化Channel为Buffered Channel

	rf.chanWinVote = make(chan bool, 10)
	rf.chanGrantVote = make(chan bool, 10)
	rf.chanHeartbeat = make(chan bool, 10)

3 Raft struct大部分成员都是论文的Figure 2的内容

4 发送broadcastRequestVote后,需要进行voteCount 计数,以确定是否赢得Vote,所以 Raft struct需要增加voteCount成员

协程冲突问题

Raft有如下成员来进行加锁

mu        sync.Mutex          // Lock to protect shared access to this peer's state

使用方法一般有两种,一种是在明确的边界区进行

rf.mu.Lock()
......
rf.mu.Unlock()

还有一种是在函数头

rf.mu.Lock()
defer rf.mu.Unlock()
........

加锁时要小心加锁,以确保不会发生死锁,能不加锁就不加锁。此外测试时使用go test -race来判断是否发生冲突

善用 go 协程机制

一般用在异步触发,比如广播requestVote、AppendEntries或者异步Commit上

完整实现论文中的协议,而不要靠猜测

论文中所有的协议都需要准确实现,否则很容易出错(在互联网上找到的实现中,大部分都通不过测试),几个容易出错的地方如下,都debug了很久

// RequestVote中判断日志是否up-to-date,应该如下实现
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
			(args.LastLogTerm > rf.logs[rf.getLastIndex()].Term ||
				(args.LastLogTerm == rf.logs[rf.getLastIndex()].Term && args.LastLogIndex >= rf.getLastIndex())) {
		// Raft determines which of two logs is more up-to-date by comparing the
		// index and term of the last entries in the logs. If the logs have last
		// entries with different terms, then the log with the later term is
		// more up-to-date. If the logs end with the same term, then whichever
		// log is longer is more up-to-date.
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
	} else {
		reply.VoteGranted = false
	}
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	/*	Receiver implementation:
	1. Reply false if term < currentTerm (§5.1)
	2. Reply false if log doesn’t contain an entry at prevLogIndex
	whose term matches prevLogTerm (§5.3)
	3. If an existing entry conflicts with a new one (same index
	but different terms), delete the existing entry and all that
	follow it (§5.3)
	4. Append any new entries not already in the log
	5. If leaderCommit > commitIndex, set commitIndex =
		min(leaderCommit, index of last new entry)	*/
	rf.mu.Lock()
	defer rf.mu.Unlock()
	rf.chanHeartbeat <- true

	if args.Term > rf.currentTerm {
		rf.log("AppendEntries Term Bigger, convert to follower")
		rf.role = Follower
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}
	DPrintf("1 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
	reply.Term = rf.currentTerm
	reply.NextTryIndex = -1	//默认为-1,代表index--,此处为简化逻辑,除日志不匹配的其他情况都统一--
	if args.Term < rf.currentTerm {
		reply.Success = false
	} else if len(rf.logs) <= args.PrevLogIndex || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
		// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
		if len(rf.logs) <= args.PrevLogIndex{
			DPrintf("logsLen(%d) prevLogIndex(%d)",
				len(rf.logs), args.PrevLogIndex)
		} else {
			DPrintf("logsLen(%d) prevLogIndex(%d) logsTerm(%d) prevLogTerm(%d)",
				len(rf.logs), args.PrevLogIndex, rf.logs[args.PrevLogIndex].Term, args.PrevLogTerm)
		}
		if len(rf.logs) > args.PrevLogIndex {
			// 如果日志内容不匹配,找到同Term中最早的Index,直接回退到那个Index
			term := rf.logs[args.PrevLogIndex].Term
			for reply.NextTryIndex = args.PrevLogIndex - 1;
				reply.NextTryIndex > 0 && rf.logs[reply.NextTryIndex].Term == term; reply.NextTryIndex-- {}
			reply.NextTryIndex++
		} else {
			// 如果日志长度不匹配,则以当前的日志长度为准
			reply.NextTryIndex = rf.getLastIndex() + 1
		}
		reply.Success = false
	} else {
		// If an existing entry conflicts with a new one (same index but different terms),
		// delete the existing entry and all that follow it.
		// ------------------------
		// The if here is crucial. If the follower has all the entries the leader sent,
		// the follower MUST NOT truncate its log. Any elements following the entries
		// sent by the leader MUST be kept. This is because we could be receiving an
		// outdated AppendEntries RPC from the leader, and truncating the log would
		// mean “taking back” entries that we may have already told the leader that
		// we have in our log.
		// ------- wrong code -------
		// preLogs := rf.logs[:args.PrevLogIndex + 1]
		// preLogs = append(preLogs, args.Entries...)
		// rf.log(fmt.Sprintf("%d", preLogs))
		// rf.logs = preLogs
		// --------------------------
		rf.log(fmt.Sprintf("Log append: rfLogs(%d) entries(%d) preLogIndex(%d)",
			rf.logs, args.Entries, args.PrevLogIndex))
		for i := 0; i < len(args.Entries); i++ {
			if i + args.PrevLogIndex + 2 > len(rf.logs) {
				// 如果Entries 长过了 rf.logs,那么直接append
				rf.logs = append(rf.logs, args.Entries[i])
			} else if rf.logs[i + args.PrevLogIndex + 1].Term != args.Entries[i].Term {
				rf.logs = rf.logs[:i + args.PrevLogIndex + 1]
				rf.logs = append(rf.logs, args.Entries[i])
			}
		}
		rf.log(fmt.Sprintf("Log append result: rfLogs(%d)", rf.logs))
		if args.LeaderCommit > rf.commitIndex {
			if args.LeaderCommit > rf.getLastIndex() {
				rf.commitIndex = rf.getLastIndex()
			} else {
				rf.commitIndex = args.LeaderCommit
			}
			if rf.commitIndex > rf.lastApplied {
				go rf.commit()
			}
		}
		reply.Success = true
	}
	DPrintf("2 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
	rf.log(fmt.Sprintf("reply AppendEntries from %d: %t", args.LeaderId, reply.Success))

	rf.persist()
}

// 还有  sendAppendEntries 返回成功后的处理
	if reply.Success {
		// If successful: update nextIndex and matchIndex for follower (§5.3)
		// 此处注意不能简单更新为 rf.getLastIndex(),因为sendAppendEntries是异步的,send的时候最新的index有可能已经变了
		rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
		rf.nextIndex[server] = rf.matchIndex[server] + 1

Debug的方法

可以给Raft struce定义一个log方法,打印一些基本信息,如Term等,这样就不用每次都拼接。例如

func (rf *Raft) log(message string) {
	// 封装日志方法
	if Debug > 0 {
		log.Printf("node(%d)role(%d)term(%d)commitIndex(%d)logs(%d): %s",
			rf.me, rf.role, rf.currentTerm, rf.commitIndex, rf.logs, message)
	}
}

当然这里图省事用的string,可以换成format,参见utils.go下的DPrintf的实现

测试

测试随机性比较大,一次测试通过不能代表没问题,建议跑多轮测试都没有问题才算通过

选举超时和心跳间隔的设定

	// 选举超时,Paper中写 150–300ms 随机,测试环境是1000ms
	rand.Seed(time.Now().UnixNano())
	electionTimeout := func() time.Duration {
		return time.Duration(500 + rand.Intn(150)) * time.Millisecond
	}
	// broadcastTime << electionTimeout << MTBF(平均无故障工作时间)
	// << 代表数量级的差别
	heartBeatInterval := time.Duration(50) * time.Millisecond

选举状态机的实现

for {
		rf.log("loop start")
		rf.mu.Lock()
		role := rf.role
		rf.mu.Unlock()
		switch role {
		case Follower:
			// Respond to RPCs from candidates and leaders
			// If election timeout elapses without receiving AppendEntries RPC from current leader
			// or granting vote to candidate: convert to candidate
			select {
				case <-rf.chanHeartbeat:
				case <-rf.chanGrantVote:
				case <-time.After(electionTimeout()):
					rf.mu.Lock()
					rf.log("follower no heartbeat/vote and timeout, convert to candidate")
					rf.role = Candidate
					rf.mu.Unlock()
			}
		case Candidate:
			// On conversion to candidate, start election:
			// - Increment currentTerm
			// - Vote for self
			// - Reset election timer
			// - Send RequestVote RPCs to all other servers
			// If votes received from majority of servers: become leader
			// If AppendEntries RPC received from new leader: convert to follower
			// If election timeout elapses: start new election
			rf.mu.Lock()
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.voteCount = 1	// 自己算一票
			rf.persist()
			rf.mu.Unlock()
			go rf.broadcastRequestVote()
			select {
			case <-rf.chanWinVote:
				rf.mu.Lock()
				if len(rf.chanHeartbeat) > 0 {
					<-rf.chanHeartbeat
				}
				rf.log("win vote, convert to leader")
				if rf.role != Leader {
					// 有一种情况是 chanWinVote 和 chanHeartbeat同时收到消息,这时候Go会随机选择一个case
					// 但是 chanWinVote中的数据还在,会干扰到下次选举
					// 临时的解决办法是在配置chanWinVote的时候同时设定role,此处二次验证
					rf.log("chanWinVote has outdated message, ignore it.")
					continue
				}
				// 初始化leader的变量
				rf.nextIndex = make([]int, len(rf.peers))
				rf.matchIndex = make([]int, len(rf.peers))
				for i := range rf.peers {
					rf.nextIndex[i] = rf.getLastIndex() + 1
					rf.matchIndex[i] = 0
				}
				rf.mu.Unlock()
			case <-rf.chanHeartbeat:
				rf.mu.Lock()
				rf.log("received heartbeat, convert to follower")
				rf.role = Follower
				rf.mu.Unlock()
			case <-time.After(electionTimeout()):
				rf.log("election timeout, skip")
			}
		case Leader:
			// Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server;
			// 		repeat during idle periods to prevent election timeouts (§5.2)
			// If command received from client: append entry to local log,
			// 		respond after entry applied to state machine (§5.3)
			// If last log index ≥ nextIndex for a follower:
			// 		send AppendEntries RPC with log entries starting at nextIndex
			// - If successful: update nextIndex and matchIndex for follower (§5.3)
			// - If AppendEntries fails because of log inconsistency:
			// 		decrement nextIndex and retry (§5.3)
			// If there exists an N such that N > commitIndex, a majority
			// 		of matchIndex[i] ≥ N, and log[N].term == currentTerm:
			// 		set commitIndex = N (§5.3, §5.4).
			rf.broadcastAppendEntries()
			time.Sleep(heartBeatInterval)
		}
	}

追求代码的美

在完成的过程中以及完成后,因为没有人帮忙判卷,看了一些现在互联网已经有的实现作为参考,总的来说,都只是为了完成而完成,注释也好、代码组织形式也好都不是很重视。可能过上几个月就忘记了以前为什么这么写的原因,这点对于Raft协议这种实现的稍微有点差池就出大问题的,非常重要。