0%

etcd v3.1.9

数据是如此重要,有必要看下 etcdctl restore 的实现

etcdctl 的实现均在 etcdctl 目录下,ctlv2 是 v2 的实现,ctlv3 是 v3 的实现,统一入口 main.go,通过环境变量 ETCDCTL_API 指定使用哪个版本的 etcdctl

查看 etcdctl/ctlv3/command/snapshot_command.go 中的 snapshotRestoreCommandFunc 方法

Restore 的整体过程

  1. 使用 —initial-cluster / —name / —initial-cluster-token / —initial-advertise-peer-urls 参数生成 etcd 集群及成员参数
  2. 校验参数
  3. 如果 data-dir 已经存在,报错退出
  4. 生成 etcd v3 backend db 文件 (makeDB)
  5. 生成 .wal 和 .snap 文件 (makeWALAndSnap)

makeDB

具体查看 makeDB 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// makeDB copies the database snapshot to the snapshot directory
func makeDB(snapdir, dbfile string, commit int) {
// 打开 db 文件
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
if ferr != nil {
ExitWithError(ExitInvalidInput, ferr)
}
defer f.Close()
// get snapshot integrity hash
if _, err := f.Seek(-sha256.Size, os.SEEK_END); err != nil {
ExitWithError(ExitIO, err)
}
sha := make([]byte, sha256.Size)
if _, err := f.Read(sha); err != nil {
ExitWithError(ExitIO, err)
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
ExitWithError(ExitIO, err)
}

// 创建 data-dir/member/snap 目录
if err := fileutil.CreateDirAll(snapdir); err != nil {
ExitWithError(ExitIO, err)
}

// 拷贝 db 文件至 data-dir/member/snap/db
dbpath := filepath.Join(snapdir, "db")
db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
if dberr != nil {
ExitWithError(ExitIO, dberr)
}
if _, err := io.Copy(db, f); err != nil {
ExitWithError(ExitIO, err)
}
// truncate away integrity hash, if any.
off, serr := db.Seek(0, os.SEEK_END)
if serr != nil {
ExitWithError(ExitIO, serr)
}

// db 文件中是否存在 hash 值这块有点意思
// 看着是以 512 chunk 的方式写入的
// % 512 以后余下的字节数等于 sha256.Size 的话
// 那么 db 文件存在 hash 值
hasHash := (off % 512) == sha256.Size
if hasHash {
// 去掉 db 文件末尾的 hash 值
if err := db.Truncate(off - sha256.Size); err != nil {
ExitWithError(ExitIO, err)
}
}

// 如果既没有 hash 值,restore 参数又没有指定 --skip-hash-check
// 那么报错退出
// 注意此时已经生成了 data-dir/member/snap/db 文件
if !hasHash && !skipHashCheck {
err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
ExitWithError(ExitBadArgs, err)
}
if hasHash && !skipHashCheck {
// check for match
if _, err := db.Seek(0, os.SEEK_SET); err != nil {
ExitWithError(ExitIO, err)
}
h := sha256.New()
if _, err := io.Copy(h, db); err != nil {
ExitWithError(ExitIO, err)
}
dbsha := h.Sum(nil)
if !reflect.DeepEqual(sha, dbsha) {
err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
ExitWithError(ExitInvalidInput, err)
}
}
// db hash is OK, can now modify DB so it can be part of a new cluster
db.Close()
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)
s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
id := s.TxnBegin()
btx := be.BatchTx()
del := func(k, v []byte) error {
_, _, err := s.TxnDeleteRange(id, k, nil)
return err
}

// db 文件中存储了 member 的信息
// 此处删除
// delete stored members from old cluster since using new members
btx.UnsafeForEach([]byte("members"), del)
// todo: add back new members when we start to deprecate old snap file.
btx.UnsafeForEach([]byte("members_removed"), del)
// trigger write-out of new consistent index
s.TxnEnd(id)
s.Commit()
s.Close()
}

makeWALAndSnap

具体查看 makeWALAndSnap 方法,无图言 x,makeWALAndSnap 生成的 .wal 和 .snap 文件内容如下

restore_wal

代码注释如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// makeWAL creates a WAL for the initial cluster
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
// 新建 data-dir/member/wal 目录
if err := fileutil.CreateDirAll(waldir); err != nil {
ExitWithError(ExitIO, err)
}
// etcd v2 storage
// add members again to persist them to the store we create.
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
cl.SetStore(st)
for _, m := range cl.Members() {
cl.AddMember(m)
}
// cluster and member metadata
// write to wal
m := cl.MemberByName(restoreName)
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
metadata, merr := md.Marshal()
if merr != nil {
ExitWithError(ExitInvalidInput, merr)
}

