to be cont 先写部分吧

MsgProp

Follower 收到 MsgProp 消息时,有成员发起选举,将该请求转发至 Leader;消息先 append 到 raft.msgs slice 中,注意后续所说的消息发送,均为 append 到 msgs 中,并未产生实际发送

MsgApp

Follower 收到 MsgApp 消息时,即有 Entries 写入时,重置 electionElapsed 为 0,设置其 Leader 为消息来的成员的 ID;调用 handleAppendEntries 方法处理 MsgApp 消息;handleAppendEntries 方法中向 m.From 发送 MsgAppResp 消息;消息中包含经过处理 MsgApp 后,当前节点的 Index;冲突时额外返回 Reject: true,RejectHint: lastIndex

maybeAppend: handleAppendEntries 方法中使用到的 maybeAppend 方法分析

(1)

firstIndex 会尝试从 unstable 的 snapshot 中获取 snapshot meta Index,如果 snapshot 为 nil(maybeFirstIndex),则从 storage 中获取 ents[0].Index

(2)

lastIndex 会尝试从 unstable 的 ents 中获取最后一个 entry 的 index,如果 unstable 的 ents 为空,则获取 unstable 的 snapshot meta index(maybeLastIndex),如果仍然获取不到,则从 storage 中获取最后一个 entry 的 Index

看明白最基础的方法 firstIndex 和 lastIndex 后,继续往下

(3)

term 尝试获取 index 为 i 的 entry 的 term,entries 的第一个 index 为 dummy index,即每次收到 MsgApp 消息时,m.Index 为 dummy entry (index),后续为真正的 entries (m.Ents);dummy index <= i <= lastIndex,如果 index i 不位于该范围中,显然无法找到对应的 term;maybeTerm 尝试在 unstable 中获取 index 为 i 的 entry 的 term,unstable 中无法找到的话,从 storage 中查找

(4)

matchTerm(i uint64, term Term) 的实现,首先尝试获取 index i 的 term,随后匹配是否等于 term

(5)

findConflict 的实现,对 ents 中的每个 entry 调用 matchTerm 方法,Index 升序遍历,遇到 unmatch的 (即遇到相同 Index 不同 Term 的 entry 认为 conflict),如果这个 unmatch 的 entry 的 Index <= lastIndex,则有 conflict,返回第一个 conflict entry 的 Index;如果这个 unmatch 的 entry 的 Index > lastIndex,则认为是新的未包含的 entry,则返回第一个新的 entry 的 Index;如果均 match 则返回 0

(6)

maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries…) 的实现,内部会首先判断 matchTerm(m.Index, m.LogTerm),过了之后,会对每个 entry findConflict,没有 confict 则没啥好添加的,有 conflict,可能是真 conlict 也可能是包含了新的 entries,统一调用 append 方法加入到 unstable 中;注意这是 Follower 的行为,Follower 会使用 Leader 发来的 MsgApp 改写自己本地的 entries;Leader 发来的 MsgApp 中包含了其已经 commited 的 Index 信息,Follower 使用 commited 和 MsgApp 中的最后 Index 中小的那个 Index 作为能 committed 的 index

lastnewi = index + uint64(len(ents))

commitTo(min(commited, lastnewi))

如果 tocommit > l.lastIndex() 会 panic

(7)

综上

  • 接收到 MsgHeartbeat 消息会更新 commited
  • 接收到 MsgApp 消息可能会更新 commited

FAQ: unstable 什么时候会 stable?

在 node 的 main for loop 中,首先会从 raftlog 中获取 ready to apply 的 entries (即 unstable 和 nextEnts),将其放入 readyc 通道后,等待 advancec 通道消息;当外部 apply 结束后,调用 node.Advance() 方法,node 获取到 advancec 通道中的消息,开始执行 raftlog 的 apply 更新 apply index 和 stable to 将 unstable 变为 stable

raftlog 的逻辑图如下 (没写 snapshot 部分)

raftlog

MsgHeartbeat

Follower 收到 MsgHeartbeat 消息时,重置 electionElapsed 为 0,设置其 Leader 为消息来的成员的 ID;commitTo MsgHeartbeat 中的 commited index,并向 Leader 回复 MsgHeartbeatResp 消息

MsgReadIndex

Follower 收到 MsgReadIndex 消息,将请求转发至 Leader;Leader 返回 MsgReadIndexResp 消息,有且仅有 1 Entry,返回已到达一致性的 Index (consistency=l);线性 / 序列化,貌似是个术语待查

Follower 收到 MsgReadIndexResp 消息,有且仅有 1 Entry,将其加入 readStates 中

1
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})

FAQ: 那么 msgs 什么时候会被发送

回到 node 的 main forloop,在 rd = newReady(r, prevSoftSt, prevHardSt) 方法中会读取 r.msgs,并设置 n.readyc 通道,后续将 rd 放入 readyc 通道中,等待外部消费;外部通过 node.Ready() 方法获得内部需 apply or 待发送的 Messages

