Recently I have been writing a FE (front end) for an etcd cluster on k8s, so this post summarizes the framework-level takeaways.

A long time ago, back in the lab, I waded into fe development for a while, so I know firsthand that the fe world churns out new tooling at a pace of "100 dev tools a day". From AngularJS (Google) to React (Facebook), both are practices of the SPA (single page application) idea.

For an fe novice, the biggest benefit of these two frameworks is that they eliminate most of the tedious jQuery code for manipulating the DOM by hand; the framework updates the DOM elements on your behalf. They also introduce MVVM (model view viewModel), a step beyond the classic server-side-rendering pattern MVC (model view controller), supporting two-way data updates from view to model and model to view. This greatly cleans up fe code.

Still, fe remains a labor-intensive field. After all, seeing is believing: it is the part closest to the user, so requirements and changes arrive at the slightest provocation. Code that starts out tidy tends to take off in all directions after a while …
term tries to fetch the term of the entry at index i. The first index in entries is the dummy index: every time a MsgApp message is received, m.Index refers to the dummy entry (index), and what follows are the real entries (m.Ents). The valid range is dummy index <= i <= lastIndex; if index i falls outside this range, the corresponding term obviously cannot be found. maybeTerm tries to fetch the term of the entry at index i from unstable; if it is not found there, it falls back to storage.
(4)
The implementation of matchTerm(i, term uint64): first fetch the term at index i, then check whether it equals term.
(5)
The implementation of findConflict: call matchTerm on every entry in ents, iterating in ascending Index order, until the first mismatch (an entry with the same Index but a different Term is considered a conflict). If that unmatched entry's Index <= lastIndex, there is a conflict, and the Index of the first conflicting entry is returned; if its Index > lastIndex, it and everything after it are new, not-yet-contained entries, and the Index of the first new entry is returned. If all entries match, 0 is returned.
So the common cursor workflow: a cursor is created from a bucket and is initially bound to the bucket's root; the search for the key starts from root, and once it returns, c.node().put or del is applied.

For example, the bucket-creation path:
```go
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)

// (creation and serialization of the new empty inline bucket elided)
var value = bucket.write()

// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, bucketLeafFlag)
```
And, for example, the key / value write path:
```go
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)

// Return an error if there is an existing key with a bucket value.
if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
	return ErrIncompatibleValue
}

// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
```
wal fsync delay warning

```go
start := time.Now()
err := fileutil.Fdatasync(w.tail().File)

duration := time.Since(start)
if duration > warnSyncDuration {
	plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
}
```
An fsync call taking longer than 1s triggers a warning: disk IO is jittering, or the disk simply does not meet the requirement.
boltdb apply delay warning
Once the WAL write is done and the raft protocol round completes, the data can be replicated and then applied to local storage.
```go
s.applySnapshot(ep, apply)
st := time.Now()
s.applyEntries(ep, apply)
d := time.Since(st)
entriesNum := len(apply.entries)
if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
	plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
	plog.Warningf("avoid queries with large range/delete range!")
}
```
Take a put request: it eventually lands in the put method of kvstore.go, which searches kvindex (a B-tree), writes through a boltdb tx, updates kvindex, and attaches a lease if one is present.

All of these steps count toward the latency; it is just that the boltdb put is usually more expensive than the rest.
leader send out heartbeat delay warning
The r.sendMessages(rd.Messages) method also logs a latency warning:
```go
// a heartbeat message
if ms[i].Type == raftpb.MsgHeartbeat {
	// exceed maxDuration time
	ok, exceed := r.td.Observe(ms[i].To)
	if !ok {
		// TODO: limit request rate.
		plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
		plog.Warningf("server is likely overloaded")
	}
}
```
```go
case ap := <-s.r.apply():
	f := func(context.Context) { s.applyAll(&ep, &ap) }
	sched.Schedule(f)
```
A job runs once it is put on the queue:
```go
// Schedule schedules a job that will be ran in FIFO order sequentially.
func (f *fifo) Schedule(j Job) {
	...
	f.pendings = append(f.pendings, j)
	...
}
```
the clock difference against peer is too high warning

When the computed clock difference between peers exceeds 1s, a warning is logged (ps: meaning the current peer's clock is ahead of the remote peer's).