// 生成初始 wal 文件
w, walerr := wal.Create(waldir, metadata)
if walerr != nil {
ExitWithError(ExitIO, walerr)
}
defer w.Close()
//
// add entries for raft start
peers := make([]raft.Peer, len(cl.MemberIDs()))
for i, id := range cl.MemberIDs() {
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
ents := make([]raftpb.Entry, len(peers))
nodeIDs := make([]uint64, len(peers))
for i, p := range peers {
nodeIDs[i] = p.ID
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: p.ID,
Context: p.Context}
d, err := cc.Marshal()
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Term: 1,
Index: uint64(i + 1),
Data: d,
}
ents[i] = e
}
// add nodes entries are committed
// initial term 1
// save to wal
commit, term := uint64(len(ents)), uint64(1)
if err := w.Save(raftpb.HardState{
Term: term,
Vote: peers[0].ID,
Commit: commit}, ents); err != nil {
ExitWithError(ExitIO, err)
}
b, berr := st.Save()
if berr != nil {
ExitWithError(ExitError, berr)
}
// first snapshot
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
},
},
}
// save snapshot
// Term: 1
// Index: The number of member in cluster
snapshotter := snap.New(snapdir)
if err := snapshotter.SaveSnap(raftSnap); err != nil {
panic(err)
}
// write to wal
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
ExitWithError(ExitIO, err)
}
}

etcd 完成一次写入需要经过哪些过程?

implementation of put grpc request

key.go

1
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)

v3_server.go

1
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)

v3_server.go

1
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)

v3_server.go

1
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error)

注册一个等待 id;完成之后调用 s.w.Trigger 触发完成 or GC
ch := s.w.Register(id)

raft propose (提议写入数据)

1
2
// propose PutRequest
s.r.Propose(cctx, data)

node.go

1
func (n *node) Propose(ctx context.Context, data []byte) error

n.step 往 propc 通道传入数据

node run main roop

1
2
3
4
5
6
7
8
9
func (n *node) run(r *raft) {
...

case m := <-propc:
r.logger.Infof("handle propc message")
m.From = r.id
r.Step(m)
...
}

raft/raft.go

1
2
3
4
5
6
7
8
func (r *raft) Step(m pb.Message) error {
...

default:
r.step(r, m)

...
}

r.step(r, m)

leader 和 follower 行为不同

对于 follower 来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func stepFollower(r *raft, m pb.Message) {
...

case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
}
// forward to leader
m.To = r.lead
// just append to raft pb.Message ?
r.send(m)

...
}

r.send(m) 只是把 message append 到 raft/raft.go 的 msgs []pb.Message 数组中,谁去消费这个 message ?

node.go

1
2
3
4
5
6
7
8
9
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
...
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
...
}

Ready 又是由谁消费的?

node.go main roop

func (n node) run(r raft)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (n *node) run(r *raft) {
...
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}

...

case readyc <- rd:
r.logger.Infof("handle ready")
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
r.msgs = nil
r.readStates = nil
advancec = n.advancec
}

readyc 又由谁消费呢?

实际上 readyc 是 n.readyc,所以找下 n.readyc 由谁消费即可

1
2
3
4
5
6
type node struct {
...
readyc chan Ready
...
}
func (n *node) Ready() <-chan Ready { return n.readyc }

所以我们继续追寻哪里取了 Ready() 通道

终于在

etcdserver/raft.go 中发现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (r *raftNode) start(rh *raftReadyHandler) {
...
case rd := <-r.Ready():
if rd.SoftState != nil {
// lead has changed
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
r.mu.Lock()
r.lt = time.Now()
r.mu.Unlock()
// prometheus record the count of leader changes
leaderChanges.Inc()
}
if rd.SoftState.Lead == raft.None {
hasLeader.Set(0)
} else {
hasLeader.Set(1)
}
// store current seen leader
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
// raft handler
rh.updateLeadership()
}
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
plog.Warningf("timed out sending read state")
case <-r.stopped:
return
}
}
raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
raftDone: raftDone,
}
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
// the leader can write to its disk in parallel with replicating to the followers and them
// writing to their disks.
// For more details, check raft thesis 10.2.1
if islead {
// gofail: var raftBeforeLeaderSend struct{}
r.sendMessages(rd.Messages)
}
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
// gofail: var raftAfterApplySnap struct{}
}
r.raftStorage.Append(rd.Entries)
if !islead {
// gofail: var raftBeforeFollowerSend struct{}
r.sendMessages(rd.Messages)
}
raftDone <- struct{}{}
r.Advance()
...
}

该部分会将 apply 的 message 放入 applc 通道中,最终由

server.go

1
2
3
4
5
6
7
func (s *EtcdServer) run() {
...
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
...
}

做持久化,并且 trigger 写入结束

etcd v3.1.9

.wal 文件,即 write ahead log,wal 的实现集中在 wal 目录下

消息类型

其中 wal/walpb 目录下定义了 wal 中记录的两种消息类型: Record 和 Snapshot

