0%

现在环境中 etcd 数据目录独立挂盘,看磁盘命名应该是个 lv

df -h 查看到类似

/device-mapper

和 lvm 相关 https://wiki.archlinux.org/index.php/LVM

pvs https://linux.die.net/man/8/pvs

iotop

iotop -n 1 -b -o

iostat

监控变化

watch -n 2 -d “xxx”

自动执行

watch -n 2 “xxx”

Intellij golang

用来记录一些 golang 的神奇语法及碎碎念

step over: 单步调试

删除数组中的一个元素

删除 index 元素

n.inodes = append(n.inodes[:index], n.inodes[index+1:]…)

Live 版完美,温暖和力量 ——

曾經我也想放棄治療

是因為還沒遇見到你

像你這樣的人存在這世界上,我對世界稍微有了好感

像你這樣的人存在這世界上,我對世界稍微有了期待

很久没写了,这次真的情绪很低落

自己明明那么认真,然而却事与愿违

不过想想也可以理解,毕竟每个人心中重要的事情不一样,接受吧

是呢,工作半年,“我变强了,也变秃了”,不再是小年轻的模样,已是满脸胡须

年轻真好呀,可以

跑到海角之南,只为追一首歌的回忆

天不怕地不怕的闯

藐视一切

可惜,又不可惜

梦醒的感觉而已

美好又遗憾

2017年10月13日21:59:32 杭州


三年后,此时的我,回去翻这些文字(陆陆续续写了 11 篇),只能感叹 “没有岁月可回头” ~ 希望今后有更强大的内心,去面对更多挑战

2020年08月01日00:47:47 杭州

etcd v3.1.9

to be cont.

boltdb 原哥们,不维护了貌似,coreos 的人继续维护 https://github.com/coreos/bbolt

另外这个 issues 和 free page 相关,即能解决部分 boltdb 内存碎片问题

https://github.com/coreos/bbolt/pull/3

boltdb

bolt_unix.go 几个工具方法,如 flock / funlock / mmap / munmap

bolt_linux.go fsync db 文件

入口处在 db.go 的 Open 方法,通过 Open 方法初始化一个 db 文件;对应 etcd 中 mvcc/backend/backend.go 中的 newBackend 方法的调用

1
2
3
4
5
6
7
8
9
func newBackend(path string, d time.Duration, limit int) *backend {
...
// bolt db
db, err := bolt.Open(path, 0600, boltOpenOptions)
if err != nil {
plog.Panicf("cannot open database at %s (%v)", path, err)
}
...
}

Open 方法中,如果是新建的 db 文件,则调用 db.init() 写入 metadata 信息,boltdb 的注释写的很赞,感觉就是个工业又学术的艺术品,赞!

查看 db.init() 干了什么,直接上图吧,往 db 文件中写了四个 page (size 一般是 4096 字节) 的内容

initial_db

然后就是各种初始化了,初始化 pagePool,mmap the data file as a byte slice,初始化 freelist

etcd 的 InitialMmapSize 设置为 10 1024 1024 * 1024,吓人,那是否 etcd 一启动,就会占用这么大的实际内存?并不是的哈,在 mac 上实测 top 命令看到的是 314MB,ps aux 看到的 vsz 556879888,rss 476460,很好奇,按理来说 vsz 的单位应该是 kb 呀,但是这个数字有点儿大的吓人,实际应该是字节,也就是 500 多 MB,那么 rss 又怎么理解呢?rss 的单位又是 kb,实际使用了 470 多 MB?神奇,总之并不是启动之后立马占用 10 G内存

开头写两个一样的 meta page 貌似是为了做保护,看到 mmap 的方法中有这段注释

1
2
3
4
5
6
7
8
// Validate the meta pages. We only return an error if both meta pages fail
// validation, since meta0 failing validation means that it wasn't saved
// properly -- but we can recover using meta1. And vice-versa.
err0 := db.meta0.validate()
err1 := db.meta1.validate()
if err0 != nil && err1 != nil {
return err0
}

说直接一点儿,boltdb 使用 mmap 把一个名为 db 的文件映射到内存中的 []byte 数组,然后直接操作内存,但是不管怎么说是外部存储,最终还是要落盘的,所以呢推测是用了 B+ tree,然后把写入的内容组织成 page,使用指针操作,这块代码有点像 C 其实