在 etcdserver/raft.go 的 main forloop 中获取 readyc 通道消息 rd := <-r.Ready(),该 apply Messages 放入到 r.applyc 通道,该发送的 r.Messages,调用 r.sendMessages(rd.Messages) 发送,结束之后调用 r.Advance()

另外在 etcdserver/server.go 的 main forloop 中获取 ap ap := <-s.r.apply(),将这次 apply 放入 FIFO 中,FIFO 内部协程异步处理 apply job

cursor 从 B tree 的 root 开始,提供 B tree 的遍历和搜索实现,遍历过程记录到 stack 中

遍历

first / prev / next / last 的实现

first 就是不断搜索 element 的 index = 0 inode,直到 leaf page 为止

last 是不断搜索 element 的 index = the count of inodes - 1,直到 leaf page 为止

first 和 last 实现后,可相应实现 prev / next

prev 当 inode 中可 – 时则直接回退一格,若为开头 inode,则上移,再 last

next 当 inode 中可 ++ 时则直接前进一格,若为末尾 indoe,则上移,再 first

搜索

func (c *Cursor) search(key []byte, pgid pgid) {}

nsearch(key)

如果搜到了 leaf page / node,那么就在 inodes 中搜索该 key,返回的 index 为第一个大于等于 key 的 index,若不存在返回 inodes 长度

searchNode(key, n)

如果不是 leaf page / node,且 node 不为 nil (n),则 searchNode;searchNode 中如果 key 相等则从该 inodes[index].pgid,继续 search(key, inodes[index].pgid);如果 key 不相等且 index > 0,则设置为最后一个小于的 index,从该 index 继续 search

searchPage(key, p)

实现同上述,不过是从 page 中读取

获取 node

根据 stack 获取 leaf node,如果已经是 node 且为 leaf 直接返回;不是的话从 stack[0] 开始,遍历到 leaf node,遍历过的 page 都读到 node 并缓存到关联的 bucket 中

获取到 node 之后就可以 put 和 del key 了

总结

所以 cursor 常见操作,由 bucket 创建出来,初始绑定 bucket root,从 root 开始搜索 key 值,返回后,c.node().put or del

例如看个创建 bucket 的过程

1
2
3
4
5
6
7
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)
var value = bucket.write()
// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, bucketLeafFlag)

例如看个写入 key / value 的过程

1
2
3
4
5
6
7
8
9
10
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)
// Return an error if there is an existing key with a bucket value.
if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}
// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)

wal fsync delay warning

对于 leader 来说,可以先行向 follower 发送 messages,再进行 wal 的写入等后续持久化操作,最后 n.advance

对于 follower 来说,必须进行 wal 的写入等持久化操作后,才能向其他成员发送 messages,最后 n.advance

wal 的 fsync 调用

1
2
3
4
5
6
7
8
9
mustSync := mustSync(st, w.state, len(ents))
func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
// Persistent state on all servers:
// (Updated on stable storage before responding to RPCs)
// currentTerm
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

看的出来 fsync 调用很频繁,每次写入都有 fsync 调用,毕竟每次写入时 entsnum 不为 0

fsync 的对象为最新的 wal 文件

1
2
3
4
5
6
start := time.Now()
err := fileutil.Fdatasync(w.tail().File)
duration := time.Since(start)
if duration > warnSyncDuration {
plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
}

fsync 调用时间超过 1s 会告警,磁盘 IO 有波动了 or 不满足要求

boltdb apply delay warning

wal 写完,raft 协议走通,可同步数据后 apply 数据到本地存储

1
2
3
4
5
6
7
8
9
s.applySnapshot(ep, apply)
st := time.Now()
s.applyEntries(ep, apply)
d := time.Since(st)
entriesNum := len(apply.entries)
if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
plog.Warningf("avoid queries with large range/delete range!")
}

平均 apply 一个 entry 耗时 100ms,*如果 apply 总时间超过 n * 100ms 则告警*

比如 put 请求,最后调到 kvstore.go 的 put 方法,kvindex (B tree) 中搜索一把,再用 boltdb tx 写入一把,kvindex 增加一把,有 lease 的加 lease

当然上述的都是耗时,只不过 boltdb put 的耗时一般而言比其他的操作都大

leader send out heartbeat delay warning

在 r.sendMessages(rd.Messages) 方法中,也会打印延时告警日志

1
2
3
4
5
6
7
8
9
10
// a heartbeat message
if ms[i].Type == raftpb.MsgHeartbeat {
// exceed maxDuration time
ok, exceed := r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
}
}

这个地方的算法,是超过 2*hearbeat 时间作为 exceed 时间