1
2
3
4
5
6
7
8
9
message Record {
optional int64 type = 1 [(gogoproto.nullable) = false];
optional uint32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
}
message Snapshot {
optional uint64 index = 1 [(gogoproto.nullable) = false];
optional uint64 term = 2 [(gogoproto.nullable) = false];
}

用 pb 比较省事儿,不用自己实现对象序列化反序列化的逻辑

创建方法

wal 有两个方法会创建 wal 文件,一个是 Create 方法,另一个是 cut 方法
Create 方法会创建初始 wal 文件,名称为 0000000000000000-0000000000000000.wal

1
p := filepath.Join(tmpdirpath, walName(0, 0))

查看 Create 创建初始 wal 文件的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
// 生成初始 wal 文件名
p := filepath.Join(tmpdirpath, walName(0, 0))
// 系统调用 LockFile
// 防止被 purge
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
}
// 不是很理解这个地方为何要 seek 到文件末尾
if _, err = f.Seek(0, os.SEEK_END); err != nil {
return nil, err
}
// 预分配 wal 文件空间
// SegmentSizeBytes = 64 * 1024 * 1024
// 即 64 MB
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
return nil, err
}
...
// 将 wal 文件加入 locks slice 中
w.locks = append(w.locks, f)
// 写入初始信息 ...
if w, err = w.renameWal(tmpdirpath); err != nil {
return nil, err
}

w.renameWal(tmpdirpath) 值得抽出来说下

1
2
3
4
5
6
7
8
9
10
11
12
13
// 删除原 wal path
if err := os.RemoveAll(w.dir); err != nil {
return nil, err
}
// 将 tmp wal path -> wal path
if err := os.Rename(tmpdirpath, w.dir); err != nil {
return nil, err
}
// 在这里初始化 FilePipeline
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
df, err := fileutil.OpenDir(w.dir)
w.dirFile = df
return w, err

初始写入 wal 内容示意如下

initial_wal

预分配空间

在 unix OS 上,首先会使用系统调用 Fallocate 预分配文件空间

如果 Fallocate 失败,则 fallback 到 preallocExtendTrunc 再次尝试分配

查看 preallocExtendTrunc 的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 移动到文件的当前读写位置,返回 offset
// 一般 curOff = 0
curOff, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
// 从当前文件的末尾处,移动至 +sizeInBytes 位置处,返回 offset
// 一般 size = 067108864
// 即 sizeInBytes 的值 64 * 1024 *1024
size, err := f.Seek(sizeInBytes, os.SEEK_END)
if err != nil {
return err
}
// 移动回文件之前的读写位置,待后续写入
if _, err = f.Seek(curOff, os.SEEK_SET); err != nil {
return err
}
// 已分配足够空间,返回 nil
// 一般 sizeInBytes == size
if sizeInBytes > size {
return nil
}
// 多分配了空间,以 sizeInBytes 截断文件
// truncate 之后,文件大小才显示为 sizeInBytes 大小
return f.Truncate(sizeInBytes)

在 darwin OS 上,首先会调用 preallocFixed,该函数中使用了系统调用 SYS_FCNTL 预先分配文件空间

如果 preallocFixed 失败,则调用 preallocExtendTrunc 再次尝试分配

编码 / 解码

wal/encoder.go 实现了写入逻辑
wal/decoder.go 实现了读取逻辑

File Pipeline

wal/file_pipeline.go 用来预创建文件,为后续生成新的 wal 文件使用

fp.Open() 在 cut 方法中被调用,cut 中的调用如下

1
2
3
4
5
// create a temp wal file with name sequence + 1, or truncate the existing one
newTail, err := w.fp.Open()
if err != nil {
return err
}

而 fp.Open() 从 fp.filec 中获取 locks file 返回

在初始化 file pipeline 方法 newFilePipeline 中启动 fp.run() 协程,查看 fp.run() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (fp *filePipeline) run() {
defer close(fp.errc)
for {
f, err := fp.alloc()
if err != nil {
fp.errc <- err
return
}
select {
// fp.filec 大小为 1
case fp.filec <- f:
case <-fp.donec:
os.Remove(f.Name())
f.Close()
return
}
}
}

查看 fp.alloc() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
// count % 2 so this file isn't the same as the one last published
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return nil, err
}
if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
f.Close()
return nil, err
}
fp.count++
return f, nil
}

可见预生成了 [0-1].tmp 文件,并对该文件加了锁,待调用 fp.Open() 方法获取使用

Cut 方法

wal 文件大小上限为 64MB

因此当写入消息之后, wal 文件大小 > 64MB 时,会调用 cut 方法

截断之前的 wal 文件,并生成新的 wal 文件用于写入

cut 的整体思路

  1. 截断当前使用的 wal 文件
  2. 从 file pipeline 中获取 tmp 文件
  3. 向 tmp 文件中写入必要的 headers
  4. 将 tmp 文件 rename to wal 文件,新文件名为 walName(w.seq()+1, w.enti+1)
  5. 将新 wal 文件加入 locks slice 中,并生成 newFileEncoder 用于写入新 wal 文件