freelist allocate 的逻辑是从 freelist 记录的所有 free page 中分配 n 个连续的 page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(n int) pgid {
if len(f.ids) == 0 {
return 0
}
var initial, previd pgid
for i, id := range f.ids {
if id <= 1 {
panic(fmt.Sprintf("invalid page allocation: %d", id))
}
// Reset initial page if this is not contiguous.
if previd == 0 || id-previd != 1 {
initial = id
}
// If we found a contiguous block then remove it and return it.
if (id-initial)+1 == pgid(n) {
// If we're allocating off the beginning then take the fast path
// and just adjust the existing slice. This will use extra memory
// temporarily but the append() in free() will realloc the slice
// as is necessary.

// 将已分配的连续页移出 free list 记录表 (ids)
// 并释放 ids 空间
if (i + 1) == n {
f.ids = f.ids[i+1:]
} else {
copy(f.ids[i-n+1:], f.ids[i+1:])
f.ids = f.ids[:len(f.ids)-n]
}
// Remove from the free cache.
// 移出 free list cache
for i := pgid(0); i < pgid(n); i++ {
delete(f.cache, initial+i)
}

// 返回起始页
return initial
}
previd = id
}
return 0
}

freelist 的 write 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
// If the page.count is at the max uint16 value (64k) then it's considered
// an overflow and the size of the freelist is stored as the first element.
idx, count := 0, int(p.count)
if count == 0xFFFF {
idx = 1
count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
}
// Copy the list of page ids from the freelist.
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)
// Make sure they're sorted.
sort.Sort(pgids(f.ids))
}
// Rebuild the page cache.
f.reindex()
}

freelist 的 read 实现,对应 write;如何 write 的,就如何 read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
// If the page.count is at the max uint16 value (64k) then it's considered
// an overflow and the size of the freelist is stored as the first element.
// page 中的 count 怎么理解 ?
// 看着像是 page 下面维护着一系列 pgid
idx, count := 0, int(p.count)
if count == 0xFFFF {
idx = 1
count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
}
// Copy the list of page ids from the freelist.
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)

// copy 结束之后,是否可以设置 ids = nil,帮助 gc ?

// Make sure they're sorted.
sort.Sort(pgids(f.ids))
}
// Rebuild the page cache.
f.reindex()
}

freelist 的 reindex 实现,其实就是构造 cache,很直接的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// reindex rebuilds the free cache based on available and pending free lists.
func (f *freelist) reindex() {
// 既然已知大小的话,make 的时候为啥不指定 capacity
// 好吧,我晕了,这个是 map,怎么指定大小?naive
f.cache = make(map[pgid]bool)
for _, id := range f.ids {
f.cache[id] = true
}
// pending 记录了 tx 中使用过,未被释放的 page id
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
f.cache[pendingID] = true
}
}
}

boltdb 在 beginRWTx 中释放空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (db *DB) beginRWTx() (*Tx, error) {
// If the database was opened with Options.ReadOnly, return an error.
if db.readOnly {
return nil, ErrDatabaseReadOnly
}
// Obtain writer lock. This is released by the transaction when it closes.
// This enforces only one writer transaction at a time.
db.rwlock.Lock()
// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
// Exit if the database is not open yet.
if !db.opened {
db.rwlock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
// Free any pages associated with closed read-only transactions.
// 获取当前事务中的最小 txid
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
}

// 释放该 txid 之前的 page
if minid > 0 {
db.freelist.release(minid - 1)
}
return t, nil
}

查看 db.freelist.release 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// release moves all page ids for a transaction id (or older) to the freelist.
func (f *freelist) release(txid txid) {
m := make(pgids, 0)
for tid, ids := range f.pending {
if tid <= txid {
// Move transaction's pending pages to the available freelist.
// Don't remove from the cache since the page is still free.
m = append(m, ids...)
delete(f.pending, tid)
}
}
sort.Sort(m)

// 可重新使用的 pending page 与当前可使用的 page merge sort
f.ids = pgids(f.ids).merge(m)
}

leafPageElement 结构

1
2
3
4
5
6
7
8
type leafPageElement struct {
flags uint32 // 4 bytes; 2 leafElement / 1 branchElement / 4 meta /
pos uint32 // 4 bytes
ksize uint32 // 4 bytes
vsize uint32 // 4 bytes
// pos = 16 that remain space to store key and value
// for example ksize = 8 that 64 bytes for key
}

如图所示

leafPageElement