leader 将这些 Message 先行发送给 followers,如果是心跳消息,则计算当前时间 - 上次记录的时间是否超过了 2*hearbeat,如果是,则打印超过的值;需注意该值如果接近或超过了 election timeout 时间,则会引发其他成员发起选举,导致集群不稳定

一般这个告警,是由 wal fsync delay 诱发的,而 wal fsync delay 又与磁盘 IO 有关;另外 apply 不是也有 delay 的 warning ?为啥它的影响不大,答:因为 apply 会走 fifo 的调度,是异步的;当然也是有影响的,总会影响整体时延

1
2
3
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)

放入队列就跑

1
2
3
4
5
6
// Schedule schedules a job that will be ran in FIFO order sequentially.
func (f *fifo) Schedule(j Job) {
...
f.pendings = append(f.pendings, j)
...
}

the clock difference againset peer is too high warning

peer 间计算时差大于 1s 告警,ps: 当前 peer 比对端 peer 时间大

etcd 会将其每个 peer 加入到 probe 中,定时发起 get 请求,一方面可以探测 peer health 另一方面通过其返回值,计算 peer 之间的时间差;没发现该 warning 会对业务造成影响;还没过代码,和时间相关的实现也就 lease 了,暂且推测 lease 用的是逻辑时钟,所以没影响

1
2
3
4
5
6
7
8
func monitorProbingStatus(s probing.Status, id string) {
...
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
}
rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
...
}

probe (4s) 及 monit (30s) 周期

1
2
proberInterval           = ConnReadTimeout - time.Second (5 - 1)
statusMonitoringInterval = 30 * time.Second

开始记录值,start 为本次开始 probe 的时间,hh.Now 为对端 peer 返回的时间

1
2
α = 0.125
s.record(time.Since(start), hh.Now)

时差计算方法

1
2
3
4
5
6
7
8
9
10
// srtt init 0
func (s *status) record(rtt time.Duration, when time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
s.total += 1
s.health = true
s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt))
s.clockdiff = time.Now().Sub(when) - s.srtt/2
s.err = nil
}

大概来说就是 local time 减掉 peer time,再减修正时间

to be cont.

bucket -> key/value

Cursor 是内存的概念,记录遍历到 leaf page 的路径

bucket 初始关联了一个 root page,为 db meta page

相关代码,beginTX or beginRWTx 都会有调用

1
2
3
4
5
6
7
8
9
10
11
12
func (tx *Tx) init(db *DB) {
...
tx.root.bucket = &bucket{}
...
*tx.root.bucket = tx.meta.root
...
// 可见读事务不增加 txid,仅读写事务增加
if tx.writable {
tx.pages = make(map[pgid]*page)
tx.meta.txid += txid(1)
}
}

看下 Cursor 的 search 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) {
// 该 pgid 可能在 page or node 中
p, n := c.bucket.pageNode(pgid)
if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
}
e := elemRef{page: p, node: n}
c.stack = append(c.stack, e)
// If we're on a leaf page/node then find the specific node.
if e.isLeaf() {
c.nsearch(key)
return
}
if n != nil {
c.searchNode(key, n)
return
}
c.searchPage(key, p)
}

page 和 node

Once the position is found, the bucket materializes the underlying page and the page’s parent pages into memory as “nodes”

Bucket 的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
*bucket
tx *Tx // the associated transaction
buckets map[string]*Bucket // subbucket cache
page *page // inline page reference
rootNode *node // materialized node for the root page.
nodes map[pgid]*node // node cache
// Sets the threshold for filling nodes when they split. By default,
// the bucket will fill to 50% but it can be useful to increase this
// amount if you know that your write workloads are mostly append-only.
//
// This is non-persisted across transactions so it must be set in every Tx.
FillPercent float64
}

bucket 的数据结构

1
2
3
4
5
6
7
8
// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
root pgid // page id of the bucket's root-level page
sequence uint64 // monotonically incrementing, used by NextSequence()
}

继续过 Cursor 的 search 实现: 根据 pageid 获取到 page 或者 node,如果是 page 类型且为 branch or leaf page 则记录到 Cursor 遍历过的 stack 中,否则 panic;node 类型直接记录;判断是否为 leaf (page or node),是的话,在其中 nsearch(key);nsearch 取出 stack 中最后一个 ele,如果 node 不为空,则搜索 node 中的 inode,是否存在该 key

1
2
3
4
5
6
7
8
9
if n != nil {
// 二分查找;如果没找到返回 len(n.inodes)
index := sort.Search(len(n.inodes), func(i int) bool {
// <
return bytes.Compare(n.inodes[i].key, key) != -1
})
e.index = index
return
}

page 类型的话,将 ptr 转换为 *[0x7FFFFFF]leafPageElement 数组,即 inodes,在其中二分搜索 key 值

1
2
3
4
5
inodes := p.leafPageElements()
index := sort.Search(int(p.count), func(i int) bool {
return bytes.Compare(inodes[i].key(), key) != -1
})
e.index = index

