MegaThinking

better tokens, better intelligence, contributing superior tokens to models

Intellij golang

用来记录一些 golang 的神奇语法及碎碎念

step over: 单步调试

删除数组中的一个元素

删除 index 元素

n.inodes = append(n.inodes[:index], n.inodes[index+1:]…)

Live 版完美,温暖和力量 ——

曾經我也想放棄治療

是因為還沒遇見到你

像你這樣的人存在這世界上,我對世界稍微有了好感

像你這樣的人存在這世界上,我對世界稍微有了期待

工作半年,“我变强了,也变秃了”,不再是小年轻的模样,已是满脸胡须

年轻真好呀,可以

跑到海角之南,只为追一首歌的回忆

天不怕地不怕的闯

藐视一切

可惜,又不可惜

梦醒的感觉而已

美好又遗憾

2017年10月13日21:59:32 杭州


三年后,此时的我,回去翻这些文字(陆陆续续写了 11 篇),只能感叹 “没有岁月可回头” ~ 希望今后有更强大的内心,去面对更多挑战

2020年08月01日00:47:47 杭州

etcd v3.1.9

to be cont.

boltdb 原哥们,不维护了貌似,coreos 的人继续维护 https://github.com/coreos/bbolt

另外这个 issues 和 free page 相关,即能解决部分 boltdb 内存碎片问题

https://github.com/coreos/bbolt/pull/3

boltdb

bolt_unix.go 几个工具方法,如 flock / funlock / mmap / munmap

bolt_linux.go fsync db 文件

入口处在 db.go 的 Open 方法,通过 Open 方法初始化一个 db 文件;对应 etcd 中 mvcc/backend/backend.go 中的 newBackend 方法的调用

1
2
3
4
5
6
7
8
9
func newBackend(path string, d time.Duration, limit int) *backend {
...
// bolt db
db, err := bolt.Open(path, 0600, boltOpenOptions)
if err != nil {
plog.Panicf("cannot open database at %s (%v)", path, err)
}
...
}

Open 方法中,如果是新建的 db 文件,则调用 db.init() 写入 metadata 信息,boltdb 的注释写的很赞,感觉就是个工业又学术的艺术品,赞!

查看 db.init() 干了什么,直接上图吧,往 db 文件中写了四个 page (size 一般是 4096 字节) 的内容

initial_db

然后就是各种初始化了,初始化 pagePool,mmap the data file as a byte slice,初始化 freelist

etcd 的 InitialMmapSize 设置为 10 1024 1024 * 1024,吓人,那是否 etcd 一启动,就会占用这么大的实际内存?并不是的哈,在 mac 上实测 top 命令看到的是 314MB,ps aux 看到的 vsz 556879888,rss 476460,很好奇,按理来说 vsz 的单位应该是 kb 呀,但是这个数字有点儿大的吓人,实际应该是字节,也就是 500 多 MB,那么 rss 又怎么理解呢?rss 的单位又是 kb,实际使用了 470 多 MB?神奇,总之并不是启动之后立马占用 10 G内存

开头写两个一样的 meta page 貌似是为了做保护,看到 mmap 的方法中有这段注释

1
2
3
4
5
6
7
8
// Validate the meta pages. We only return an error if both meta pages fail
// validation, since meta0 failing validation means that it wasn't saved
// properly -- but we can recover using meta1. And vice-versa.
err0 := db.meta0.validate()
err1 := db.meta1.validate()
if err0 != nil && err1 != nil {
return err0
}

说直接一点儿,boltdb 使用 mmap 把一个名为 db 的文件映射到内存中的 []byte 数组,然后直接操作内存,但是不管怎么说是外部存储,最终还是要落盘的,所以呢推测是用了 B+ tree,然后把写入的内容组织成 page,使用指针操作,这块代码有点像 C 其实