详细阅读 cut 方法(保留了原注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
func (w *WAL) cut() error {
// close old wal file; truncate to avoid wasting space if an early cut
// 从 locks slice 中取最后一个 file
// seek 到当前读写位置
off, serr := w.tail().Seek(0, os.SEEK_CUR)
if serr != nil {
return serr
}
// 截断至当前读写位置
if err := w.tail().Truncate(off); err != nil {
return err
}
// 系统调用 fsync 落盘
if err := w.sync(); err != nil {
return err
}

// 生成新的 wal 文件名
// seq + 1
// index + 1
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
// create a temp wal file with name sequence + 1, or truncate the existing one
// 从 filePipeline 中获取预创建好的 [0-1].tmp 文件
newTail, err := w.fp.Open()
if err != nil {
return err
}
// update writer and save the previous crc
// 添加至 locks slice 末尾
w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32()
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}

// 写入之前 wal 的 crc
if err = w.saveCrc(prevCrc); err != nil {
return err
}
// 写入 metadata
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
return err
}
// 写入 raft HardState
if err = w.saveState(&w.state); err != nil {
return err
}
// atomically move temp wal file to wal file
// fsync 落盘
if err = w.sync(); err != nil {
return err
}

// 获取当前位置 offset
off, err = w.tail().Seek(0, os.SEEK_CUR)
if err != nil {
return err
}

// 同一个文件系统相当于 mv
if err = os.Rename(newTail.Name(), fpath); err != nil {
return err
}
// fsync 父目录
if err = fileutil.Fsync(w.dirFile); err != nil {
return err
}

// 关闭 filePipeline 打开的 newTail
newTail.Close()

// 重新加锁
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return err
}

// off, err = w.tail().Seek(0, os.SEEK_CUR)
// 重新设置下次读写位置为当前位置
if _, err = newTail.Seek(off, os.SEEK_SET); err != nil {
return err
}
w.locks[len(w.locks)-1] = newTail

// 莫名,直接使用之前的 prevCrc 不可以么
prevCrc = w.encoder.crc.Sum32()
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath)
return nil
}

所以 cut 方法初始写入 wal 的内容示意如下

cut_wal

Sync 方法

在 wal 中有如下几个地方使用了 sync 方法

  1. func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {}
  2. func (w *WAL) cut() error {}
  3. func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {}
  4. func (w *WAL) Close() error {}

sync 直接来说是使用了系统调用 fsync,确保数据写入磁盘持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (w *WAL) sync() error {
if w.encoder != nil {
if err := w.encoder.flush(); err != nil {
return err
}
}
// 记录开始时间
start := time.Now()

// 底层是系统调用
err := fileutil.Fdatasync(w.tail().File)

// 计算 fsync 耗时
duration := time.Since(start)
// 大于 1s 告警
if duration > warnSyncDuration {
plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
}
syncDurations.Observe(duration.Seconds())
return err
}

重点看下 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error 方法,毕竟它调用频率较高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
...
mustSync := mustSync(st, w.state, len(ents))
//func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
// return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
//}
...
curOff, err := w.tail().Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if curOff < SegmentSizeBytes {
if mustSync {
return w.sync()
}
return nil
}
return w.cut()
}

从上: vote / term 变化,或有 entries 要写入时,调用 w.sync

etcd v3.1.9

etcd http api

clientURL/health

1
2
curl http://localhost:2379/health
{"health": "true"}%

peerURL/members

1
2
curl http://localhost:2380/members
[{"id":10276657743932975437,"peerURLs":["http://localhost:2380"],"name":"default","clientURLs":["http://localhost:2379"]}]

pipelineHandler

/raft

接收 raftpb.Message 消息,传递给 raft.Process 处理

streamHandler

检查 request id 是否已经被 remove

从 local 中获取 peer,如果不存在,则加入 remote 中 trace,并提示

1
failed to find member [mid] in cluster [cid]

检查 x-raft-to id 是否与本成员一致

(1) /raft/stream/msgapp

处理 streamTypeMsgAppV2 消息类型;建立 stream 链接,保持 w http.ResponseWriter

(2) /raft/stream/message

处理 streamTypeMessage 消息类型;建立 stream 链接,保持 w http.ResponseWriter

snapshotHandler

/raft/snapshot

probingHandler

/raft/probing

etcd v3.1.9

Introduction

基于 protocol buffer,使用 gRPC 框架实现

示例代码:https://github.com/grpc/grpc-go/tree/master/examples

简单易用

Convert .proto to .go

pb 定义位于 etcdserver/etcdserverpb/rpc.proto 中,service 中定义了一系列 RPC 方法,RPC 中的 request 和 response 则在 service 之后被定义

使用 scripts/genproto.sh 可根据 pb 文件生成 go 文件

也可以根据官方文档使用 protoc 工具,从 pb 文件生成 go 文件

由 .proto 文件生成的 go 文件有两,如

