client go queues
最近看了看 knative-build-controller
中使用的 workqueue,不得不说是个设计复杂
而且对于 controller 来说,非常好用的一个东西。因为 controller 中会使用到 informer
,而 informer
产生的待 reconcile
key 可以放入 workqueue 中,等待 controller 逐一处理
workqueue 的实现是由 client-go
https://github.com/kubernetes/client-go 库提供的,目录位于 util/workqueue
其中 knative-build-controller 使用的为 rate_limitting_queue.go
,也就是通过
1 | func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { |
创建出来的 queue
使用 default_rate_limiters.go
,也就是通过
1 | func DefaultControllerRateLimiter() RateLimiter { |
创建出来的 limiter,用来计算限流时间的,会取 NewItemExponentialFailureRateLimiter
和 BucketRateLimiter
中限流时间的大者
咱们重点看 queue 的实现
RateLimitingQueue 提供了 rate limit (限流) 的功能,而我们知道 knative-build-controller (当然 k8s 中许多其他的 controller 也是如此模型) 的处理模型为循环从 queue 中获取 item,并 reconcile 之
从 api 来看,rate_limitting_queue
组合了 DelayingInterface
接口,并提供了
- AddRateLimited
- Forget
- NumRequeues
方法
其中 DelayingInterface
接口又组合了 Interface
接口,并提供了
- AddAfter
方法
最后 Interface
接口提供基本的 queue 功能
- Add
- Len
- Get
- Done
- Shutdown
- ShuttingDown
RateLimittingQueue
限流队列
AddRateLimited
实际上是调用了限流算法,根据重试次数计算出当前限流时间,在该时间之后,再将 item 加入 queue 中
1 | func (q *rateLimitingType) AddRateLimited(item interface{}) { |
因此这个实现依赖于 DelayingQueue 的 AddAfter 方法
Forget
这个方法有点儿特殊,Forget 是啥语义?忘了?我 item 好端端的,为啥要忘了我
😂 一开始我也是一脸懵逼,不过想想呐,咱们调用了 AddRateLimited,而这个方法在计算限流时间时,需要使用到 item 的重试次数的,没有这个重试次数,当然计算限流时间就没有意义了
所以 Forget 呢,实际上是清除 item 的重试次数,这样下次再将这个 item AddRateLimited 时,就不会受限流的影响了
Forget 在 ItemExponentialFailureRateLimiter
中的实现
1 | func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) { |
以及计算限流时间的实现
1 | func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { |
DelayingQueue
如此我们首先查看 DelayingInterface
接口的实现之一 delaying_queue.go
我们看,这个 queue 想实现如何的功能?
AddAfter ? 所谓的在 n time 之后的再将 item 加入 queue 的功能
为了实现这个功能,首先要考虑 AddAfter 是同步的,亦或是异步的方法 ?
当前实现是异步的方法
AddAfter
计算出 readyAt 时间,将该时间与 item 一并存入 waitingForAddCh,这个 ch 的大小为 1000,也就是说未达到 1000 时,AddAfter 是不会被阻塞的
细心的同学可能会问,如果 AddAfter 的时间为 0 甚至为负怎么办,当然这种情况直接加入 queue 即可,就不需要再加入 waitingForAddCh 了
waitingLoop
当然为了实现 AddAfter 这个功能,免不了 queue 需要做一些额外的维护事情,最重要的就是 queue 初始化时,开始用协程执行 waitingLoop 方法
这个方法是实现 delaying_queue
功能的核心逻辑
注意到待加入 queue 的 item 位于 waitingForAddCh
中,waitingLoop
当可以从 waitingForAddCh
获取到 item 时
1 | case waitEntry := <-q.waitingForAddCh: |
首先会判断这个 item 是否可以加入 queue 了,如果时候还没到,那么将该 item 加入以 readyAt 为排序关键的优先队列中。若时候到了,则加入 queue。处理完第一个 item 之后,会将 waitingForAddCh
中剩余的 item 均按照相同的逻辑处理之
这个 item 加入优先队列时,还有一个讲究的地方,注意到下述代码
1 | // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue |
如果加入的 item 已经存在,并且新加入 item 的 readyAt 时间比已经存在的 item 的时间晚,那么不好意思哈,这个 item 会被直接丢弃。只有新加入的 item 的 readyAt 时间比已存在的 item 时间要早,才会更新已存在的 item 的 readyAt 时间,并调整 item 在优先队列中的位置
所以看到这里,delaying_queue 实际上还有去重功能
回到 waitingLoop 的 loop 来,loop 首先要执行的操作,即为从优先队列中依次 Peek item,即不断从优先队列中取出第一个 item,这个 item 最有可能到触发时间了
1 | // Add ready entries |
显然如果到达了 Add 时间,那么就将其加入 queue 中,并执行一些清理工作。若终于遍历到未到触发时间的 item 了,这个时候可以退出遍历优先队列的循环了
因为这个时候,所有当前可以 Add queue 的 item 都已经处理完了,所以接下来,可以执行一段优化的逻辑,加快 item 的处理
1 | // Set up a wait for the first item's readyAt (if one exists) |
这个有点意思,都处理完了是吧,那我选优先队列中的第一个 item,用它的 readyAt 时间设置一个定时器,这样的话,一旦到达需要 Add 的时间,waitLoop 就会处理了 (当然用户态的程序,定时上都有些许偏差,不会特别特别精确)
都准备好之后,waitLoop 就进入等待数据/事件的逻辑了
1 | select { |
ok,至此 delaying_queue 的要点就说完了,下面提几点使用的时候的注意点,避免踩坑
- AddAfter 为异步方法,所以现象为调用 AddAfter 之后,item 并不会立即被加入到 queue 中
- AddAfter 会做去重处理,在 queue 中依然有相同的 item 时,如果新加入 item 的 readyAt time 靠后的话,新加入的 item 会被丢弃
Queue
代码实现位于 queue.go 中
好了,在经过 DelayQueue 的延时加入策略之后,最终 item 还是被加入 queue 中的,而 queue 的实现也多有讲究,来一探究竟吧
queue 内部有两个重要的数据结构
- processing set
- dirty set
有一个比较特殊的方法
- Done
首先来看一下 queue 的 Add 方法
Add
代码不多,直接上了
1 | func (q *Type) Add(item interface{}) { |
Get
queue 的出口
1 | func (q *Type) Get() (item interface{}, shutdown bool) { |
看了 Add 与 Get 实现后,我们得到几个结论
- queue 也实现了去重: Add 相同 item,若该 item 未被 Get,那仅会被加入 queue 一次
- queue.Get 方法在 queue 中没数据时,是阻塞的,即你可以这写
1 | for { |
Done
1 | func (q *Type) Done(item interface{}) { |
Done 方法会去掉 item 的 processing 标记,并且如果 item 被标记为 dirty,那会再将 item 加入 queue。有这个逻辑那我们应该如何使用 queue ?
我们首先来考虑这种场景
itemA 被 Add queue,此时 X 协程 Get 时,获取到了 itemA,在这种情况下,itemA 被标记为 processing,而没有 dirty 的标记,若此时另一 Y 协程又再次调用 Add 方法将 itemA Add queue,这时 itemA 就被标记为 dirty 了,并且因为有 processing 标记,所以并不会被加入 queue 中
过了一些时间后,X 协程执行 ok 了,调用 Done(itemA) 方法,去除 itemA 的 processing 标记,因为 itemA 为 dirty,所以将其重新加入 queue 中,等待被 Get 并处理
所以在这种场景下就好理解多了,实际上 queue 还有个特性,即 queue 中不会有重复的 item,但是仅允许 item 被 Get 之后,Done 之前,被 Add 一次,在这种情况下,Done 的时候,会重新将 item 加入 queue 中
可以理解为如果这个 item 正在被处理时,queue 允许至多缓存一次相同的 item
所以再总结一次 queue 的特性
- 添加 item 调用 Add 方法
- 获取 item 调用 Get 方法
- 处理 item 之后调用 Done 方法。否则再次 Add 相同 item 时,若该 item 仍未被 Get 则直接被忽略。若该 item 已被 Get,则被打上 dirty 标记,在其被调用 Done 时,该 item 才会被重新加入 queue 中
- 本质上 queue 中不会有重复的 item
Summary
在看了这几种 queue 的实现之后,是否更了解 rate_limmiting_queue.go 该如何使用了?
例如在 knative-build-controller 中它被如此初始化 (天下代码一大抄)
1 | workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Builds"), |
具体使用时
1 | func (c *Controller) processNextWorkItem() bool { |
所以再说一遍浓缩用法
- 添加 item 调用 Add 方法
- 获取 item 调用 Get 方法
- 处理 item 之后调用 Done 方法
- 不增加 item 重试次数调用 Forget 方法
再说一遍 rate limit queue 重点,切莫踩坑
- Add 是异步方法
- Add 有去重功能
- 先经过 DelayQueue 去重处理,对于新加入的 item,在其优先队列中依然有相同的 item 时,如果新加入 item 的 readyAt time 较原 item 的 readyAt 时间靠后的话,新加入的 item 会被丢弃
- 再经过 Queue 去重处理,如果 queue 中有相同 item 则直接被丢弃。若 queue 中没有相同 item,但是 item 处于被处理中,即未被调用 Done 时,会将 item 标记为 dirty,待 item 被调用 Done 时,重新加入 queue
- 处理 item 结束之后,无论如何调用 Done,标识该 item 已被处理结束
- 若不需要增加 item 的重试次数,则结束之后调用 Forget 方法,清除该 item 的重试次数统计
- 如果需要调用 Forget,则先调用 Forget 再调用 Done,确保再次 Add 的时候不受限流影响
之所以关注到这个问题,是因为在写 build-controller 一个 bugfix 的 ut 时,各种坑,遂研究了下 workqueue 的细节,关于这个 bugfix 的讨论看这个链接 Timeout of build may have problem
Thanks for your time 😁