如果 ele 不是 leaf 元素的话,那么只能继续从 node 中查找了 c.searchNode(key, n)

看到这里,记录下 node 的数据结构,越来越接近 B+ tree 的真相了

1
2
3
4
5
6
7
8
9
10
11
12
// node represents an in-memory, deserialized page.
type node struct {
bucket *Bucket
isLeaf bool
unbalanced bool
spilled bool
key []byte
pgid pgid
parent *node
children nodes
inodes inodes
}

node 树状关系如图,直觉其中的 pgid 对应的是底层的 page,即 mmap db 文件出来的 byte[] array 中的一块

node-graph

node 的 inodes 数目存储在 page.count 中,下面的代码从 read 中摘出

1
2
3
4
5
6
// read initializes the node from a page.
func (n *node) read(p *page) {
...
n.inodes = make(inodes, int(p.count))
...
}

branchPage 中只有 key; leafPage 中有 key 和 value

node 中的 key 存储着其第一个 inode 的 key 值;当然如果其没有 inode 则为 nil

1
2
3
4
5
6
7
// Save first key so we can find the node in the parent when we spill.
if len(n.inodes) > 0 {
n.key = n.inodes[0].key
_assert(len(n.key) > 0, "read: zero-length node key")
} else {
n.key = nil
}

node split,将 inodes 拆分至符合 fillPercent,parent node 的 inodes 也需要添加这些拆分出来的 nodes;还不是特别理解,这么下去的话 root node 岂不是包含所有的 inode,B+ tree 是这么设计的?还不是特别明白

Dream Of A Dream —— “人言南柯一梦,领略了繁华沧桑,谁人过往不相似”

etcd v3.1.9 boltdb pending pages 回收策略

etcdv3 中 backend 使用 boltdb 实现

在 etcdv3.1.9 集成的 boltdb 版本中,仅在 freelist 中记录可释放的 page id (pending: [txid] -> page ids),在 rw txn 中释放当前 txn 中最小 txid 之前的 pending pages[1],因此如果有一个 read txn 运行时间过长,会导致部分 pages 无法及时回收使用,导致 db 大小增加。示意图如下

leak-of-pages

1
[1] func (db *DB) beginRWTx() (*Tx, error) {} // 在该方法中释放 pending pages

mock 代码也很好写,随手写了个示例 (为了效果更明显,在 tx 的 Commit 方法中输出了 freelist 的情况)

1
2
3
4
5
6
func (tx *Tx) Commit() error {
...
fmt.Printf("freelist pending_cnt: %d, freelist free_cnt: %d\n", tx.db.freelist.pending_count(), tx.db.freelist.free_count())
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
...
}

正式的 mock 代码: 在一个 read txn 中 “休息” 一会儿,同时不断的开启 rw txn 写数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/boltdb/bolt"
)
func main() {
// Open the my.db data file in your current directory.
// It will be created if it doesn't exist.
db, err := bolt.Open("frag.db", 0600, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte("MyBucket"))
if err != nil {
return err
}
return err
})
go func() {
db.View(func(tx *bolt.Tx) error {
fmt.Printf("start of long run read txn\n")
fmt.Printf("read txn txid: %d\n", tx.ID())
bucket := tx.Bucket([]byte("MyBucket"))
bucket.Get([]byte("answer"))
<-time.After(10 * time.Second)
fmt.Printf("end of long run read txn\n")
return nil
})
}()
mockValue := make([]byte, 1024)
for i := 0; i < 64; i++ {
db.Update(func(tx *bolt.Tx) error {
fmt.Printf("rw txn txid: %d\n", tx.ID())
b := tx.Bucket([]byte("MyBucket"))
err = b.Put([]byte("answer"), mockValue)
return err
})
}
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
os.Exit(1)
}()
}

运行三次之后,效果明显 (见如下控制台输出) ,read txn 未退出时 pending_count 增加,退出之后,free_count 总量增加,然而此时 db 文件已经扩展增大了,即总的可用页数增加了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
freelist pending_cnt: 1, freelist free_cnt: 12
rw txn txid: 133
freelist pending_cnt: 3, freelist free_cnt: 10
start of long run read txn
read txn txid: 132
rw txn txid: 134
freelist pending_cnt: 6, freelist free_cnt: 7
rw txn txid: 135
freelist pending_cnt: 9, freelist free_cnt: 4
rw txn txid: 136
freelist pending_cnt: 12, freelist free_cnt: 1
rw txn txid: 137
freelist pending_cnt: 15, freelist free_cnt: 0
rw txn txid: 138
freelist pending_cnt: 18, freelist free_cnt: 0
rw txn txid: 139
freelist pending_cnt: 21, freelist free_cnt: 0
rw txn txid: 140
freelist pending_cnt: 24, freelist free_cnt: 0
rw txn txid: 141
end of long run read txn
freelist pending_cnt: 27, freelist free_cnt: 0
rw txn txid: 142
freelist pending_cnt: 3, freelist free_cnt: 25
rw txn txid: 143
freelist pending_cnt: 3, freelist free_cnt: 25