1
2
rpc.pb.go
rpc.pb.gw.go

其中 rpc.pb.gw.go 是个反向代理,是将 http 请求再封成 grpc 发送至后端;按我理解就是方便使用 curl 命令调试

详情参考 ref: https://grpc.io/blog/coreos

下图源自上述网址,清晰明了;无图言 x

grpc-rest-gateway

The entry in etcd

gRPC 的入口位于 etcdserver/api/v3rpc/grpc.go

该文件中 new 了 grpc server,注册了 pb 中定义的一系列 service,并返回 grpc server

ETCD v3 vs ETCD v2 in client listener

v3 中保有 peer http,client http 及 client grpc

因此 v3 只是对 client 的 api 变为了 gRPC,peer 之间的通信仍然沿用 v2 的套路

简单的源码阅读

链接: https://github.com/coreos/etcd/tree/release-2.3

Main

  1. 检查运行 arch
  2. parse 启动配置参数
  3. 设置日志级别(参考 admin guide debug part,可以打开 debug 级别日志,亦可单独打开特定 package 的 debug 日志)
  4. 声明停止通道 var stopped <-chan struct{}
  5. 打印 etcd 版本 / GitSHA / 编译时的 Go VERSION / 运行架构信息

检查数据目录是否存在,若存在日志提示 (只有 data-dir 存在,就会打印如下日志)

1
the server is already initialized as member before, starting as etcd member...

不管数据目录存在与否,均如此启动

1
stopped, err = startEtcd(cfg)

最后获取到停止信号停止

1
2
// etcd process stpo
<-stoped

startEtcd

首先会从 initial-cluster 中初始化 peer 信息,利用这些 peer 信息新建 peer listener,主要方法为 rafthttp.NewListener

接下来使用 listen-client-url 新建 client listener,这块直接使用 net.Listen 方法,开启 tcp 服务;然后判断系统文件描述符限制,如果 < 150,则 panic,然后限制 listener 的数量为 系统描述符限制 - 150

初始化 net.Listen 的 keepAliveListener

初始化 etcd server config

初始化 etcdserver

启动 etcdserver


注册操作系统终止回调函数 osutil.RegisterInterruptHandler(s.Stop)


使用 cors 初始化 clientHandler

使用 etcdhttp 初始化 peerHandler

goroutine 为每个 peer listener 开启 http 服务 处理请求,5 min read timeout

goroutine 为每个 client listener 开启 http 服务 处理请求,0 min read timeout // 与 golang 的一个 bug 有关

etcdhttp/peer.go

etcdhttp/client.go

其实是 http server,待阅读


最后,返回停止通道及错误

etcdserver.NewServer

初始化 etcdserver

集群信息存在 /0

key 信息存在 /1

检查数据目录版本,并更新数据目录

判断以何种方式启动

(1) !haveWAL && !cfg.NewCluster

没有 wal 目录且 new-cluster == false,此时通过从启动参数 initial-token 及 initial-cluster 配置的集群信息来访问集群(会将自己的 peer url 排除),获取到当前存在的集群信息

校验 启动参数中配置的集群信息与获取到存在的集群信息,并为 local member 设置 id

此时成员信息从 remote peer 中获取,当前 server 的 cluster id 被设置为 remote peer 中获取到 cluster id

打印启动参数 cfg.Print()

启动 raftnode

1
id, n, s, w = startNode(cfg, cl, nil)

简单的源码阅读

链接: https://github.com/coreos/etcd/tree/release-2.3

Main

  • 检查运行 arch
  • parse 启动配置参数
  • 设置日志级别(参考 admin guide debug part,可以打开 debug 级别日志,亦可单独打开特定 package 的 debug 日志)
  • 声明停止通道 var stopped <-chan struct{}
  • 打印 etcd 版本 / GitSHA / 编译时的 Go VERSION / 运行架构信息

检查数据目录是否存在,若存在日志提示 (只有 data-dir 存在,就会打印如下日志)

1
the server is already initialized as member before, starting as etcd member...

不管数据目录存在与否,均如此启动

1
stopped, err = startEtcd(cfg)

最后获取到停止信号停止

1
2
// etcd process stpo
<-stoped

startEtcd

首先会从 initial-cluster 中初始化 peer 信息,利用这些 peer 信息新建 peer listener,主要方法为 rafthttp.NewListener

接下来使用 listen-client-url 新建 client listener,这块直接使用 net.Listen 方法,开启 tcp 服务;然后判断系统文件描述符限制,如果 < 150,则 panic,然后限制 listener 的数量为 系统描述符限制 - 150

初始化 net.Listen 的 keepAliveListener

初始化 etcd server config

初始化 etcdserver

启动 etcdserver


注册操作系统终止回调函数 osutil.RegisterInterruptHandler(s.Stop)


使用 cors 初始化 clientHandler

使用 etcdhttp 初始化 peerHandler

