etcd-raft-follower
to be cont 先写部分吧
MsgProp
Follower 收到 MsgProp 消息时,有成员发起选举,将该请求转发至 Leader;消息先 append 到 raft.msgs slice 中,注意后续所说的消息发送,均为 append 到 msgs 中,并未产生实际发送
MsgApp
Follower 收到 MsgApp 消息时,即有 Entries 写入时,重置 electionElapsed 为 0,设置其 Leader 为消息来的成员的 ID;调用 handleAppendEntries 方法处理 MsgApp 消息;handleAppendEntries 方法中向 m.From 发送 MsgAppResp 消息;消息中包含经过处理 MsgApp 后,当前节点的 Index;冲突时额外返回 Reject: true,RejectHint: lastIndex
maybeAppend: handleAppendEntries 方法中使用到的 maybeAppend 方法分析
(1)
firstIndex 会尝试从 unstable 的 snapshot 中获取 snapshot meta Index,如果 snapshot 为 nil(maybeFirstIndex),则从 storage 中获取 ents[0].Index
(2)
lastIndex 会尝试从 unstable 的 ents 中获取最后一个 entry 的 index,如果 unstable 的 ents 为空,则获取 unstable 的 snapshot meta index(maybeLastIndex),如果仍然获取不到,则从 storage 中获取最后一个 entry 的 Index
看明白最基础的方法 firstIndex 和 lastIndex 后,继续往下
(3)
term 尝试获取 index 为 i 的 entry 的 term,entries 的第一个 index 为 dummy index,即每次收到 MsgApp 消息时,m.Index 为 dummy entry (index),后续为真正的 entries (m.Ents);dummy index <= i <= lastIndex,如果 index i 不位于该范围中,显然无法找到对应的 term;maybeTerm 尝试在 unstable 中获取 index 为 i 的 entry 的 term,unstable 中无法找到的话,从 storage 中查找
(4)
matchTerm(i uint64, term Term) 的实现,首先尝试获取 index i 的 term,随后匹配是否等于 term
(5)
findConflict 的实现,对 ents 中的每个 entry 调用 matchTerm 方法,Index 升序遍历,遇到 unmatch的 (即遇到相同 Index 不同 Term 的 entry 认为 conflict),如果这个 unmatch 的 entry 的 Index <= lastIndex,则有 conflict,返回第一个 conflict entry 的 Index;如果这个 unmatch 的 entry 的 Index > lastIndex,则认为是新的未包含的 entry,则返回第一个新的 entry 的 Index;如果均 match 则返回 0
(6)
maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries…) 的实现,内部会首先判断 matchTerm(m.Index, m.LogTerm),过了之后,会对每个 entry findConflict,没有 confict 则没啥好添加的,有 conflict,可能是真 conlict 也可能是包含了新的 entries,统一调用 append 方法加入到 unstable 中;注意这是 Follower 的行为,Follower 会使用 Leader 发来的 MsgApp 改写自己本地的 entries;Leader 发来的 MsgApp 中包含了其已经 commited 的 Index 信息,Follower 使用 commited 和 MsgApp 中的最后 Index 中小的那个 Index 作为能 committed 的 index
lastnewi = index + uint64(len(ents))
commitTo(min(commited, lastnewi))
如果 tocommit > l.lastIndex() 会 panic
(7)
综上
- 接收到 MsgHeartbeat 消息会更新 commited
- 接收到 MsgApp 消息可能会更新 commited
FAQ: unstable 什么时候会 stable?
在 node 的 main for loop 中,首先会从 raftlog 中获取 ready to apply 的 entries (即 unstable 和 nextEnts),将其放入 readyc 通道后,等待 advancec 通道消息;当外部 apply 结束后,调用 node.Advance() 方法,node 获取到 advancec 通道中的消息,开始执行 raftlog 的 apply 更新 apply index 和 stable to 将 unstable 变为 stable
raftlog 的逻辑图如下 (没写 snapshot 部分)
MsgHeartbeat
Follower 收到 MsgHeartbeat 消息时,重置 electionElapsed 为 0,设置其 Leader 为消息来的成员的 ID;commitTo MsgHeartbeat 中的 commited index,并向 Leader 回复 MsgHeartbeatResp 消息
MsgReadIndex
Follower 收到 MsgReadIndex 消息,将请求转发至 Leader;Leader 返回 MsgReadIndexResp 消息,有且仅有 1 Entry,返回已到达一致性的 Index (consistency=l);线性 / 序列化,貌似是个术语待查
Follower 收到 MsgReadIndexResp 消息,有且仅有 1 Entry,将其加入 readStates 中
1 | r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) |
FAQ: 那么 msgs 什么时候会被发送
回到 node 的 main forloop,在 rd = newReady(r, prevSoftSt, prevHardSt) 方法中会读取 r.msgs,并设置 n.readyc 通道,后续将 rd 放入 readyc 通道中,等待外部消费;外部通过 node.Ready() 方法获得内部需 apply or 待发送的 Messages
在 etcdserver/raft.go 的 main forloop 中获取 readyc 通道消息 rd := <-r.Ready(),该 apply Messages 放入到 r.applyc 通道,该发送的 r.Messages,调用 r.sendMessages(rd.Messages) 发送,结束之后调用 r.Advance()
另外在 etcdserver/server.go 的 main forloop 中获取 ap ap := <-s.r.apply(),将这次 apply 放入 FIFO 中,FIFO 内部协程异步处理 apply job