etcd adds each of its peers to a prober, which periodically issues GET requests: this both probes peer health and, from the response, computes the clock difference between the peers. I have not seen this warning affect workloads. I have not walked through the code yet; the only time-related mechanism is lease, so my tentative guess is that leases rely on a logical clock, hence no impact.
```go
func monitorProbingStatus(s probing.Status, id string) {
	...
	if s.ClockDiff() > time.Second {
		plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
	}
	rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
	...
}
```
```go
// search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) {
	// the pgid may live either in an on-disk page or in an in-memory node
	p, n := c.bucket.pageNode(pgid)
	if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
		panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
	}
	e := elemRef{page: p, node: n}
	c.stack = append(c.stack, e)

	// If we're on a leaf page/node then find the specific node.
	if e.isLeaf() {
		c.nsearch(key)
		return
	}

	if n != nil {
		c.searchNode(key, n)
		return
	}
	c.searchPage(key, p)
}
```
page and node
Once the position is found, the bucket materializes the underlying page and the page’s parent pages into memory as “nodes”
The Bucket data structure:
```go
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
	*bucket
	tx       *Tx                // the associated transaction
	buckets  map[string]*Bucket // subbucket cache
	page     *page              // inline page reference
	rootNode *node              // materialized node for the root page.
	nodes    map[pgid]*node     // node cache

	// Sets the threshold for filling nodes when they split. By default,
	// the bucket will fill to 50% but it can be useful to increase this
	// amount if you know that your write workloads are mostly append-only.
	//
	// This is non-persisted across transactions so it must be set in every Tx.
	FillPercent float64
}
```
The bucket data structure:
```go
// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
	root     pgid   // page id of the bucket's root-level page
	sequence uint64 // monotonically incrementing, used by NextSequence()
}
```
```go
// Save first key so we can find the node in the parent when we spill.
if len(n.inodes) > 0 {
	n.key = n.inodes[0].key
	_assert(len(n.key) > 0, "read: zero-length node key")
} else {
	n.key = nil
}
```
Of course, a long-running read txn holds the mmap read lock, so a rw txn that needs the mmap write lock to grow the data file will block on it.
Read-only transactions and read-write transactions should not depend on one another and generally shouldn’t be opened simultaneously in the same goroutine. This can cause a deadlock as the read-write transaction needs to periodically re-map the data file but it cannot do so while a read-only transaction is open. https://github.com/boltdb/bolt#transactions
```go
// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
	ids []pgid // all free and available free page ids.
	// records the mapping from each allocate call's returned page id to the txid;
	// allocate returns the first page id of the contiguously allocated run
	allocs  map[pgid]txid       // mapping of txid that allocated a pgid.
	pending map[txid]*txPending // mapping of soon-to-be free page ids by tx.
	cache   map[pgid]bool       // fast lookup of all free and pending page ids.
}
```
The freelist allocate method takes an extra txid parameter, used to record which tx a page was allocated to:
```go
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(txid txid, n int) pgid {
	...
	// record it; only the first page id of the contiguous run is recorded
	f.allocs[initial] = txid
	...
}
```
```go
// free releases a page and its overflow for a given transaction id.
// If the page is already free then a panic will occur.
func (f *freelist) free(txid txid, p *page) {
	if p.id <= 1 {
		panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
	}

	// Free page and all its overflow pages.
	txp := f.pending[txid]
	if txp == nil {
		txp = &txPending{}
		f.pending[txid] = txp
	}
	// look up which tx this page was allocated to
	allocTxid, ok := f.allocs[p.id]
	if ok {
		// drop the association
		delete(f.allocs, p.id)
	} else if (p.flags & (freelistPageFlag | metaPageFlag)) != 0 {
		// Safe to claim txid as allocating since these types are private to txid.
		// these two page types are never recorded in allocs
		allocTxid = txid
	}

	// release the contiguous run of pages
	for id := p.id; id <= p.id+pgid(p.overflow); id++ {
		// Verify that page is not already free.
		if f.cache[id] {
			panic(fmt.Sprintf("page %d already freed", id))
		}
		// Add to the freelist and cache.
		// ids and alloctx entries correspond index-for-index
		txp.ids = append(txp.ids, id)
		txp.alloctx = append(txp.alloctx, allocTxid)
		f.cache[id] = true
	}
}
```
```go
// rollback removes the pages from a given pending tx.
func (f *freelist) rollback(txid txid) {
	// Remove page ids from cache.
	txp := f.pending[txid]
	if txp == nil {
		return
	}
	var m pgids
	for i, pgid := range txp.ids {
		delete(f.cache, pgid)
		tx := txp.alloctx[i]
		// tx == 0 ?!
		if tx == 0 {
			continue
		}
		// the page was allocated by a tx other than the one being rolled back
		if tx != txid {
			// Pending free aborted; restore page back to alloc list.
			f.allocs[pgid] = tx
		} else {
			// Freed page was allocated by this txn; OK to throw away.
			// return it to the freelist ids
			m = append(m, pgid)
		}
	}
	// Remove pages from pending list and mark as free if allocated by txid.
	delete(f.pending, txid)
	sort.Sort(m)
	f.ids = pgids(f.ids).merge(m)
}
```