goroutine 为每个 peer listener 开启 http 服务 处理请求,5 min read timeout

goroutine 为每个 client listener 开启 http 服务 处理请求,0 min read timeout // 与 golang 的一个 bug 有关

etcdhttp/peer.go

etcdhttp/client.go

其实是 http server,待阅读


etcdserver.NewServer

初始化 etcdserver

集群信息存在 /0

key 信息存在 /1

检查数据目录版本,并更新数据目录

判断以何种方式启动

(1) !haveWAL && !cfg.NewCluster

没有 wal 目录且 new-cluster == false,此时通过从启动参数 initial-token 及 initial-cluster 配置的集群信息来访问集群(会将自己的 peer url 排除),获取到当前存在的集群信息

校验 启动参数中配置的集群信息与获取到存在的集群信息,并为 local member 设置 id

此时成员信息从 remote peer 中获取,当前 server 的 cluster id 被设置为 remote peer 中获取到 cluster id

打印启动参数

1
cfg.Print()

启动 raftnode

1
id, n, s, w = startNode(cfg, cl, nil)

(2) !haveWAL && cfg.NewCluster

没有 wal 目录且 NewCluster == true

判断 isMemberBootstrapped,从 remote peer 中获取集群信息,如果 server id 已经存在集群中且其 client url 不为空,则表示已经被启动过了,返回错误

1
member XXX has already been bootstrapped

打印启动参数,并启动 raftnode

1
2
cfg.PrintWithInitial()
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())

可见

  • initial advertise peer url
  • intial cluster

参数是用来 bootstrap 成员用的

(3) haveWAL

有 WAL 目录的情况

data-dir/member

data-dir/member/wal

data-dir/member/snap

load snapshot 文件

从 snapshot 文件中恢复

最后启动 raftNode

1
2
3
4
5
6
7
8
cfg.Print()
if !cfg.ForceNewCluster {
id, cl, n, s, w = restartNode(cfg, snapshot)
} else {
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
}
cl.SetStore(st)
cl.Recover()

注意备份文件恢复时,一定要使用 force-new-start 的原理就在这里了

判断完了以何种方式启动 raftnode 之后,开始初始化 EtcdServer,并且初始化 rafthttp.Transport,启动 rafthttp.Transport,将从 remote peer 获取到的 peer url 信息全部加入 tr.AddRemote,AddRemote 中会判断是否已经存在 peer 或者 remote 中,已存在则不再加入;将 cluster 中的 member 信息全部加入 tr.AddPeer

当然都是除了自己,加入之前判断了 id,如果 id == 本身则不加入

不过我还是不太明白 remote 和 peer 有啥区别,目前

得看看 remote.go 和 peer.go 都干了什么

EtcdServer.Start

  1. EtcdServer.start()
  2. goroutine publish
  3. goroutine purge file
  4. goroutine monit file descriptor
  5. goroutine monitor versions

EtcdServer.publish

注册 server 信息到 cluster 中,并更新 server 的静态 client url

publish 的过程比较直接,调用 pb.Request 方法,将其 attributes 即 name 和 clientURLs 写入集群中

即写入 key / value 中 (v2 的存储)

key 的格式为 /0/members/id/attributes

实践中,当成员无法 publish 至集群时,一般发生的错误为超时;超时后会重试,毕竟是个 for loop,在 publish 成功或者成员停止时直接 return 结束 for loop 否则会持续 publish

EtcdServer.purge

启了个 goroutine 删除超出 threshold 的 snap 和 wal 文件

EtcdServer.monitorFileDescriptor

启了个 goroutine 检查系统当前使用的 fd 数量是否超过了 limit 的 80%

EtcdServer.monitorVersions

启了个 goroutine 检查集群 version

EtcdServer.start

设置默认 snapshotCount,初始化 done / stop 通道,打印集群版本信息,goroutine run

EtcdServer.run

raftnode start

待阅读

startRemote

newPeerStatus(to)

这个结构体中提供了 activate / deactivate 方法

即日志中常见的 the connection with [Member ID] became active 信息

随后初始化了 remote 结构体

remote 结构体中初始化了 pipeline

sync.waitGroup 初始化为 4

启了 4 个 goroutine 跑 handle

handle 从 msgc 通道中获取 raftpb.Message,并使用 post 方法发送出去

waitGroup 在 stop 方法内调用 wait,确保 handle goroutine 均已关闭

startPeer

newPeerStatus(to)

初始化 peer 结构体

初始化 msgAppV2Writer: msgAppV2Writer / writer: startStreamWriter

初始化 pipeline: newPipeline / snapSender: newSnapshotSender


goroutine startPeer 从 recvc 通道中获取 message 交由底层 raft 处理


goroutine startPeer 从 propc 通道中获取 message 交由底层 raft 处理


初始化 p.msgAppV2Reader = startStreamReader

初始化 p.msgAppReader = startStreamReader