freelist allocate 的逻辑是从 freelist 记录的所有 free page 中分配 n 个连续的 page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(n int) pgid {
if len(f.ids) == 0 {
return 0
}
var initial, previd pgid
for i, id := range f.ids {
if id <= 1 {
panic(fmt.Sprintf("invalid page allocation: %d", id))
}
// Reset initial page if this is not contiguous.
if previd == 0 || id-previd != 1 {
initial = id
}
// If we found a contiguous block then remove it and return it.
if (id-initial)+1 == pgid(n) {
// If we're allocating off the beginning then take the fast path
// and just adjust the existing slice. This will use extra memory
// temporarily but the append() in free() will realloc the slice
// as is necessary.

// 将已分配的连续页移出 free list 记录表 (ids)
// 并释放 ids 空间
if (i + 1) == n {
f.ids = f.ids[i+1:]
} else {
copy(f.ids[i-n+1:], f.ids[i+1:])
f.ids = f.ids[:len(f.ids)-n]
}
// Remove from the free cache.
// 移出 free list cache
for i := pgid(0); i < pgid(n); i++ {
delete(f.cache, initial+i)
}

// 返回起始页
return initial
}
previd = id
}
return 0
}

freelist 的 write 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
// If the page.count is at the max uint16 value (64k) then it's considered
// an overflow and the size of the freelist is stored as the first element.
idx, count := 0, int(p.count)
if count == 0xFFFF {
idx = 1
count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
}
// Copy the list of page ids from the freelist.
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)
// Make sure they're sorted.
sort.Sort(pgids(f.ids))
}
// Rebuild the page cache.
f.reindex()
}

freelist 的 read 实现,对应 write;如何 write 的,就如何 read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
// If the page.count is at the max uint16 value (64k) then it's considered
// an overflow and the size of the freelist is stored as the first element.
// page 中的 count 怎么理解 ?
// 看着像是 page 下面维护着一系列 pgid
idx, count := 0, int(p.count)
if count == 0xFFFF {
idx = 1
count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
}
// Copy the list of page ids from the freelist.
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)

// copy 结束之后,是否可以设置 ids = nil,帮助 gc ?

// Make sure they're sorted.
sort.Sort(pgids(f.ids))
}
// Rebuild the page cache.
f.reindex()
}

freelist 的 reindex 实现,其实就是构造 cache,很直接的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// reindex rebuilds the free cache based on available and pending free lists.
func (f *freelist) reindex() {
// 既然已知大小的话,make 的时候为啥不指定 capacity
// 好吧,我晕了,这个是 map,怎么指定大小?naive
f.cache = make(map[pgid]bool)
for _, id := range f.ids {
f.cache[id] = true
}
// pending 记录了 tx 中使用过,未被释放的 page id
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
f.cache[pendingID] = true
}
}
}

boltdb 在 beginRWTx 中释放空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (db *DB) beginRWTx() (*Tx, error) {
// If the database was opened with Options.ReadOnly, return an error.
if db.readOnly {
return nil, ErrDatabaseReadOnly
}
// Obtain writer lock. This is released by the transaction when it closes.
// This enforces only one writer transaction at a time.
db.rwlock.Lock()
// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
// Exit if the database is not open yet.
if !db.opened {
db.rwlock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
// Free any pages associated with closed read-only transactions.
// 获取当前事务中的最小 txid
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
}

// 释放该 txid 之前的 page
if minid > 0 {
db.freelist.release(minid - 1)
}
return t, nil
}

查看 db.freelist.release 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// release moves all page ids for a transaction id (or older) to the freelist.
func (f *freelist) release(txid txid) {
m := make(pgids, 0)
for tid, ids := range f.pending {
if tid <= txid {
// Move transaction's pending pages to the available freelist.
// Don't remove from the cache since the page is still free.
m = append(m, ids...)
delete(f.pending, tid)
}
}
sort.Sort(m)

// 可重新使用的 pending page 与当前可使用的 page merge sort
f.ids = pgids(f.ids).merge(m)
}

leafPageElement 结构

1
2
3
4
5
6
7
8
type leafPageElement struct {
flags uint32 // 4 bytes; 2 leafElement / 1 branchElement / 4 meta /
pos uint32 // 4 bytes
ksize uint32 // 4 bytes
vsize uint32 // 4 bytes
// pos = 16 that remain space to store key and value
// for example ksize = 8 that 64 bytes for key
}