即叶子节点中存储了 key 和 value

etcd

查看 mvcc/kvstore.go 的 func (s *store) put(key, value []byte, leaseID lease.LeaseID) 方法

查看如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
s.txnModify = true

// 每次 put revision + 1
rev := s.currentRev.main + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
// revision to bytes
ibytes := newRevBytes()
revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)

//
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}

// boltdb 中的 key 为 revision
// value 为 mvccpb.KeyValue
// 存入 boltdb 即 db 文件
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)

// 存入 key -> revision 的索引
s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})

// 这个是啥
// s.changes 什么时候释放,不然内存不会爆?
s.changes = append(s.changes, kv)
s.currentRev.sub += 1
// lease 相关代码先略去不表
}

查看 kvindex 的实现,实际上为 newTreeIndex(),即 mvcc/index.go,摘抄一句 package 注释

Package btree implements in-memory B-Trees of arbitrary degree.

okay, index 底层是 B tree

查看 kvindex.put 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()

// B tree get
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}

// update value in B tree
okeyi := item.(*keyIndex)
okeyi.put(rev.main, rev.sub)
}

kvindex 是个完全在内存中的索引,如果 etcd 重启了之后,需要恢复 kvindex 么?答案是需要的

etcdserver/server.go -> s.kv.Restore(newbe) -> func (s store) Restore(b backend.Backend) error {} -> func (s store) restore() error {}

在最后这个方法中从 db 文件恢复 kvindex

etcd v3.1.9

数据是如此重要,有必要看下 etcdctl restore 的实现

etcdctl 的实现均在 etcdctl 目录下,ctlv2 是 v2 的实现,ctlv3 是 v3 的实现,统一入口 main.go,通过环境变量 ETCDCTL_API 指定使用哪个版本的 etcdctl

查看 etcdctl/ctlv3/command/snapshot_command.go 中的 snapshotRestoreCommandFunc 方法

Restore 的整体过程

  1. 使用 —initial-cluster / —name / —initial-cluster-token / —initial-advertise-peer-urls 参数生成 etcd 集群及成员参数
  2. 校验参数
  3. 如果 data-dir 已经存在,报错退出
  4. 生成 etcd v3 backend db 文件 (makeDB)
  5. 生成 .wal 和 .snap 文件 (makeWALAndSnap)

makeDB

具体查看 makeDB 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// makeDB copies the database snapshot to the snapshot directory
func makeDB(snapdir, dbfile string, commit int) {
// 打开 db 文件
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
if ferr != nil {
ExitWithError(ExitInvalidInput, ferr)
}
defer f.Close()
// get snapshot integrity hash
if _, err := f.Seek(-sha256.Size, os.SEEK_END); err != nil {
ExitWithError(ExitIO, err)
}
sha := make([]byte, sha256.Size)
if _, err := f.Read(sha); err != nil {
ExitWithError(ExitIO, err)
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
ExitWithError(ExitIO, err)
}

// 创建 data-dir/member/snap 目录
if err := fileutil.CreateDirAll(snapdir); err != nil {
ExitWithError(ExitIO, err)
}

// 拷贝 db 文件至 data-dir/member/snap/db
dbpath := filepath.Join(snapdir, "db")
db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
if dberr != nil {
ExitWithError(ExitIO, dberr)
}
if _, err := io.Copy(db, f); err != nil {
ExitWithError(ExitIO, err)
}
// truncate away integrity hash, if any.
off, serr := db.Seek(0, os.SEEK_END)
if serr != nil {
ExitWithError(ExitIO, serr)
}

// db 文件中是否存在 hash 值这块有点意思
// 看着是以 512 chunk 的方式写入的
// % 512 以后余下的字节数等于 sha256.Size 的话
// 那么 db 文件存在 hash 值
hasHash := (off % 512) == sha256.Size
if hasHash {
// 去掉 db 文件末尾的 hash 值
if err := db.Truncate(off - sha256.Size); err != nil {
ExitWithError(ExitIO, err)
}
}

// 如果既没有 hash 值,restore 参数又没有指定 --skip-hash-check
// 那么报错退出
// 注意此时已经生成了 data-dir/member/snap/db 文件
if !hasHash && !skipHashCheck {
err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
ExitWithError(ExitBadArgs, err)
}
if hasHash && !skipHashCheck {
// check for match
if _, err := db.Seek(0, os.SEEK_SET); err != nil {
ExitWithError(ExitIO, err)
}
h := sha256.New()
if _, err := io.Copy(h, db); err != nil {
ExitWithError(ExitIO, err)
}
dbsha := h.Sum(nil)
if !reflect.DeepEqual(sha, dbsha) {
err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
ExitWithError(ExitInvalidInput, err)
}
}
// db hash is OK, can now modify DB so it can be part of a new cluster
db.Close()
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)
s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
id := s.TxnBegin()
btx := be.BatchTx()
del := func(k, v []byte) error {
_, _, err := s.TxnDeleteRange(id, k, nil)
return err
}