1
2
3
4
5
6
7
8
9
10
11
// peer is the representative of a remote raft node. Local raft node sends
// messages to the remote through peer.
// Each peer has two underlying mechanisms to send out a message: stream and
// pipeline.
// A stream is a receiver initialized long-polling connection, which
// is always open to transfer messages. Besides general stream, peer also has
// a optimized stream for sending msgApp since msgApp accounts for large part
// of all messages. Only raft leader uses the optimized stream to send msgApp
// to the remote follower node.
// A pipeline is a series of http clients that send http requests to the remote.
// It is only used when the stream has not been established.

peer 是 remote raft node 的代表,本地 raft node 通过 peer 发送 message 到 remote

stream and pipeline

pipeline 是一系列 http clients,用于向 remote 发送 http 请求,它只有在 stream 没有建立起来时使用

stream 是接收者 long-polling 链接,用于传递 message;另外 raft leader 使用优化过的 stream 发送 msgApp 信息

stream run 起来之后,尝试去 dial 远端,dial 未返回错误后,将 peerStatus 设置为 active,即此时日志中会打印 the connection with [Member ID] became active

dial 如果返回 errUnsupportedStreamType 亦或是 err := cr.decodeLoop(rc, t) 返回的 err 不是 EOF 或者链接被关闭,则 peerStatus 被设置为 inactive

stream 每 100 ms 会重新尝试 dial remote peer,如果出现 request sent was ignored (cluster ID mismatch: remote[remote member id]=X-Etcd-Cluster-ID in http header, local=local cluster id) 错误的话,那么这个错误日志的打印频率将会很高,需要及时处理

stream 将获取到的 raftpb.Message 放入相应的通道 recvc / propc

Summary

相比 k8s 的复杂来说,etcd 的代码阅读算是还能摸得着头的了

etcd v3.1.9

从 etcd 启动参数中生成 Cluster ID 和 Member ID

1
cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)

上述方法从 –initial-cluster-token and –initial-cluster 这个两个启动参数中生成 Cluster ID 和各个 Member ID

NewClusterFromURLsMap 这个方法中调用 NewMember 生成 Member ID

首先来看 NewMember 方法

1
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member

核心思路

1
2
3
b []byte: peerUrls + clusterName + time
hash := sha1.Sum(b)
memberID: binary.BigEndian.Uint64(hash[:8])

Member ID 根据 peerUrls / clusterName / current_time 的 sha1 sum 值,取其前 8 个 bytes,为 16 位的 hex 数

回到 NewClusterFromURLsMap 方法中的 NewMember(代码如下)可见最后一个参数为 nil,即不加入时间因素,因此 NewClusterFromURLsMap 生成的 Member ID 是固定的

1
m := NewMember(name, urls, token, nil)

Member Add 生成的 Member ID

直接从 server 端看起 —— etcdserver/api/v3rpc/member.go 中的 MemberAdd 方法

可见如下代码

1
2
3
4
5
6
7
8
9
urls, err := types.NewURLs(r.PeerURLs)
if err != nil {
return nil, rpctypes.ErrGRPCMemberBadURLs
}
now := time.Now()
m := membership.NewMember("", urls, "", &now)
if err = cs.server.AddMember(ctx, *m); err != nil {
return nil, togRPCError(err)
}

m := membership.NewMember(“”, urls, “”, &now) 加入了当前时间,因此 Member ID 是不确定的

总结

cluster ID 仅生成一次,此后不会变化

通过 etcd 启动参数生成 (initial-cluster) 的 Member ID 固定

通过 Member add 生成的 Member ID 不确定

Member add 的时候,没有传递 member 的 name,因此 member add 成功时,member list 出来的 member item,新加入的 member 其 name 为空,且没有 client url,因该 member 尚未 publish 其 client url 到集群中

etcd v3.1.9

什么时候生成

etcd 启动时设置了参数 –snapshot-count 即 index 变化 达到该值时,会生成 .snap 文件

相关代码如下

1
2
3
4
5
6
7
8
func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if ep.appliedi-ep.snapi <= s.snapCount {
return
}
plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
s.snapshot(ep.appliedi, ep.confState)
ep.snapi = ep.appliedi
}

文件名规则

snap 的全称呢 snapshot,是快照的意思,在 etcd v3 里面呢,是把 v2 的内存数据存储到磁盘上,生成 [Term]-[Index].snap 一类的文件

snap 文件名生成规则

fname := fmt.Sprintf(“%016x-%016x%s”, snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)

是否可以删除

说到 v2 的存储,全部在内存中,而且是一个非常直接的实现,看着就是个多叉树

那么是否 .snap 文件就可以删除掉了呢

  1. 如果存储了 v2 的数据,显然不能删除,否则 etcd 重启之后,就无法从 .snap 文件中恢复出 v2 的数据了;当然恢复也不会是全量的数据,因为有 —snapshot-count 控制,会丢这个数的 Index
  2. 如果没存储 v2 的数据,都是存的 v3 的数据,这种情况下,能否删除?
    这就要看 .snap 文件除了存储 v2 的数据还存了什么东东;以及存的这东东,还有什么其他重要的用途了