如图所示

leafPageElement

即叶子节点中存储了 key 和 value

etcd

查看 mvcc/kvstore.go 的 func (s *store) put(key, value []byte, leaseID lease.LeaseID) 方法

查看如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
s.txnModify = true

// 每次 put revision + 1
rev := s.currentRev.main + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
// revision to bytes
ibytes := newRevBytes()
revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)

//
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}

// boltdb 中的 key 为 revision
// value 为 mvccpb.KeyValue
// 存入 boltdb 即 db 文件
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)

// 存入 key -> revision 的索引
s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})

// 这个是啥
// s.changes 什么时候释放,不然内存不会爆?
s.changes = append(s.changes, kv)
s.currentRev.sub += 1
// lease 相关代码先略去不表
}

查看 kvindex 的实现,实际上为 newTreeIndex(),即 mvcc/index.go,摘抄一句 package 注释

Package btree implements in-memory B-Trees of arbitrary degree.

okay, index 底层是 B tree

查看 kvindex.put 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()

// B tree get
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}

// update value in B tree
okeyi := item.(*keyIndex)
okeyi.put(rev.main, rev.sub)
}

kvindex 是个完全在内存中的索引,如果 etcd 重启了之后,需要恢复 kvindex 么?答案是需要的

etcdserver/server.go -> s.kv.Restore(newbe) -> func (s store) Restore(b backend.Backend) error {} -> func (s store) restore() error {}

在最后这个方法中从 db 文件恢复 kvindex

etcd v3.1.9

数据是如此重要,有必要看下 etcdctl restore 的实现

etcdctl 的实现均在 etcdctl 目录下,ctlv2 是 v2 的实现,ctlv3 是 v3 的实现,统一入口 main.go,通过环境变量 ETCDCTL_API 指定使用哪个版本的 etcdctl

查看 etcdctl/ctlv3/command/snapshot_command.go 中的 snapshotRestoreCommandFunc 方法

Restore 的整体过程

  1. 使用 —initial-cluster / —name / —initial-cluster-token / —initial-advertise-peer-urls 参数生成 etcd 集群及成员参数
  2. 校验参数
  3. 如果 data-dir 已经存在,报错退出
  4. 生成 etcd v3 backend db 文件 (makeDB)
  5. 生成 .wal 和 .snap 文件 (makeWALAndSnap)

makeDB

具体查看 makeDB 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// makeDB copies the database snapshot to the snapshot directory
func makeDB(snapdir, dbfile string, commit int) {
// 打开 db 文件
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
if ferr != nil {
ExitWithError(ExitInvalidInput, ferr)
}
defer f.Close()
// get snapshot integrity hash
if _, err := f.Seek(-sha256.Size, os.SEEK_END); err != nil {
ExitWithError(ExitIO, err)
}
sha := make([]byte, sha256.Size)
if _, err := f.Read(sha); err != nil {
ExitWithError(ExitIO, err)
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
ExitWithError(ExitIO, err)
}

// 创建 data-dir/member/snap 目录
if err := fileutil.CreateDirAll(snapdir); err != nil {
ExitWithError(ExitIO, err)
}

// 拷贝 db 文件至 data-dir/member/snap/db
dbpath := filepath.Join(snapdir, "db")
db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
if dberr != nil {
ExitWithError(ExitIO, dberr)
}
if _, err := io.Copy(db, f); err != nil {
ExitWithError(ExitIO, err)
}
// truncate away integrity hash, if any.
off, serr := db.Seek(0, os.SEEK_END)
if serr != nil {
ExitWithError(ExitIO, serr)
}

// db 文件中是否存在 hash 值这块有点意思
// 看着是以 512 chunk 的方式写入的
// % 512 以后余下的字节数等于 sha256.Size 的话
// 那么 db 文件存在 hash 值
hasHash := (off % 512) == sha256.Size
if hasHash {
// 去掉 db 文件末尾的 hash 值
if err := db.Truncate(off - sha256.Size); err != nil {
ExitWithError(ExitIO, err)
}
}

// 如果既没有 hash 值,restore 参数又没有指定 --skip-hash-check
// 那么报错退出
// 注意此时已经生成了 data-dir/member/snap/db 文件
if !hasHash && !skipHashCheck {
err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
ExitWithError(ExitBadArgs, err)
}
if hasHash && !skipHashCheck {
// check for match
if _, err := db.Seek(0, os.SEEK_SET); err != nil {
ExitWithError(ExitIO, err)
}
h := sha256.New()
if _, err := io.Copy(h, db); err != nil {
ExitWithError(ExitIO, err)
}
dbsha := h.Sum(nil)
if !reflect.DeepEqual(sha, dbsha) {
err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
ExitWithError(ExitInvalidInput, err)
}
}
// db hash is OK, can now modify DB so it can be part of a new cluster
db.Close()
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)
s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
id := s.TxnBegin()
btx := be.BatchTx()
del := func(k, v []byte) error {
_, _, err := s.TxnDeleteRange(id, k, nil)
return err
}