当然 long run read txn,会获取 mmap 读锁,因此当 rw txn 需要 mmap 写锁以扩大存储空间时,会阻塞

1
Read-only transactions and read-write transactions should not depend on one another and generally shouldn’t be opened simultaneously in the same goroutine. This can cause a deadlock as the read-write transaction needs to periodically re-map the data file but it cannot do so while a read-only transaction is open. https://github.com/boltdb/bolt#transactions

为了优化这个点儿,当然也因为 boltdb 原作者不干了,coreos 的大佬们自己拉了一个库继续搞,就是 https://github.com/coreos/bbolt,这个新库在它的第二个合入 pr https://github.com/coreos/bbolt/pull/3 中尝试解决这个问题

附赠一个删除 key 之后空间不会变小的解释,直觉来理解的话,boltdb 是 page 管理的空间,底层空间是连续的,boltdb 将这个空间逻辑上划分为一个个页

bbolt 优化后的回收策略

粗略过了一遍代码,总之之前是只能释放当前最小 txn 之前的 pending pages 对吧,现在不管你,能释放的我都释放掉不就行了?示意图如下

free-pages

为了实现这个方案,当然要增加一些记录值,修改一些实现,下面详细看一下这个 pr https://github.com/coreos/bbolt/pull/3/files

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// freePages releases any pages associated with closed read-only transactions.
func (db *DB) freePages() {
// Free all pending pages prior to earliest open transaction.
// txid 升序排序
sort.Sort(txsById(db.txs))
minid := txid(0xFFFFFFFFFFFFFFFF)
if len(db.txs) > 0 {
minid = db.txs[0].meta.txid
}
// 释放最小 txid 之前的 pengding pages
if minid > 0 {
db.freelist.release(minid - 1)
}

// Release unused txid extents.
// 释放 tx 之间的 pending pages
for _, t := range db.txs {
db.freelist.releaseRange(minid, t.meta.txid-1)
minid = t.meta.txid + 1
}

// 释放当前最大 txid 之后的 pending pages
db.freelist.releaseRange(minid, txid(0xFFFFFFFFFFFFFFFF))
// Any page both allocated and freed in an extent is safe to release.
// 假设在 rw txn 之间频繁的有 long run 的 read txn,这个优化很有效
}

原 freelist pending 为 [txid] -> []pgid 的映射,现修改为 [txid] -> txPending{} 的映射

1
2
3
4
5
6
7
8
type txPending struct {
// []pgid 与 []txid 对应
// 每 append 一个 pgid 则 append 一个 txid
// 以记录该 pgid 是在哪个 tx 中被分配
ids []pgid
alloctx []txid // txids allocating the ids
lastReleaseBegin txid // beginning txid of last matching releaseRange
}

freelist 增加一个记录 allocs: map[pgid] -> txid

1
2
3
4
5
6
7
8
9
10
// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
ids []pgid // all free and available free page ids.
// 记录每次 allocate 返回的 page id 与 txid 的对应关系
// allocate 返回的是连续分配的第一个 page id
allocs map[pgid]txid // mapping of txid that allocated a pgid.
pending map[txid]*txPending // mapping of soon-to-be free page ids by tx.
cache map[pgid]bool // fast lookup of all free and pending page ids.
}

freelist allocate 方法增加 txid 参数,用以记录 tx 分配的 page

1
2
3
4
5
6
7
8
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(txid txid, n int) pgid {
...
// 记录;仅记录分配的连续 page 的第一个 page id
f.allocs[initial] = txid
...
}

修改 freelist free 方法内部实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// free releases a page and its overflow for a given transaction id.
// If the page is already free then a panic will occur.
func (f *freelist) free(txid txid, p *page) {
if p.id <= 1 {
panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
}
// Free page and all its overflow pages.
txp := f.pending[txid]
if txp == nil {
txp = &txPending{}
f.pending[txid] = txp
}
// 获取是分配给哪个 tx 使用的
allocTxid, ok := f.allocs[p.id]
if ok {
// 解除关联关系
delete(f.allocs, p.id)
} else if (p.flags & (freelistPageFlag | metaPageFlag)) != 0 {
// Safe to claim txid as allocating since these types are private to txid.
// 这两种页类型没记录
allocTxid = txid
}

// 释放连续页
for id := p.id; id <= p.id+pgid(p.overflow); id++ {
// Verify that page is not already free.
if f.cache[id] {
panic(fmt.Sprintf("page %d already freed", id))
}
// Add to the freelist and cache.

// ids 与 alloctx 对应
txp.ids = append(txp.ids, id)
txp.alloctx = append(txp.alloctx, allocTxid)

f.cache[id] = true
}
}

