```go
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
```
v3_server.go
```go
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
```
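Before anything touches raft, the gRPC-facing handler simply wraps the PutRequest into an InternalRaftRequest and delegates. Roughly, in the etcd version this walkthrough appears to follow (a simplified sketch, error handling trimmed; raftRequest is the internal helper that ends up in processInternalRaftRequest below):

```go
// Simplified sketch: wrap the PutRequest into an InternalRaftRequest and
// wait for the apply result, which comes back through the wait registry.
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.PutResponse), nil
}
```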
v3_server.go
```go
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)
```
v3_server.go
```go
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)
```
Register a wait on the request id with ch := s.w.Register(id); once the entry has been applied, s.w.Trigger is called on that id to complete the wait (or to GC it on failure/timeout).
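The wait registry is essentially a map from request id to a result channel. A minimal sketch of the idea (an illustration of the Register/Trigger contract, not etcd's actual pkg/wait implementation):

```go
import "sync"

// waitRegistry is a toy version of the Register/Trigger contract:
// Register hands out a channel keyed by id, Trigger delivers the
// result on that channel (delivering nil effectively GCs the wait).
type waitRegistry struct {
    mu sync.Mutex
    m  map[uint64]chan interface{}
}

func newWaitRegistry() *waitRegistry {
    return &waitRegistry{m: make(map[uint64]chan interface{})}
}

func (w *waitRegistry) Register(id uint64) <-chan interface{} {
    w.mu.Lock()
    defer w.mu.Unlock()
    ch := make(chan interface{}, 1)
    w.m[id] = ch
    return ch
}

func (w *waitRegistry) Trigger(id uint64, x interface{}) {
    w.mu.Lock()
    ch, ok := w.m[id]
    delete(w.m, id)
    w.mu.Unlock()
    if ok {
        ch <- x
        close(ch)
    }
}
```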
raft propose (propose the write to the raft log)
```go
// propose PutRequest
s.r.Propose(cctx, data)
```
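Putting the two pieces together, the body of processInternalRaftRequestOnce is roughly the following sketch (names follow the etcd source of this era; error handling, size checks and metrics are trimmed):

```go
// Simplified sketch: register a wait, propose the serialized request to
// raft, then block until the apply path triggers the wait or we time out.
r.Header = &pb.RequestHeader{ID: s.reqIDGen.Next()}
data, err := r.Marshal()
if err != nil {
    return nil, err
}

id := r.Header.ID
ch := s.w.Register(id) // step 1: register the wait

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

if err := s.r.Propose(cctx, data); err != nil { // step 2: raft propose
    s.w.Trigger(id, nil) // GC the wait on propose failure
    return nil, err
}

select {
case x := <-ch: // step 3: the apply loop calls s.w.Trigger(id, result)
    return x.(*applyResult), nil
case <-cctx.Done():
    s.w.Trigger(id, nil) // GC the wait on timeout / cancellation
    return nil, cctx.Err()
case <-s.done:
    return nil, ErrStopped
}
```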
node.go
```go
func (n *node) Propose(ctx context.Context, data []byte) error
```
n.step pushes the message onto the propc channel.
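Concretely, Propose wraps the payload into a pb.MsgProp message and n.step routes it to the right channel. A rough sketch (older etcd versions look approximately like this; newer ones add stepWithWaitOption):

```go
func (n *node) Propose(ctx context.Context, data []byte) error {
    // wrap the serialized InternalRaftRequest into a proposal message
    return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

// step routes proposals to propc (everything else goes to recvc) and
// blocks until node.run picks the message up, the caller gives up, or
// the node is stopped.
func (n *node) step(ctx context.Context, m pb.Message) error {
    ch := n.recvc
    if m.Type == pb.MsgProp {
        ch = n.propc
    }

    select {
    case ch <- m:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
}
```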
node run main loop
```go
func (n *node) run(r *raft) {
    ...
    case m := <-propc:
        r.logger.Infof("handle propc message")
        m.From = r.id
        r.Step(m)
    ...
}
```
raft/raft.go
```go
func (r *raft) Step(m pb.Message) error {
    ...
    default:
        r.step(r, m)
    ...
}
```
r.step(r, m) behaves differently on the leader and on a follower.

For a follower:
```go
func stepFollower(r *raft, m pb.Message) {
    ...
    case pb.MsgProp:
        if r.lead == None {
            r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
            return
        }
        // forward to leader
        m.To = r.lead
        // just append to raft pb.Message ?
        r.send(m)
    ...
}
```
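On the leader, stepLeader takes the other branch for pb.MsgProp: instead of forwarding, it appends the entries to its own log and broadcasts MsgApp to the followers. Roughly (config-change and leadership-transfer checks omitted):

```go
func stepLeader(r *raft, m pb.Message) {
    ...
    case pb.MsgProp:
        if len(m.Entries) == 0 {
            r.logger.Panicf("%x stepped empty MsgProp", r.id)
        }
        // append the proposal to the leader's own log ...
        r.appendEntry(m.Entries...)
        // ... and replicate it to every follower via MsgApp
        r.bcastAppend()
        return
    ...
}
```

Once the entries are appended (and later committed by a quorum), they surface through node.Ready(); the raftNode.start loop (etcdserver/raft.go) is what consumes each Ready struct: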
```go
func (r *raftNode) start(rh *raftReadyHandler) {
    ...
    case rd := <-r.Ready():
        if rd.SoftState != nil {
            // lead has changed
            if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
                r.mu.Lock()
                r.lt = time.Now()
                r.mu.Unlock()
                // prometheus record the count of leader changes
                leaderChanges.Inc()
            }

            if rd.SoftState.Lead == raft.None {
                hasLeader.Set(0)
            } else {
                hasLeader.Set(1)
            }

            // store current seen leader
            atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
            islead = rd.RaftState == raft.StateLeader
            // raft handler
            rh.updateLeadership()
        }

        if len(rd.ReadStates) != 0 {
            select {
            case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
            case <-time.After(internalTimeout):
                plog.Warningf("timed out sending read state")
            case <-r.stopped:
                return
            }
        }

        raftDone := make(chan struct{}, 1)
        ap := apply{
            entries:  rd.CommittedEntries,
            snapshot: rd.Snapshot,
            raftDone: raftDone,
        }

        updateCommittedIndex(&ap, rh)

        select {
        case r.applyc <- ap:
        case <-r.stopped:
            return
        }

        // the leader can write to its disk in parallel with replicating to the followers and them
        // writing to their disks.
        // For more details, check raft thesis 10.2.1
        if islead {
            // gofail: var raftBeforeLeaderSend struct{}
            r.sendMessages(rd.Messages)
        }

        // gofail: var raftBeforeSave struct{}
        if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
            plog.Fatalf("raft save state and entries error: %v", err)
        }
        if !raft.IsEmptyHardState(rd.HardState) {
            proposalsCommitted.Set(float64(rd.HardState.Commit))
        }
        // gofail: var raftAfterSave struct{}
        if !raft.IsEmptySnap(rd.Snapshot) {
            // gofail: var raftBeforeSaveSnap struct{}
            if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                plog.Fatalf("raft save snapshot error: %v", err)
            }
            // gofail: var raftAfterSaveSnap struct{}
            r.raftStorage.ApplySnapshot(rd.Snapshot)
            plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
            // gofail: var raftAfterApplySnap struct{}
        }

        r.raftStorage.Append(rd.Entries)

        if !islead {
            // gofail: var raftBeforeFollowerSend struct{}
            r.sendMessages(rd.Messages)
        }
        raftDone <- struct{}{}
        r.Advance()
    ...
}
```
This step pushes the apply message onto the applyc channel, which is ultimately consumed by:
server.go
```go
func (s *EtcdServer) run() {
    ...
    case ap := <-s.r.apply():
        f := func(context.Context) { s.applyAll(&ep, &ap) }
        sched.Schedule(f)
    ...
}
```
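applyAll eventually unmarshals each committed entry back into an InternalRaftRequest, executes it against the storage backend, and completes the wait registered in step 1. A condensed, hypothetical view of that last hop (the real code goes through applyAll -> applyEntries -> apply -> applyEntryNormal and handles several more cases):

```go
// Condensed sketch of the tail of the apply path.
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
    var raftReq pb.InternalRaftRequest
    pbutil.MustUnmarshal(&raftReq, e.Data)

    id := raftReq.Header.ID

    // execute the Put (or Range/Txn/...) against the MVCC store
    ar := s.applyV3.Apply(&raftReq)

    // wake up the goroutine blocked in processInternalRaftRequestOnce
    s.w.Trigger(id, ar)
}
```

The *applyResult delivered by Trigger is exactly what the select in processInternalRaftRequestOnce is waiting for, so the original Put RPC can finally build its PutResponse and return to the client.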