// db 文件中存储了 member 的信息
// 此处删除
// delete stored members from old cluster since using new members
btx.UnsafeForEach([]byte("members"), del)
// todo: add back new members when we start to deprecate old snap file.
btx.UnsafeForEach([]byte("members_removed"), del)
// trigger write-out of new consistent index
s.TxnEnd(id)
s.Commit()
s.Close()
}

makeWALAndSnap

具体查看 makeWALAndSnap 方法,无图言 x,makeWALAndSnap 生成的 .wal 和 .snap 文件内容如下

restore_wal

代码注释如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// makeWAL creates a WAL for the initial cluster
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
// 新建 data-dir/member/wal 目录
if err := fileutil.CreateDirAll(waldir); err != nil {
ExitWithError(ExitIO, err)
}
// etcd v2 storage
// add members again to persist them to the store we create.
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
cl.SetStore(st)
for _, m := range cl.Members() {
cl.AddMember(m)
}
// cluster and member metadata
// write to wal
m := cl.MemberByName(restoreName)
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
metadata, merr := md.Marshal()
if merr != nil {
ExitWithError(ExitInvalidInput, merr)
}

// 生成初始 wal 文件
w, walerr := wal.Create(waldir, metadata)
if walerr != nil {
ExitWithError(ExitIO, walerr)
}
defer w.Close()
//
// add entries for raft start
peers := make([]raft.Peer, len(cl.MemberIDs()))
for i, id := range cl.MemberIDs() {
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
ents := make([]raftpb.Entry, len(peers))
nodeIDs := make([]uint64, len(peers))
for i, p := range peers {
nodeIDs[i] = p.ID
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: p.ID,
Context: p.Context}
d, err := cc.Marshal()
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Term: 1,
Index: uint64(i + 1),
Data: d,
}
ents[i] = e
}
// add nodes entries are committed
// initial term 1
// save to wal
commit, term := uint64(len(ents)), uint64(1)
if err := w.Save(raftpb.HardState{
Term: term,
Vote: peers[0].ID,
Commit: commit}, ents); err != nil {
ExitWithError(ExitIO, err)
}
b, berr := st.Save()
if berr != nil {
ExitWithError(ExitError, berr)
}
// first snapshot
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
},
},
}
// save snapshot
// Term: 1
// Index: The number of member in cluster
snapshotter := snap.New(snapdir)
if err := snapshotter.SaveSnap(raftSnap); err != nil {
panic(err)
}
// write to wal
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
ExitWithError(ExitIO, err)
}
}

etcd 完成一次写入需要经过哪些过程?

implementation of put grpc request

key.go

1
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)

v3_server.go

1
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)

v3_server.go

1
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)

v3_server.go

1
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)

注册一个等待 id;完成之后调用 s.w.Trigger 触发完成 or GC
ch := s.w.Register(id)

raft propose (提议写入数据)

1
2
// propose PutRequest
s.r.Propose(cctx, data)

node.go

1
func (n *node) Propose(ctx context.Context, data []byte) error

n.step 往 propc 通道传入数据

node run main roop