// db 文件中存储了 member 的信息
// 此处删除
// delete stored members from old cluster since using new members
btx.UnsafeForEach([]byte("members"), del)
// todo: add back new members when we start to deprecate old snap file.
btx.UnsafeForEach([]byte("members_removed"), del)
// trigger write-out of new consistent index
s.TxnEnd(id)
s.Commit()
s.Close()
}

makeWALAndSnap

具体查看 makeWALAndSnap 方法,无图言 x,makeWALAndSnap 生成的 .wal 和 .snap 文件内容如下

restore_wal

代码注释如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// makeWAL creates a WAL for the initial cluster
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
// 新建 data-dir/member/wal 目录
if err := fileutil.CreateDirAll(waldir); err != nil {
ExitWithError(ExitIO, err)
}
// etcd v2 storage
// add members again to persist them to the store we create.
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
cl.SetStore(st)
for _, m := range cl.Members() {
cl.AddMember(m)
}
// cluster and member metadata
// write to wal
m := cl.MemberByName(restoreName)
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
metadata, merr := md.Marshal()
if merr != nil {
ExitWithError(ExitInvalidInput, merr)
}

// 生成初始 wal 文件
w, walerr := wal.Create(waldir, metadata)
if walerr != nil {
ExitWithError(ExitIO, walerr)
}
defer w.Close()
//
// add entries for raft start
peers := make([]raft.Peer, len(cl.MemberIDs()))
for i, id := range cl.MemberIDs() {
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
ents := make([]raftpb.Entry, len(peers))
nodeIDs := make([]uint64, len(peers))
for i, p := range peers {
nodeIDs[i] = p.ID
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: p.ID,
Context: p.Context}
d, err := cc.Marshal()
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Term: 1,
Index: uint64(i + 1),
Data: d,
}
ents[i] = e
}
// add nodes entries are committed
// initial term 1
// save to wal
commit, term := uint64(len(ents)), uint64(1)
if err := w.Save(raftpb.HardState{
Term: term,
Vote: peers[0].ID,
Commit: commit}, ents); err != nil {
ExitWithError(ExitIO, err)
}
b, berr := st.Save()
if berr != nil {
ExitWithError(ExitError, berr)
}
// first snapshot
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
},
},
}
// save snapshot
// Term: 1
// Index: The number of member in cluster
snapshotter := snap.New(snapdir)
if err := snapshotter.SaveSnap(raftSnap); err != nil {
panic(err)
}
// write to wal
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
ExitWithError(ExitIO, err)
}
}

etcd 完成一次写入需要经过哪些过程?

implementation of put grpc request

key.go

1
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)

v3_server.go

1
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)

v3_server.go

1
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)

v3_server.go

1
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)

注册一个等待 id;完成之后调用 s.w.Trigger 触发完成 or GC
ch := s.w.Register(id)

raft propose (提议写入数据)

1
2
// propose PutRequest
s.r.Propose(cctx, data)

node.go

1
func (n *node) Propose(ctx context.Context, data []byte) error

n.step 往 propc 通道传入数据

node run main roop

1
2
3
4
5
6
7
8
9
func (n *node) run(r *raft) {
...

case m := <-propc:
r.logger.Infof("handle propc message")
m.From = r.id
r.Step(m)
...
}

raft/raft.go

1
2
3
4
5
6
7
8
func (r *raft) Step(m pb.Message) error {
...

default:
r.step(r, m)

...
}

r.step(r, m)

leader 和 follower 行为不同

对于 follower 来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func stepFollower(r *raft, m pb.Message) {
...

case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
}
// forward to leader
m.To = r.lead
// just append to raft pb.Message ?
r.send(m)

...
}

r.send(m) 只是把 message append 到 raft/raft.go 的 msgs []pb.Message 数组中,谁去消费这个 message ?