freelist 增加 releaseRange 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// releaseRange moves pending pages allocated within an extent [begin,end] to the free list.
// ps: [begin, end]
func (f *freelist) releaseRange(begin, end txid) {
if begin > end {
return
}
var m pgids
for tid, txp := range f.pending {
if tid < begin || tid > end {
continue
}
// Don't recompute freed pages if ranges haven't updated.
// 已处理
if txp.lastReleaseBegin == begin {
continue
}
for i := 0; i < len(txp.ids); i++ {
if atx := txp.alloctx[i]; atx < begin || atx > end {
continue
}
m = append(m, txp.ids[i])
// 这个实现是够省事儿的
// 如果该 page 能释放,则直接移除
// ids 和 alloctx 数组前移一位
// i-- 以便下次循环保持
txp.ids[i] = txp.ids[len(txp.ids)-1]
txp.ids = txp.ids[:len(txp.ids)-1]
txp.alloctx[i] = txp.alloctx[len(txp.alloctx)-1]
txp.alloctx = txp.alloctx[:len(txp.alloctx)-1]
i--
}
// 该 txid 的 txp 在该 range 已处理
txp.lastReleaseBegin = begin
// 如果均可以释放,则从 pending 中移除
if len(txp.ids) == 0 {
delete(f.pending, tid)
}
}
// 排序
sort.Sort(m)
// 归并排序合入可用 ids
f.ids = pgids(f.ids).merge(m)
}

回过头来梳理 freelist 中的各种映射

pending [txid] -> txPending

而 txPending 中又会存储 ids 和 alloctx,而看 releaseRange 中的实现,这个 alloctx 与 txid 不一定是一致的,那这个 txPending 是在哪儿修改的 ?

问题: txPending 在哪儿被修改

其实刚才我们已经看到了,其在 func (f *freelist) free(txid txid, p *page) 方法中被修改,那么 free 功能又是啥?

  1. free(txid txid, p *page)
  2. 获取 txPending (txp := f.pending[txid])
  3. 获取分配该 page 的 txid (allocTxid, ok := f.allocs[p.id]); 如果获取不到且 page 为 freelist or meta,将 allocTxid 设置为当前 txid
  4. 将释放的连续页记录到 txPending 中: txp.ids = append(txp.ids, id); txp.alloctx = append(txp.alloctx, allocTxid))

是否与 allocate 对应 ?

  1. allocate(txid txid, n int)
  2. 分配连续的 n 个 pages,并返回第一个 page id (initial)
  3. 记录该 page id 被 txid 分配 (freelist.allocs[initial] = txid)

看起来 free 并不与 allocate 对应,即并不是 free 该 txid 的所分配的 pages 的语义,而是将连续页 (p *page) 加入到 txid 的 pending 记录中待释放;这么看来的话 pending [txid] -> txPending 好理解,然而 txPending 中未必只存储 [txid] 的 pending pages,这么实现应该与上层调用 free 方法的语义有关

最后看看 freelist 的 rollback 修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// rollback removes the pages from a given pending tx.
func (f *freelist) rollback(txid txid) {
// Remove page ids from cache.
txp := f.pending[txid]
if txp == nil {
return
}
var m pgids
for i, pgid := range txp.ids {
delete(f.cache, pgid)
tx := txp.alloctx[i]
// tx == 0 ?!
if tx == 0 {
continue
}
// 非当前 rollback 的 tx 分配的 page
if tx != txid {
// Pending free aborted; restore page back to alloc list.
f.allocs[pgid] = tx
} else {
// Freed page was allocated by this txn; OK to throw away.
// 归还 freelist ids
m = append(m, pgid)
}
}
// Remove pages from pending list and mark as free if allocated by txid.
delete(f.pending, txid)
sort.Sort(m)
f.ids = pgids(f.ids).merge(m)
}

更好的回收策略?

https://github.com/coreos/bbolt/issues/14

总结

总之这个 pr 目测能极大缓解 etcd v3.1.9 中偶尔会遇到的 mvcc: database space exceeded 的错误,但是总感觉有些 page 还是没有及时回收的样子,这种没彻底弄清楚的感觉,合入总有点儿不放心 … 随意一说

现在环境中 etcd 数据目录独立挂盘,看磁盘命名应该是个 lv

df -h 查看到类似

/device-mapper

和 lvm 相关 https://wiki.archlinux.org/index.php/LVM

pvs https://linux.die.net/man/8/pvs

iotop

iotop -n 1 -b -o

iostat

监控变化

watch -n 2 -d “xxx”

自动执行

watch -n 2 “xxx”

Intellij golang

用来记录一些 golang 的神奇语法及碎碎念

step over: 单步调试

删除数组中的一个元素