1
2
3
4
5
6
7
8
9
func (n *node) run(r *raft) {
...

case m := <-propc:
r.logger.Infof("handle propc message")
m.From = r.id
r.Step(m)
...
}

raft/raft.go

1
2
3
4
5
6
7
8
func (r *raft) Step(m pb.Message) error {
...

default:
r.step(r, m)

...
}

r.step(r, m)

leader 和 follower 行为不同

对于 follower 来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func stepFollower(r *raft, m pb.Message) {
...

case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
}
// forward to leader
m.To = r.lead
// just append to raft pb.Message ?
r.send(m)

...
}

r.send(m) 只是把 message append 到 raft/raft.go 的 msgs []pb.Message 数组中,谁去消费这个 message ?

node.go

1
2
3
4
5
6
7
8
9
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
...
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
...
}

Ready 又是由谁消费的?

node.go main roop

func (n node) run(r raft)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (n *node) run(r *raft) {
...
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}

...

case readyc <- rd:
r.logger.Infof("handle ready")
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
r.msgs = nil
r.readStates = nil
advancec = n.advancec
}

readyc 又由谁消费呢?

实际上 readyc 是 n.readyc,所以找下 n.readyc 由谁消费即可

1
2
3
4
5
6
type node struct {
...
readyc chan Ready
...
}
func (n *node) Ready() <-chan Ready { return n.readyc }

所以我们继续追寻哪里取了 Ready() 通道

终于在

etcdserver/raft.go 中发现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (r *raftNode) start(rh *raftReadyHandler) {
...
case rd := <-r.Ready():
if rd.SoftState != nil {
// lead has changed
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
r.mu.Lock()
r.lt = time.Now()
r.mu.Unlock()
// prometheus record the count of leader changes
leaderChanges.Inc()
}
if rd.SoftState.Lead == raft.None {
hasLeader.Set(0)
} else {
hasLeader.Set(1)
}
// store current seen leader
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
// raft handler
rh.updateLeadership()
}
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
plog.Warningf("timed out sending read state")
case <-r.stopped:
return
}
}
raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
raftDone: raftDone,
}
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
// the leader can write to its disk in parallel with replicating to the followers and them
// writing to their disks.
// For more details, check raft thesis 10.2.1
if islead {
// gofail: var raftBeforeLeaderSend struct{}
r.sendMessages(rd.Messages)
}
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
// gofail: var raftAfterApplySnap struct{}
}
r.raftStorage.Append(rd.Entries)
if !islead {
// gofail: var raftBeforeFollowerSend struct{}
r.sendMessages(rd.Messages)
}
raftDone <- struct{}{}
r.Advance()
...
}

该部分会将 apply 的 message 放入 applc 通道中,最终由

server.go

1
2
3
4
5
6
7
func (s *EtcdServer) run() {
...
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
...
}

做持久化,并且 trigger 写入结束

etcd v3.1.9

.wal 文件,即 write ahead log,wal 的实现集中在 wal 目录下

消息类型

其中 wal/walpb 目录下定义了 wal 中记录的两种消息类型: Record 和 Snapshot

1
2
3
4
5
6
7
8
9
message Record {
optional int64 type = 1 [(gogoproto.nullable) = false];
optional uint32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
}
message Snapshot {
optional uint64 index = 1 [(gogoproto.nullable) = false];
optional uint64 term = 2 [(gogoproto.nullable) = false];
}

用 pb 比较省事儿,不用自己实现对象序列化反序列化的逻辑

创建方法

wal 有两个方法会创建 wal 文件,一个是 Create 方法,另一个是 cut 方法
Create 方法会创建初始 wal 文件,名称为 0000000000000000-0000000000000000.wal

1
p := filepath.Join(tmpdirpath, walName(0, 0))

查看 Create 创建初始 wal 文件的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
// 生成初始 wal 文件名
p := filepath.Join(tmpdirpath, walName(0, 0))
// 系统调用 LockFile
// 防止被 purge
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
}
// 不是很理解这个地方为何要 seek 到文件末尾
if _, err = f.Seek(0, os.SEEK_END); err != nil {
return nil, err
}
// 预分配 wal 文件空间
// SegmentSizeBytes = 64 * 1024 * 1024
// 即 64 MB
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
return nil, err
}
...
// 将 wal 文件加入 locks slice 中
w.locks = append(w.locks, f)
// 写入初始信息 ...
if w, err = w.renameWal(tmpdirpath); err != nil {
return nil, err
}