node.go

1
2
3
4
5
6
7
8
9
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
...
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
...
}

Ready 又是由谁消费的?

node.go main roop

func (n node) run(r raft)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (n *node) run(r *raft) {
...
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}

...

case readyc <- rd:
r.logger.Infof("handle ready")
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
r.msgs = nil
r.readStates = nil
advancec = n.advancec
}

readyc 又由谁消费呢?

实际上 readyc 是 n.readyc,所以找下 n.readyc 由谁消费即可

1
2
3
4
5
6
type node struct {
...
readyc chan Ready
...
}
func (n *node) Ready() <-chan Ready { return n.readyc }

所以我们继续追寻哪里取了 Ready() 通道

终于在

etcdserver/raft.go 中发现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (r *raftNode) start(rh *raftReadyHandler) {
...
case rd := <-r.Ready():
if rd.SoftState != nil {
// lead has changed
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
r.mu.Lock()
r.lt = time.Now()
r.mu.Unlock()
// prometheus record the count of leader changes
leaderChanges.Inc()
}
if rd.SoftState.Lead == raft.None {
hasLeader.Set(0)
} else {
hasLeader.Set(1)
}
// store current seen leader
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
// raft handler
rh.updateLeadership()
}
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
plog.Warningf("timed out sending read state")
case <-r.stopped:
return
}
}
raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
raftDone: raftDone,
}
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
// the leader can write to its disk in parallel with replicating to the followers and them
// writing to their disks.
// For more details, check raft thesis 10.2.1
if islead {
// gofail: var raftBeforeLeaderSend struct{}
r.sendMessages(rd.Messages)
}
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
// gofail: var raftAfterApplySnap struct{}
}
r.raftStorage.Append(rd.Entries)
if !islead {
// gofail: var raftBeforeFollowerSend struct{}
r.sendMessages(rd.Messages)
}
raftDone <- struct{}{}
r.Advance()
...
}

该部分会将 apply 的 message 放入 applc 通道中,最终由

server.go

1
2
3
4
5
6
7
func (s *EtcdServer) run() {
...
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
...
}

做持久化,并且 trigger 写入结束

etcd v3.1.9

.wal 文件,即 write ahead log,wal 的实现集中在 wal 目录下

消息类型

其中 wal/walpb 目录下定义了 wal 中记录的两种消息类型: Record 和 Snapshot

1
2
3
4
5
6
7
8
9
message Record {
optional int64 type = 1 [(gogoproto.nullable) = false];
optional uint32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
}
message Snapshot {
optional uint64 index = 1 [(gogoproto.nullable) = false];
optional uint64 term = 2 [(gogoproto.nullable) = false];
}

用 pb 比较省事儿,不用自己实现对象序列化反序列化的逻辑

创建方法

wal 有两个方法会创建 wal 文件,一个是 Create 方法,另一个是 cut 方法
Create 方法会创建初始 wal 文件,名称为 0000000000000000-0000000000000000.wal

1
p := filepath.Join(tmpdirpath, walName(0, 0))

查看 Create 创建初始 wal 文件的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
// 生成初始 wal 文件名
p := filepath.Join(tmpdirpath, walName(0, 0))
// 系统调用 LockFile
// 防止被 purge
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
}
// 不是很理解这个地方为何要 seek 到文件末尾
if _, err = f.Seek(0, os.SEEK_END); err != nil {
return nil, err
}
// 预分配 wal 文件空间
// SegmentSizeBytes = 64 * 1024 * 1024
// 即 64 MB
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
return nil, err
}
...
// 将 wal 文件加入 locks slice 中
w.locks = append(w.locks, f)
// 写入初始信息 ...
if w, err = w.renameWal(tmpdirpath); err != nil {
return nil, err
}

w.renameWal(tmpdirpath) 值得抽出来说下

1
2
3
4
5
6
7
8
9
10
11
12
13
// 删除原 wal path
if err := os.RemoveAll(w.dir); err != nil {
return nil, err
}
// 将 tmp wal path -> wal path
if err := os.Rename(tmpdirpath, w.dir); err != nil {
return nil, err
}
// 在这里初始化 FilePipeline
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
df, err := fileutil.OpenDir(w.dir)
w.dirFile = df
return w, err

初始写入 wal 内容示意如下

initial_wal

预分配空间

在 unix OS 上,首先会使用系统调用 Fallocate 预分配文件空间