看了 etcd v3 的 restore 代码之后,我们知道 restore 会在 snap 文件下生成 v3 的存储 db 文件,以及一个 .snap 文件,这个 .snap 文件存储了啥东西嘞

相关代码如下

1
2
3
4
5
6
7
8
9
10
11
// first snapshot
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
},
},
}

其中的 Data 并不重要,只是存储了两个 namespace,其实就是两路径

1
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)

重要的是 Metadata 中的 Index 和 Term

显然也不能删除

etcd 在启动时,会读取 .snap 文件,获取其中的 Metadata.Index,使用这个值去搜索应该从哪个 wal 文件开始继续处理

回忆一下 .wal 文件名的第二段,正是当时 wal 存储中的 index

wal 的搜索代码如下

1
2
3
4
nameIndex, ok := searchIndex(names, snap.Index)
if !ok || !isValidSeq(names[nameIndex:]) {
return nil, ErrFileNotFound
}

从最旧的 wal 搜索到最新的 wal sort.Strings(names),直到搜索到

1
2
3
if index >= curIndex {
return i, true
}

如果 .snap 文件不存在,那么会从 index = 0 开始搜索 wal 文件,也就是说 .snap 文件不存在的时候,必须存在 0000000000000000-0000000000000000.wal 文件,否则 etcd 启动时会报如下错误

1
2017-10-02 13:49:08.313573 C | etcdserver: open wal error: wal: file not found

wal 中存储了 raft MemoryStorage 的 entries / raft HardState,etcd member id 和集群 id

构造一个异常情况

purge 保留 1,snapshot count 设置的超大,重启 etcd 会发生什么?

1
./bin/etcd --max-snapshots '1' --max-wals '1' --snapshot-count '20000000'

这个异常意义在于 snapshot 文件并未生成,而此时 wal 被 purge 之后,第一个 wal 被删掉了,那么重启 etcd 后会出现前述 wal: file not found 的错误。

持续往 etcd 写入数据,直到生成新的 wal 文件,然而不幸的是,并没有观察到 purge 的动作。 那么问题来了,etcd 是在哪儿做了保护?

查看 purge 的代码发现了如下的轨迹

1
2
3
4
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
if err != nil {
break
}

也就是说 TryLockFile 成功才可以被 purge 掉,那么我们可以进一步推测没生成 .snap 文件之前,etcd 不会释放 LockFile,阻止仍然有用的 wal 文件被 purge 掉

为了验证我们的猜想,查看 wal/wal.go 的 ReleaseLockTo 方法,直接贴该方法的注释吧

1
2
ReleaseLockTo releases the locks, which has smaller index than the given index except the largest one among them.
For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.

继续看,ReleaseLockTo 方法被谁调用,即什么时候释放 wal 文件的 lock,什么时候允许 wal 被 purge

okay,答案符合我们的预期。ReleaseLockTo 在 etcdserver/storage.go 的 SaveSnap 方法中被调用,还是直接贴该方法的注释吧

1
SaveSnap saves the snapshot to disk and release the locked wal files since they will not be used.

忙了一周 + 1 天,又是一个全员加班的周六

从上周六开始,独立维护了一套平台环境,验证服务升级部署

只能说进一步熟悉了业务

支撑了另外一个部门的服务部署,对面那哥们问我来多久了,我曰一个半月,对方回感觉都要成 xx 专家了,我曰怎么会,术业有专攻,对方回之前在哪儿待的,我曰啊,在学校呀,对方回天将降大任于斯人也

工作呀,对我来说,最需要成就感,哎,果然是个骄傲的不行的人,老师评价的不错,是得改改了,没必要和自己过不去

刚看了一个文档,终于理解了 etcd 的神逻辑。Disaster recovery 的时候,需要使用旧的数据 force-start-new 一个单节点集群,该集群正常启动后,将其 peer url 更新为对外 url,后继可正常添加成员

在 cluster-health 的情况下才可以 Member add。Member add 之前,删除其 data-dir,避免数据不一致

从 snapshot 恢复时,不用 force-start-new,因为 snapshot 没有集群 metadata,只有 key 数据

如果是数据从 v2 迁移至 v3,对 data-dir 执行 migrate 之后只是将 key data 转成了 mvcc 格式,而 cluster meta data 还是旧的,因此需要走 force new start 流程

如果是全新的集群启动,则所有 etcd 以 new 状态启动即可

这样就覆盖了所有流程

终于要去见来杭的 Gloria 了,据说为了见人,还特意剪了个发型,不过被剪成 SB 了,笑,等会儿看看有多惊艳

杭州应该进入了梅雨季节,一言不合就下雨,天气略闷热,而我的工作略忙

这篇在地铁,公交上写完

2017年06月11日12:53:16 杭州