w.renameWal(tmpdirpath) 值得抽出来说下

1
2
3
4
5
6
7
8
9
10
11
12
13
// 删除原 wal path
if err := os.RemoveAll(w.dir); err != nil {
return nil, err
}
// 将 tmp wal path -> wal path
if err := os.Rename(tmpdirpath, w.dir); err != nil {
return nil, err
}
// 在这里初始化 FilePipeline
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
df, err := fileutil.OpenDir(w.dir)
w.dirFile = df
return w, err

初始写入 wal 内容示意如下

initial_wal

预分配空间

在 unix OS 上,首先会使用系统调用 Fallocate 预分配文件空间

如果 Fallocate 失败,则 fallback 到 preallocExtendTrunc 再次尝试分配

查看 preallocExtendTrunc 的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 移动到文件的当前读写位置,返回 offset
// 一般 curOff = 0
curOff, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
// 从当前文件的末尾处,移动至 +sizeInBytes 位置处,返回 offset
// 一般 size = 067108864
// 即 sizeInBytes 的值 64 * 1024 *1024
size, err := f.Seek(sizeInBytes, os.SEEK_END)
if err != nil {
return err
}
// 移动回文件之前的读写位置,待后续写入
if _, err = f.Seek(curOff, os.SEEK_SET); err != nil {
return err
}
// 已分配足够空间,返回 nil
// 一般 sizeInBytes == size
if sizeInBytes > size {
return nil
}
// 多分配了空间,以 sizeInBytes 截断文件
// truncate 之后,文件大小才显示为 sizeInBytes 大小
return f.Truncate(sizeInBytes)

在 darwin OS 上,首先会调用 preallocFixed,该函数中使用了系统调用 SYS_FCNTL 预先分配文件空间

如果 preallocFixed 失败,则调用 preallocExtendTrunc 再次尝试分配

编码 / 解码

wal/encoder.go 实现了写入逻辑
wal/decoder.go 实现了读取逻辑

File Pipeline

wal/file_pipeline.go 用来预创建文件,为后续生成新的 wal 文件使用

fp.Open() 在 cut 方法中被调用,cut 中的调用如下

1
2
3
4
5
// create a temp wal file with name sequence + 1, or truncate the existing one
newTail, err := w.fp.Open()
if err != nil {
return err
}

而 fp.Open() 从 fp.filec 中获取 locks file 返回

在初始化 file pipeline 方法 newFilePipeline 中启动 fp.run() 协程,查看 fp.run() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (fp *filePipeline) run() {
defer close(fp.errc)
for {
f, err := fp.alloc()
if err != nil {
fp.errc <- err
return
}
select {
// fp.filec 大小为 1
case fp.filec <- f:
case <-fp.donec:
os.Remove(f.Name())
f.Close()
return
}
}
}

查看 fp.alloc() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
// count % 2 so this file isn't the same as the one last published
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return nil, err
}
if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
f.Close()
return nil, err
}
fp.count++
return f, nil
}

可见预生成了 [0-1].tmp 文件,并对该文件加了锁,待调用 fp.Open() 方法获取使用

Cut 方法

wal 文件大小上限为 64MB

因此当写入消息之后, wal 文件大小 > 64MB 时,会调用 cut 方法

截断之前的 wal 文件,并生成新的 wal 文件用于写入

cut 的整体思路

  1. 截断当前使用的 wal 文件
  2. 从 file pipeline 中获取 tmp 文件
  3. 向 tmp 文件中写入必要的 headers
  4. 将 tmp 文件 rename to wal 文件,新文件名为 walName(w.seq()+1, w.enti+1)
  5. 将新 wal 文件加入 locks slice 中,并生成 newFileEncoder 用于写入新 wal 文件