如果 Fallocate 失败,则 fallback 到 preallocExtendTrunc 再次尝试分配

查看 preallocExtendTrunc 的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 移动到文件的当前读写位置,返回 offset
// 一般 curOff = 0
curOff, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
// 从当前文件的末尾处,移动至 +sizeInBytes 位置处,返回 offset
// 一般 size = 067108864
// 即 sizeInBytes 的值 64 * 1024 *1024
size, err := f.Seek(sizeInBytes, os.SEEK_END)
if err != nil {
return err
}
// 移动回文件之前的读写位置,待后续写入
if _, err = f.Seek(curOff, os.SEEK_SET); err != nil {
return err
}
// 已分配足够空间,返回 nil
// 一般 sizeInBytes == size
if sizeInBytes > size {
return nil
}
// 多分配了空间,以 sizeInBytes 截断文件
// truncate 之后,文件大小才显示为 sizeInBytes 大小
return f.Truncate(sizeInBytes)

在 darwin OS 上,首先会调用 preallocFixed,该函数中使用了系统调用 SYS_FCNTL 预先分配文件空间

如果 preallocFixed 失败,则调用 preallocExtendTrunc 再次尝试分配

编码 / 解码

wal/encoder.go 实现了写入逻辑
wal/decoder.go 实现了读取逻辑

File Pipeline

wal/file_pipeline.go 用来预创建文件,为后续生成新的 wal 文件使用

fp.Open() 在 cut 方法中被调用,cut 中的调用如下

1
2
3
4
5
// create a temp wal file with name sequence + 1, or truncate the existing one
newTail, err := w.fp.Open()
if err != nil {
return err
}

而 fp.Open() 从 fp.filec 中获取 locks file 返回

在初始化 file pipeline 方法 newFilePipeline 中启动 fp.run() 协程,查看 fp.run() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (fp *filePipeline) run() {
defer close(fp.errc)
for {
f, err := fp.alloc()
if err != nil {
fp.errc <- err
return
}
select {
// fp.filec 大小为 1
case fp.filec <- f:
case <-fp.donec:
os.Remove(f.Name())
f.Close()
return
}
}
}

查看 fp.alloc() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
// count % 2 so this file isn't the same as the one last published
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return nil, err
}
if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
f.Close()
return nil, err
}
fp.count++
return f, nil
}

可见预生成了 [0-1].tmp 文件,并对该文件加了锁,待调用 fp.Open() 方法获取使用

Cut 方法

wal 文件大小上限为 64MB

因此当写入消息之后, wal 文件大小 > 64MB 时,会调用 cut 方法

截断之前的 wal 文件,并生成新的 wal 文件用于写入

cut 的整体思路

  1. 截断当前使用的 wal 文件
  2. 从 file pipeline 中获取 tmp 文件
  3. 向 tmp 文件中写入必要的 headers
  4. 将 tmp 文件 rename to wal 文件,新文件名为 walName(w.seq()+1, w.enti+1)
  5. 将新 wal 文件加入 locks slice 中,并生成 newFileEncoder 用于写入新 wal 文件

详细阅读 cut 方法(保留了原注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
func (w *WAL) cut() error {
// close old wal file; truncate to avoid wasting space if an early cut
// 从 locks slice 中取最后一个 file
// seek 到当前读写位置
off, serr := w.tail().Seek(0, os.SEEK_CUR)
if serr != nil {
return serr
}
// 截断至当前读写位置
if err := w.tail().Truncate(off); err != nil {
return err
}
// 系统调用 fsync 落盘
if err := w.sync(); err != nil {
return err
}

// 生成新的 wal 文件名
// seq + 1
// index + 1
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
// create a temp wal file with name sequence + 1, or truncate the existing one
// 从 filePipeline 中获取预创建好的 [0-1].tmp 文件
newTail, err := w.fp.Open()
if err != nil {
return err
}
// update writer and save the previous crc
// 添加至 locks slice 末尾
w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32()
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}

// 写入之前 wal 的 crc
if err = w.saveCrc(prevCrc); err != nil {
return err
}
// 写入 metadata
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
return err
}
// 写入 raft HardState
if err = w.saveState(&w.state); err != nil {
return err
}
// atomically move temp wal file to wal file
// fsync 落盘
if err = w.sync(); err != nil {
return err
}

// 获取当前位置 offset
off, err = w.tail().Seek(0, os.SEEK_CUR)
if err != nil {
return err
}