删除 index 元素

n.inodes = append(n.inodes[:index], n.inodes[index+1:]…)

Live 版完美,温暖和力量 ——

曾經我也想放棄治療

是因為還沒遇見到你

像你這樣的人存在這世界上,我對世界稍微有了好感

像你這樣的人存在這世界上,我對世界稍微有了期待

很久没写了,这次真的情绪很低落

自己明明那么认真,然而却事与愿违

不过想想也可以理解,毕竟每个人心中重要的事情不一样,接受吧

是呢,工作半年,“我变强了,也变秃了”,不再是小年轻的模样,已是满脸胡须

年轻真好呀,可以

跑到海角之南,只为追一首歌的回忆

天不怕地不怕的闯

藐视一切

可惜,又不可惜

梦醒的感觉而已

美好又遗憾

2017年10月13日21:59:32 杭州


三年后,此时的我,回去翻这些文字(陆陆续续写了 11 篇),只能感叹 “没有岁月可回头” ~ 希望今后有更强大的内心,去面对更多挑战

2020年08月01日00:47:47 杭州

etcd v3.1.9

to be cont.

boltdb 原哥们,不维护了貌似,coreos 的人继续维护 https://github.com/coreos/bbolt

另外这个 issues 和 free page 相关,即能解决部分 boltdb 内存碎片问题

https://github.com/coreos/bbolt/pull/3

boltdb

bolt_unix.go 几个工具方法,如 flock / funlock / mmap / munmap

bolt_linux.go fsync db 文件

入口处在 db.go 的 Open 方法,通过 Open 方法初始化一个 db 文件;对应 etcd 中 mvcc/backend/backend.go 中的 newBackend 方法的调用

1
2
3
4
5
6
7
8
9
func newBackend(path string, d time.Duration, limit int) *backend {
...
// bolt db
db, err := bolt.Open(path, 0600, boltOpenOptions)
if err != nil {
plog.Panicf("cannot open database at %s (%v)", path, err)
}
...
}

Open 方法中,如果是新建的 db 文件,则调用 db.init() 写入 metadata 信息,boltdb 的注释写的很赞,感觉就是个工业又学术的艺术品,赞!

查看 db.init() 干了什么,直接上图吧,往 db 文件中写了四个 page (size 一般是 4096 字节) 的内容

initial_db

然后就是各种初始化了,初始化 pagePool,mmap the data file as a byte slice,初始化 freelist

etcd 的 InitialMmapSize 设置为 10 1024 1024 * 1024,吓人,那是否 etcd 一启动,就会占用这么大的实际内存?并不是的哈,在 mac 上实测 top 命令看到的是 314MB,ps aux 看到的 vsz 556879888,rss 476460,很好奇,按理来说 vsz 的单位应该是 kb 呀,但是这个数字有点儿大的吓人,实际应该是字节,也就是 500 多 MB,那么 rss 又怎么理解呢?rss 的单位又是 kb,实际使用了 470 多 MB?神奇,总之并不是启动之后立马占用 10 G内存

开头写两个一样的 meta page 貌似是为了做保护,看到 mmap 的方法中有这段注释

1
2
3
4
5
6
7
8
// Validate the meta pages. We only return an error if both meta pages fail
// validation, since meta0 failing validation means that it wasn't saved
// properly -- but we can recover using meta1. And vice-versa.
err0 := db.meta0.validate()
err1 := db.meta1.validate()
if err0 != nil && err1 != nil {
return err0
}

说直接一点儿,boltdb 使用 mmap 把一个名为 db 的文件映射到内存中的 []byte 数组,然后直接操作内存,但是不管怎么说是外部存储,最终还是要落盘的,所以呢推测是用了 B+ tree,然后把写入的内容组织成 page,使用指针操作,这块代码有点像 C 其实

freelist allocate 的逻辑是从 freelist 记录的所有 free page 中分配 n 个连续的 page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(n int) pgid {
if len(f.ids) == 0 {
return 0
}
var initial, previd pgid
for i, id := range f.ids {
if id <= 1 {
panic(fmt.Sprintf("invalid page allocation: %d", id))
}
// Reset initial page if this is not contiguous.
if previd == 0 || id-previd != 1 {
initial = id
}
// If we found a contiguous block then remove it and return it.
if (id-initial)+1 == pgid(n) {
// If we're allocating off the beginning then take the fast path
// and just adjust the existing slice. This will use extra memory
// temporarily but the append() in free() will realloc the slice
// as is necessary.

// 将已分配的连续页移出 free list 记录表 (ids)
// 并释放 ids 空间
if (i + 1) == n {
f.ids = f.ids[i+1:]
} else {
copy(f.ids[i-n+1:], f.ids[i+1:])
f.ids = f.ids[:len(f.ids)-n]
}
// Remove from the free cache.
// 移出 free list cache
for i := pgid(0); i < pgid(n); i++ {
delete(f.cache, initial+i)
}

// 返回起始页
return initial
}
previd = id
}
return 0
}