详细阅读 cut 方法(保留了原注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
func (w *WAL) cut() error {
// close old wal file; truncate to avoid wasting space if an early cut
// 从 locks slice 中取最后一个 file
// seek 到当前读写位置
off, serr := w.tail().Seek(0, os.SEEK_CUR)
if serr != nil {
return serr
}
// 截断至当前读写位置
if err := w.tail().Truncate(off); err != nil {
return err
}
// 系统调用 fsync 落盘
if err := w.sync(); err != nil {
return err
}

// 生成新的 wal 文件名
// seq + 1
// index + 1
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
// create a temp wal file with name sequence + 1, or truncate the existing one
// 从 filePipeline 中获取预创建好的 [0-1].tmp 文件
newTail, err := w.fp.Open()
if err != nil {
return err
}
// update writer and save the previous crc
// 添加至 locks slice 末尾
w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32()
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}

// 写入之前 wal 的 crc
if err = w.saveCrc(prevCrc); err != nil {
return err
}
// 写入 metadata
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
return err
}
// 写入 raft HardState
if err = w.saveState(&w.state); err != nil {
return err
}
// atomically move temp wal file to wal file
// fsync 落盘
if err = w.sync(); err != nil {
return err
}

// 获取当前位置 offset
off, err = w.tail().Seek(0, os.SEEK_CUR)
if err != nil {
return err
}

// 同一个文件系统相当于 mv
if err = os.Rename(newTail.Name(), fpath); err != nil {
return err
}
// fsync 父目录
if err = fileutil.Fsync(w.dirFile); err != nil {
return err
}

// 关闭 filePipeline 打开的 newTail
newTail.Close()

// 重新加锁
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return err
}

// off, err = w.tail().Seek(0, os.SEEK_CUR)
// 重新设置下次读写位置为当前位置
if _, err = newTail.Seek(off, os.SEEK_SET); err != nil {
return err
}
w.locks[len(w.locks)-1] = newTail

// 莫名,直接使用之前的 prevCrc 不可以么
prevCrc = w.encoder.crc.Sum32()
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath)
return nil
}

所以 cut 方法初始写入 wal 的内容示意如下

cut_wal

Sync 方法

在 wal 中有如下几个地方使用了 sync 方法

  1. func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {}
  2. func (w *WAL) cut() error {}
  3. func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {}
  4. func (w *WAL) Close() error {}

sync 直接来说是使用了系统调用 fsync,确保数据写入磁盘持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (w *WAL) sync() error {
if w.encoder != nil {
if err := w.encoder.flush(); err != nil {
return err
}
}
// 记录开始时间
start := time.Now()

// 底层是系统调用
err := fileutil.Fdatasync(w.tail().File)

// 计算 fsync 耗时
duration := time.Since(start)
// 大于 1s 告警
if duration > warnSyncDuration {
plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
}
syncDurations.Observe(duration.Seconds())
return err
}

重点看下 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error 方法,毕竟它调用频率较高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
...
mustSync := mustSync(st, w.state, len(ents))
//func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
// return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
//}
...
curOff, err := w.tail().Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if curOff < SegmentSizeBytes {
if mustSync {
return w.sync()
}
return nil
}
return w.cut()
}

从上: vote / term 变化,或有 entries 要写入时,调用 w.sync

etcd v3.1.9

etcd http api

clientURL/health

1
2
curl http://localhost:2379/health
{"health": "true"}%

peerURL/members

1
2
curl http://localhost:2380/members
[{"id":10276657743932975437,"peerURLs":["http://localhost:2380"],"name":"default","clientURLs":["http://localhost:2379"]}]

pipelineHandler

/raft

接收 raftpb.Message 消息,传递给 raft.Process 处理

streamHandler

检查 request id 是否已经被 remove

从 local 中获取 peer,如果不存在,则加入 remote 中 trace,并提示

1
failed to find member [mid] in cluster [cid]

检查 x-raft-to id 是否与本成员一致

(1) /raft/stream/msgapp

处理 streamTypeMsgAppV2 消息类型;建立 stream 链接,保持 w http.ResponseWriter

(2) /raft/stream/message

处理 streamTypeMessage 消息类型;建立 stream 链接,保持 w http.ResponseWriter