// 同一个文件系统相当于 mv
if err = os.Rename(newTail.Name(), fpath); err != nil {
return err
}
// fsync 父目录
if err = fileutil.Fsync(w.dirFile); err != nil {
return err
}

// 关闭 filePipeline 打开的 newTail
newTail.Close()

// 重新加锁
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return err
}

// off, err = w.tail().Seek(0, os.SEEK_CUR)
// 重新设置下次读写位置为当前位置
if _, err = newTail.Seek(off, os.SEEK_SET); err != nil {
return err
}
w.locks[len(w.locks)-1] = newTail

// 莫名,直接使用之前的 prevCrc 不可以么
prevCrc = w.encoder.crc.Sum32()
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath)
return nil
}

所以 cut 方法初始写入 wal 的内容示意如下

cut_wal

Sync 方法

在 wal 中有如下几个地方使用了 sync 方法

  1. func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {}
  2. func (w *WAL) cut() error {}
  3. func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {}
  4. func (w *WAL) Close() error {}

sync 直接来说是使用了系统调用 fsync,确保数据写入磁盘持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (w *WAL) sync() error {
if w.encoder != nil {
if err := w.encoder.flush(); err != nil {
return err
}
}
// 记录开始时间
start := time.Now()

// 底层是系统调用
err := fileutil.Fdatasync(w.tail().File)

// 计算 fsync 耗时
duration := time.Since(start)
// 大于 1s 告警
if duration > warnSyncDuration {
plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
}
syncDurations.Observe(duration.Seconds())
return err
}

重点看下 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error 方法,毕竟它调用频率较高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
...
mustSync := mustSync(st, w.state, len(ents))
//func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
// return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
//}
...
curOff, err := w.tail().Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if curOff < SegmentSizeBytes {
if mustSync {
return w.sync()
}
return nil
}
return w.cut()
}

从上: vote / term 变化,或有 entries 要写入时,调用 w.sync

etcd v3.1.9

etcd http api

clientURL/health

1
2
curl http://localhost:2379/health
{"health": "true"}%

peerURL/members

1
2
curl http://localhost:2380/members
[{"id":10276657743932975437,"peerURLs":["http://localhost:2380"],"name":"default","clientURLs":["http://localhost:2379"]}]

pipelineHandler

/raft

接收 raftpb.Message 消息,传递给 raft.Process 处理

streamHandler

检查 request id 是否已经被 remove

从 local 中获取 peer,如果不存在,则加入 remote 中 trace,并提示

1
failed to find member [mid] in cluster [cid]

检查 x-raft-to id 是否与本成员一致

(1) /raft/stream/msgapp

处理 streamTypeMsgAppV2 消息类型;建立 stream 链接,保持 w http.ResponseWriter

(2) /raft/stream/message

处理 streamTypeMessage 消息类型;建立 stream 链接,保持 w http.ResponseWriter

snapshotHandler

/raft/snapshot

probingHandler

/raft/probing

etcd v3.1.9

Introduction

基于 protocol buffer,使用 gRPC 框架实现

示例代码:https://github.com/grpc/grpc-go/tree/master/examples

简单易用

Convert .proto to .go

pb 定义位于 etcdserver/etcdserverpb/rpc.proto 中,service 中定义了一系列 RPC 方法,RPC 中的 request 和 response 则在 service 之后被定义

使用 scripts/genproto.sh 可根据 pb 文件生成 go 文件

也可以根据官方文档使用 protoc 工具,从 pb 文件生成 go 文件

由 .proto 文件生成的 go 文件有两,如

1
2
rpc.pb.go
rpc.pb.gw.go

其中 rpc.pb.gw.go 是个反向代理,是将 http 请求再封成 grpc 发送至后端;按我理解就是方便使用 curl 命令调试

详情参考 ref: https://grpc.io/blog/coreos

下图源自上述网址,清晰明了;无图言 x

grpc-rest-gateway

The entry in etcd

gRPC 的入口位于 etcdserver/api/v3rpc/grpc.go

该文件中 new 了 grpc server,注册了 pb 中定义的一系列 service,并返回 grpc server

ETCD v3 vs ETCD v2 in client listener

v3 中保有 peer http,client http 及 client grpc

因此 v3 只是对 client 的 api 变为了 gRPC,peer 之间的通信仍然沿用 v2 的套路