// Validate the meta pages. We only return an error if both meta pages fail // validation, since meta0 failing validation means that it wasn't saved // properly -- but we can recover using meta1. And vice-versa. err0 := db.meta0.validate() err1 := db.meta1.validate() if err0 != nil && err1 != nil { return err0 }
说直接一点儿,boltdb 使用 mmap 把一个名为 db 的文件映射到内存中的 []byte 数组,然后直接操作内存,但是不管怎么说是外部存储,最终还是要落盘的,所以呢推测是用了 B+ tree,然后把写入的内容组织成 page,使用指针操作,这块代码有点像 C 其实
// allocate returns the starting page id of a contiguous list of pages of a given size. // If a contiguous block cannot be found then 0 is returned. func(f *freelist)allocate(n int)pgid { iflen(f.ids) == 0 { return0 } var initial, previd pgid for i, id := range f.ids { if id <= 1 { panic(fmt.Sprintf("invalid page allocation: %d", id)) } // Reset initial page if this is not contiguous. if previd == 0 || id-previd != 1 { initial = id } // If we found a contiguous block then remove it and return it. if (id-initial)+1 == pgid(n) { // If we're allocating off the beginning then take the fast path // and just adjust the existing slice. This will use extra memory // temporarily but the append() in free() will realloc the slice // as is necessary. // 将已分配的连续页移出 free list 记录表 (ids) // 并释放 ids 空间 if (i + 1) == n { f.ids = f.ids[i+1:] } else { copy(f.ids[i-n+1:], f.ids[i+1:]) f.ids = f.ids[:len(f.ids)-n] } // Remove from the free cache. // 移出 free list cache for i := pgid(0); i < pgid(n); i++ { delete(f.cache, initial+i) } // 返回起始页 return initial } previd = id } return0 }
// read initializes the freelist from a freelist page. func(f *freelist)read(p *page) { // If the page.count is at the max uint16 value (64k) then it's considered // an overflow and the size of the freelist is stored as the first element. idx, count := 0, int(p.count) if count == 0xFFFF { idx = 1 count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0]) } // Copy the list of page ids from the freelist. if count == 0 { f.ids = nil } else { ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count] f.ids = make([]pgid, len(ids)) copy(f.ids, ids) // Make sure they're sorted. sort.Sort(pgids(f.ids)) } // Rebuild the page cache. f.reindex() }
// read initializes the freelist from a freelist page. func(f *freelist)read(p *page) { // If the page.count is at the max uint16 value (64k) then it's considered // an overflow and the size of the freelist is stored as the first element. // page 中的 count 怎么理解 ? // 看着像是 page 下面维护着一系列 pgid idx, count := 0, int(p.count) if count == 0xFFFF { idx = 1 count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0]) } // Copy the list of page ids from the freelist. if count == 0 { f.ids = nil } else { ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count] f.ids = make([]pgid, len(ids)) copy(f.ids, ids) // copy 结束之后,是否可以设置 ids = nil,帮助 gc ? // Make sure they're sorted. sort.Sort(pgids(f.ids)) } // Rebuild the page cache. f.reindex() }
freelist 的 reindex 实现,其实就是构造 cache,很直接的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// reindex rebuilds the free cache based on available and pending free lists. func(f *freelist)reindex() { // 既然已知大小的话,make 的时候为啥不指定 capacity // 好吧,我晕了,这个是 map,怎么指定大小?naive f.cache = make(map[pgid]bool) for _, id := range f.ids { f.cache[id] = true } // pending 记录了 tx 中使用过,未被释放的 page id for _, pendingIDs := range f.pending { for _, pendingID := range pendingIDs { f.cache[pendingID] = true } } }
func(db *DB)beginRWTx()(*Tx, error) { // If the database was opened with Options.ReadOnly, return an error. if db.readOnly { returnnil, ErrDatabaseReadOnly } // Obtain writer lock. This is released by the transaction when it closes. // This enforces only one writer transaction at a time. db.rwlock.Lock() // Once we have the writer lock then we can lock the meta pages so that // we can set up the transaction. db.metalock.Lock() defer db.metalock.Unlock() // Exit if the database is not open yet. if !db.opened { db.rwlock.Unlock() returnnil, ErrDatabaseNotOpen } // Create a transaction associated with the database. t := &Tx{writable: true} t.init(db) db.rwtx = t // Free any pages associated with closed read-only transactions. // 获取当前事务中的最小 txid var minid txid = 0xFFFFFFFFFFFFFFFF for _, t := range db.txs { if t.meta.txid < minid { minid = t.meta.txid } } // 释放该 txid 之前的 page if minid > 0 { db.freelist.release(minid - 1) } return t, nil }
查看 db.freelist.release 的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// release moves all page ids for a transaction id (or older) to the freelist. func(f *freelist)release(txid txid) { m := make(pgids, 0) for tid, ids := range f.pending { if tid <= txid { // Move transaction's pending pages to the available freelist. // Don't remove from the cache since the page is still free. m = append(m, ids...) delete(f.pending, tid) } } sort.Sort(m) // 可重新使用的 pending page 与当前可使用的 page merge sort f.ids = pgids(f.ids).merge(m) }
leafPageElement 结构
1 2 3 4 5 6 7 8
// leafPageElement is the per-element header stored on a leaf page.
type leafPageElement struct {
	flags uint32 // 4 bytes; element flags (author's note: 2 leafElement / 1 branchElement / 4 meta)
	pos   uint32 // 4 bytes; offset from this header to the key/value bytes
	ksize uint32 // 4 bytes; key length in bytes
	vsize uint32 // 4 bytes; value length in bytes
	// Example: pos = 16 means the key/value data starts 16 bytes after
	// this header; ksize = 8 means the key occupies the first 8 of those
	// bytes, followed by vsize bytes of value.
}
如图所示
即叶子节点中存储了 key 和 value
etcd
查看 mvcc/kvstore.go 的 func (s *store) put(key, value []byte, leaseID lease.LeaseID) 方法
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
v3_server.go
1
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
v3_server.go
1
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)
v3_server.go
1
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)
注册一个等待 id(ch := s.w.Register(id));请求完成之后调用 s.w.Trigger 触发完成或 GC
raft propose (提议写入数据)
1 2
// propose PutRequest s.r.Propose(cctx, data)
node.go
1
func (n *node) Propose(ctx context.Context, data []byte) error
n.step 往 propc 通道传入数据
node run main roop
1 2 3 4 5 6 7 8 9
func(n *node)run(r *raft) { ... case m := <-propc: r.logger.Infof("handle propc message") m.From = r.id r.Step(m) ... }
raft/raft.go
1 2 3 4 5 6 7 8
func(r *raft)Step(m pb.Message)error { ... default: r.step(r, m) ... }
r.step(r, m)
leader 和 follower 行为不同
对于 follower 来说
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
funcstepFollower(r *raft, m pb.Message) { ... case pb.MsgProp: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return } // forward to leader m.To = r.lead // just append to raft pb.Message ? r.send(m) ... }
// start is the raftNode Ready-processing loop (excerpt): it reacts to
// leadership changes, forwards read states, hands committed entries to the
// apply channel, persists state/entries/snapshots, and sends messages.
func (r *raftNode) start(rh *raftReadyHandler) {
	...
	case rd := <-r.Ready():
		if rd.SoftState != nil {
			// The leader has changed since we last looked.
			if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
				r.mu.Lock()
				r.lt = time.Now()
				r.mu.Unlock()
				// prometheus: record the count of leader changes
				leaderChanges.Inc()
			}

			if rd.SoftState.Lead == raft.None {
				hasLeader.Set(0)
			} else {
				hasLeader.Set(1)
			}

			// Store the currently-seen leader.
			atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
			islead = rd.RaftState == raft.StateLeader
			// Notify the raft ready handler of the leadership change.
			rh.updateLeadership()
		}

		if len(rd.ReadStates) != 0 {
			select {
			case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
			case <-time.After(internalTimeout):
				plog.Warningf("timed out sending read state")
			case <-r.stopped:
				return
			}
		}

		raftDone := make(chan struct{}, 1)
		ap := apply{
			entries:  rd.CommittedEntries,
			snapshot: rd.Snapshot,
			raftDone: raftDone,
		}

		updateCommittedIndex(&ap, rh)

		select {
		case r.applyc <- ap:
		case <-r.stopped:
			return
		}

		// the leader can write to its disk in parallel with replicating to the followers and them
		// writing to their disks.
		// For more details, check raft thesis 10.2.1
		if islead {
			// gofail: var raftBeforeLeaderSend struct{}
			r.sendMessages(rd.Messages)
		}

		// gofail: var raftBeforeSave struct{}
		if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
			plog.Fatalf("raft save state and entries error: %v", err)
		}
		if !raft.IsEmptyHardState(rd.HardState) {
			proposalsCommitted.Set(float64(rd.HardState.Commit))
		}
		// gofail: var raftAfterSave struct{}

		if !raft.IsEmptySnap(rd.Snapshot) {
			// gofail: var raftBeforeSaveSnap struct{}
			if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
				plog.Fatalf("raft save snapshot error: %v", err)
			}
			// gofail: var raftAfterSaveSnap struct{}
			r.raftStorage.ApplySnapshot(rd.Snapshot)
			plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
			// gofail: var raftAfterApplySnap struct{}
		}

		r.raftStorage.Append(rd.Entries)

		// Followers must persist before sending; see above for the leader case.
		if !islead {
			// gofail: var raftBeforeFollowerSend struct{}
			r.sendMessages(rd.Messages)
		}
		raftDone <- struct{}{}
		r.Advance()
	...
}
该部分会将 apply 的 message 放入 applc 通道中,最终由
server.go
1 2 3 4 5 6 7
func(s *EtcdServer)run() { ... case ap := <-s.r.apply(): f := func(context.Context) { s.applyAll(&ep, &ap) } sched.Schedule(f) ... }
func(fp *filePipeline)run() { deferclose(fp.errc) for { f, err := fp.alloc() if err != nil { fp.errc <- err return } select { // fp.filec 大小为 1 case fp.filec <- f: case <-fp.donec: os.Remove(f.Name()) f.Close() return } } }
查看 fp.alloc() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
func(fp *filePipeline)alloc()(f *fileutil.LockedFile, err error) { // count % 2 so this file isn't the same as the one last published fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2)) if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil { returnnil, err } if err = fileutil.Preallocate(f.File, fp.size, true); err != nil { plog.Errorf("failed to allocate space when creating new wal file (%v)", err) f.Close() returnnil, err } fp.count++ return f, nil }
可见预生成了 [0-1].tmp 文件,并对该文件加了锁,待调用 fp.Open() 方法获取使用
Cut 方法
wal 文件大小上限为 64MB
因此当写入消息之后, wal 文件大小 > 64MB 时,会调用 cut 方法
截断之前的 wal 文件,并生成新的 wal 文件用于写入
cut 的整体思路
截断当前使用的 wal 文件
从 file pipeline 中获取 tmp 文件
向 tmp 文件中写入必要的 headers
将 tmp 文件 rename to wal 文件,新文件名为 walName(w.seq()+1, w.enti+1)
// peer is the representative of a remote raft node. Local raft node sends // messages to the remote through peer. // Each peer has two underlying mechanisms to send out a message: stream and // pipeline. // A stream is a receiver initialized long-polling connection, which // is always open to transfer messages. Besides general stream, peer also has // a optimized stream for sending msgApp since msgApp accounts for large part // of all messages. Only raft leader uses the optimized stream to send msgApp // to the remote follower node. // A pipeline is a series of http clients that send http requests to the remote. // It is only used when the stream has not been established.
stream 每 100 ms 会重新尝试 dial remote peer,如果出现 request sent was ignored (cluster ID mismatch: remote[remote member id]=X-Etcd-Cluster-ID in http header, local=local cluster id) 错误的话,那么这个错误日志的打印频率将会很高,需要及时处理
上述方法从 --initial-cluster-token 和 --initial-cluster 这两个启动参数中生成 Cluster ID 和各个 Member ID
NewClusterFromURLsMap 这个方法中调用 NewMember 生成 Member ID
首先来看 NewMember 方法
1
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member
核心思路
1 2 3
b []byte: peerUrls + clusterName + time hash := sha1.Sum(b) memberID: binary.BigEndian.Uint64(hash[:8])
Member ID 根据 peerUrls / clusterName / current_time 的 sha1 sum 值,取其前 8 个 bytes,为 16 位的 hex 数
回到 NewClusterFromURLsMap 方法中的 NewMember(代码如下)可见最后一个参数为 nil,即不加入时间因素,因此 NewClusterFromURLsMap 生成的 Member ID 是固定的
1
m := NewMember(name, urls, token, nil)
Member Add 生成的 Member ID
直接从 server 端看起 —— etcdserver/api/v3rpc/member.go 中的 MemberAdd 方法
可见如下代码
1 2 3 4 5 6 7 8 9
urls, err := types.NewURLs(r.PeerURLs)
if err != nil {
	return nil, rpctypes.ErrGRPCMemberBadURLs
}

// Passing the current time into NewMember makes the generated member ID
// non-deterministic (unlike bootstrap-time members, which pass nil).
now := time.Now()
m := membership.NewMember("", urls, "", &now)
if err = cs.server.AddMember(ctx, *m); err != nil {
	return nil, togRPCError(err)
}
m := membership.NewMember(“”, urls, “”, &now) 加入了当前时间,因此 Member ID 是不确定的
总结
cluster ID 仅生成一次,此后不会变化
通过 etcd 启动参数生成 (initial-cluster) 的 Member ID 固定
通过 Member add 生成的 Member ID 不确定
Member add 的时候,没有传递 member 的 name,因此 member add 成功时,member list 出来的 member item,新加入的 member 其 name 为空,且没有 client url,因为该 member 尚未 publish 其 client url 到集群中
ReleaseLockTo releases the locks which have a smaller index than the given index, except the largest one among them. For example, if WAL is holding locks 1,2,3,4,5,6, ReleaseLockTo(4) will release locks 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.