freelist 的 write 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
// If the page.count is at the max uint16 value (64k) then it's considered
// an overflow and the size of the freelist is stored as the first element.
idx, count := 0, int(p.count)
if count == 0xFFFF {
idx = 1
count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
}
// Copy the list of page ids from the freelist.
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)
// Make sure they're sorted.
sort.Sort(pgids(f.ids))
}
// Rebuild the page cache.
f.reindex()
}

freelist 的 read 实现,对应 write;如何 write 的,就如何 read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
// If the page.count is at the max uint16 value (64k) then it's considered
// an overflow and the size of the freelist is stored as the first element.
// page 中的 count 怎么理解 ?
// 看着像是 page 下面维护着一系列 pgid
idx, count := 0, int(p.count)
if count == 0xFFFF {
idx = 1
count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
}
// Copy the list of page ids from the freelist.
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)

// copy 结束之后,是否可以设置 ids = nil,帮助 gc ?

// Make sure they're sorted.
sort.Sort(pgids(f.ids))
}
// Rebuild the page cache.
f.reindex()
}

freelist 的 reindex 实现,其实就是构造 cache,很直接的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// reindex rebuilds the free cache based on available and pending free lists.
func (f *freelist) reindex() {
// 既然已知大小的话,make 的时候为啥不指定 capacity
// 好吧,我晕了,这个是 map,怎么指定大小?naive
f.cache = make(map[pgid]bool)
for _, id := range f.ids {
f.cache[id] = true
}
// pending 记录了 tx 中使用过,未被释放的 page id
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
f.cache[pendingID] = true
}
}
}

boltdb 在 beginRWTx 中释放空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (db *DB) beginRWTx() (*Tx, error) {
// If the database was opened with Options.ReadOnly, return an error.
if db.readOnly {
return nil, ErrDatabaseReadOnly
}
// Obtain writer lock. This is released by the transaction when it closes.
// This enforces only one writer transaction at a time.
db.rwlock.Lock()
// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
// Exit if the database is not open yet.
if !db.opened {
db.rwlock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
// Free any pages associated with closed read-only transactions.
// 获取当前事务中的最小 txid
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
}

// 释放该 txid 之前的 page
if minid > 0 {
db.freelist.release(minid - 1)
}
return t, nil
}

查看 db.freelist.release 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// release moves all page ids for a transaction id (or older) to the freelist.
func (f *freelist) release(txid txid) {
m := make(pgids, 0)
for tid, ids := range f.pending {
if tid <= txid {
// Move transaction's pending pages to the available freelist.
// Don't remove from the cache since the page is still free.
m = append(m, ids...)
delete(f.pending, tid)
}
}
sort.Sort(m)

// 可重新使用的 pending page 与当前可使用的 page merge sort
f.ids = pgids(f.ids).merge(m)
}

leafPageElement 结构

1
2
3
4
5
6
7
8
type leafPageElement struct {
flags uint32 // 4 bytes; 2 leafElement / 1 branchElement / 4 meta /
pos uint32 // 4 bytes
ksize uint32 // 4 bytes
vsize uint32 // 4 bytes
// pos = 16 that remain space to store key and value
// for example ksize = 8 that 64 bytes for key
}

如图所示

leafPageElement

即叶子节点中存储了 key 和 value

etcd

查看 mvcc/kvstore.go 的 func (s *store) put(key, value []byte, leaseID lease.LeaseID) 方法

查看如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
s.txnModify = true

// 每次 put revision + 1
rev := s.currentRev.main + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
// revision to bytes
ibytes := newRevBytes()
revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)

//
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
}

// boltdb 中的 key 为 revision
// value 为 mvccpb.KeyValue
// 存入 boltdb 即 db 文件
s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)

// 存入 key -> revision 的索引
s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})

// 这个是啥
// s.changes 什么时候释放,不然内存不会爆?
s.changes = append(s.changes, kv)
s.currentRev.sub += 1
// lease 相关代码先略去不表
}

查看 kvindex 的实现,实际上为 newTreeIndex(),即 mvcc/index.go,摘抄一句 package 注释

Package btree implements in-memory B-Trees of arbitrary degree.

okay, index 底层是 B tree

查看 kvindex.put 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()

// B tree get
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}

// update value in B tree
okeyi := item.(*keyIndex)
okeyi.put(rev.main, rev.sub)
}

kvindex 是个完全在内存中的索引,如果 etcd 重启了之后,需要恢复 kvindex 么?答案是需要的

etcdserver/server.go -> s.kv.Restore(newbe) -> func (s store) Restore(b backend.Backend) error {} -> func (s store) restore() error {}

在最后这个方法中从 db 文件恢复 kvindex

0%