snapshotHandler

/raft/snapshot

probingHandler

/raft/probing

etcd v3.1.9

Introduction

基于 protocol buffer,使用 gRPC 框架实现

示例代码:https://github.com/grpc/grpc-go/tree/master/examples

简单易用

Convert .proto to .go

pb 定义位于 etcdserver/etcdserverpb/rpc.proto 中,service 中定义了一系列 RPC 方法,RPC 中的 request 和 response 则在 service 之后被定义

使用 scripts/genproto.sh 可根据 pb 文件生成 go 文件

也可以根据官方文档使用 protoc 工具,从 pb 文件生成 go 文件

由 .proto 文件生成的 go 文件有两,如

1
2
rpc.pb.go
rpc.pb.gw.go

其中 rpc.pb.gw.go 是个反向代理,是将 http 请求再封成 grpc 发送至后端;按我理解就是方便使用 curl 命令调试

详情参考 ref: https://grpc.io/blog/coreos

下图源自上述网址,清晰明了;无图言 x

grpc-rest-gateway

The entry in etcd

gRPC 的入口位于 etcdserver/api/v3rpc/grpc.go

该文件中 new 了 grpc server,注册了 pb 中定义的一系列 service,并返回 grpc server

ETCD v3 vs ETCD v2 in client listener

v3 中保有 peer http,client http 及 client grpc

因此 v3 只是对 client 的 api 变为了 gRPC,peer 之间的通信仍然沿用 v2 的套路

简单的源码阅读

链接: https://github.com/coreos/etcd/tree/release-2.3

Main

  1. 检查运行 arch
  2. parse 启动配置参数
  3. 设置日志级别(参考 admin guide debug part,可以打开 debug 级别日志,亦可单独打开特定 package 的 debug 日志)
  4. 声明停止通道 var stopped <-chan struct{}
  5. 打印 etcd 版本 / GitSHA / 编译时的 Go VERSION / 运行架构信息

检查数据目录是否存在,若存在日志提示 (只有 data-dir 存在,就会打印如下日志)

1
the server is already initialized as member before, starting as etcd member...

不管数据目录存在与否,均如此启动

1
stopped, err = startEtcd(cfg)

最后获取到停止信号停止

1
2
// etcd process stpo
<-stoped

startEtcd

首先会从 initial-cluster 中初始化 peer 信息,利用这些 peer 信息新建 peer listener,主要方法为 rafthttp.NewListener

接下来使用 listen-client-url 新建 client listener,这块直接使用 net.Listen 方法,开启 tcp 服务;然后判断系统文件描述符限制,如果 < 150,则 panic,然后限制 listener 的数量为 系统描述符限制 - 150

初始化 net.Listen 的 keepAliveListener

初始化 etcd server config

初始化 etcdserver

启动 etcdserver


注册操作系统终止回调函数 osutil.RegisterInterruptHandler(s.Stop)


使用 cors 初始化 clientHandler

使用 etcdhttp 初始化 peerHandler

goroutine 为每个 peer listener 开启 http 服务 处理请求,5 min read timeout

goroutine 为每个 client listener 开启 http 服务 处理请求,0 min read timeout // 与 golang 的一个 bug 有关

etcdhttp/peer.go

etcdhttp/client.go

其实是 http server,待阅读


最后,返回停止通道及错误

etcdserver.NewServer

初始化 etcdserver

集群信息存在 /0

key 信息存在 /1

检查数据目录版本,并更新数据目录

判断以何种方式启动

(1) !haveWAL && !cfg.NewCluster

没有 wal 目录且 new-cluster == false,此时通过从启动参数 initial-token 及 initial-cluster 配置的集群信息来访问集群(会将自己的 peer url 排除),获取到当前存在的集群信息

校验 启动参数中配置的集群信息与获取到存在的集群信息,并为 local member 设置 id

此时成员信息从 remote peer 中获取,当前 server 的 cluster id 被设置为 remote peer 中获取到 cluster id

打印启动参数 cfg.Print()

启动 raftnode

1
id, n, s, w = startNode(cfg, cl, nil)
0%