
k8s 1.19

overview

https://mp.weixin.qq.com/s/0OLdyVwg4Nsw0Xvvg8if5w

This Alibaba Cloud Native article on pod creation efficiency is a good one; it analyzes docker pull acceleration techniques end to end.

https://qiankunli.github.io/2015/09/22/docker_image.html

docker pull phase

The figure above shows the stages of a docker pull:

  1. download
  2. decompress
  3. union the layer files into a rootfs (not shown in the figure)

docker pull acceleration therefore targets each of these stages.

Download acceleration

p2p powered docker registry

https://d7y.io/

Dragonfly is an intelligent P2P based image and file distribution system

Originally it was born to solve all kinds of distribution at very large scales, such as application distribution, cache distribution, log distribution, image distribution, and so on

https://github.com/uber/kraken

Kraken is a P2P-powered Docker registry that focuses on scalability and availability. It is designed for Docker image management, replication, and distribution in a hybrid cloud environment

all participants can reach a minimum of 80% max upload/download speed in theory (60% with current implementation), and performance doesn’t degrade much as the blob size and cluster size increase

Speeds up distributing docker images to nodes at scale; a good fit for releasing or updating containerized services.

docker registry mirror

https://docs.docker.com/registry/recipes/mirror/

Decompression acceleration

https://kubernetes.io/docs/setup/production-environment/container-runtimes/

https://kubernetes.io/docs/setup/production-environment/container-runtimes/#containerd

gzip/gunzip are single-threaded compression/decompression tools; consider pigz/unpigz for multi-threaded compression/decompression to take full advantage of multiple cores.

containerd has supported pigz since version 1.2: once the unpigz tool is installed on a node, containerd prefers it for decompression. This lets image decompression exploit the node's multiple cores.
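A minimal Go sketch of that preference, assuming only that unpigz behaves like a drop-in multi-threaded gunzip; this is an illustration, not containerd's actual decompression code:

package main

import (
    "compress/gzip"
    "io"
    "log"
    "os"
    "os/exec"
)

// newGzipReader decompresses r, preferring the external multi-threaded
// unpigz binary when it is on PATH and falling back to Go's
// single-threaded compress/gzip otherwise.
func newGzipReader(r io.Reader) (io.ReadCloser, error) {
    if path, err := exec.LookPath("unpigz"); err == nil {
        cmd := exec.Command(path, "-d", "-c") // decompress to stdout
        cmd.Stdin = r
        out, err := cmd.StdoutPipe()
        if err != nil {
            return nil, err
        }
        if err := cmd.Start(); err != nil {
            return nil, err
        }
        return out, nil // a real implementation would also Wait on Close
    }
    log.Println("unpigz not found, using in-process gzip")
    return gzip.NewReader(r)
}

func main() {
    rc, err := newGzipReader(os.Stdin) // e.g. go run main.go < layer.tar.gz > layer.tar
    if err != nil {
        log.Fatal(err)
    }
    defer rc.Close()
    io.Copy(os.Stdout, rc)
}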

k8s acceleration

docker images pull policy

https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy

  1. Never
  2. IfNotPresent
  3. Always

Never: the kubelet never pulls; it starts the container straight from the local image. Shortest path.

IfNotPresent: the kubelet checks the local docker image list; on a hit it starts the container from the local image, on a miss it falls back to docker pull. Medium path.

Always: even if the image has not changed, there is an extra remote docker registry query; on a miss it falls back to docker pull. Longest path.

every time the kubelet launches a container, the kubelet queries the container image registry to resolve the name to an image digest. If the kubelet has a container image with that exact digest cached locally, the kubelet uses its cached image; otherwise, the kubelet pulls the image with the resolved digest, and uses that image to launch the container.
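A hedged sketch of the decision those three policies imply (illustrative only; the kubelet's real logic lives in pkg/kubelet/images):

package main

import "fmt"

type pullPolicy string

const (
    never        pullPolicy = "Never"
    ifNotPresent pullPolicy = "IfNotPresent"
    always       pullPolicy = "Always"
)

// shouldPull sketches whether the kubelet needs to contact the registry
// (to pull, or at least resolve the digest) before starting the container.
func shouldPull(policy pullPolicy, presentLocally bool) bool {
    switch policy {
    case never:
        return false // start from the local image or fail
    case ifNotPresent:
        return !presentLocally // pull only on a local miss
    default: // always
        return true // resolve the digest remotely every time
    }
}

func main() {
    fmt.Println(shouldPull(ifNotPresent, true)) // false: local hit, shortest remaining path
    fmt.Println(shouldPull(always, true))       // true: still pays the registry round trip
}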

docker images pre-pulled

https://kubernetes.io/docs/concepts/containers/images/#pre-pulled-images

schedule imagelocality

https://kubernetes.io/docs/reference/scheduling/config/#scheduling-plugins

  • imagelocality

Note that it only applies to containers, not to init containers. It takes effect in the score phase of the k8s scheduler and uses the image size as the base score: the larger the image, the higher the imagelocality scheduling weight. To avoid the node heating problem (with a pure imagelocality strategy, multiple replicas of a pod would likely be scheduled onto the same node while other nodes go under-utilized), the final imagelocality score is additionally scaled by the image's spread ratio.

// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how much nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *framework.ImageStateSummary, totalNumNodes int) int64 {
    spread := float64(imageState.NumNodes) / float64(totalNumNodes)
    return int64(float64(imageState.Size) * spread)
}

If only a few nodes in the k8s cluster have the image cached, the spread ratio is low and so is the imagelocality score; conversely, if most cluster nodes have the image cached, the spread ratio is high and the imagelocality score is correspondingly high.

This shows that k8s was designed with service management in mind: the scheduling strategy above is friendly to service high availability.

// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64, numContainers int) int64 {
    maxThreshold := maxContainerThreshold * int64(numContainers)
    if sumScores < minThreshold {
        sumScores = minThreshold
    } else if sumScores > maxThreshold {
        sumScores = maxThreshold
    }

    return int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

Finally, the corrected imagelocality score becomes the image's priority in scheduling. Note that every score below minThreshold maps to the same priority: in other words, "no node has the image cached" and "only a few nodes have the image cached" get the same scheduling priority, which is exactly what avoids the node heating problem.
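A small self-contained example of how the two functions above combine; the threshold constants here are assumptions for illustration, not the scheduler's exported values:

package main

import "fmt"

// Hypothetical thresholds for illustration only; the real plugin defines
// its own minThreshold / maxContainerThreshold constants.
const (
    mb                    int64 = 1024 * 1024
    minThreshold                = 23 * mb
    maxContainerThreshold       = 1000 * mb
    maxNodeScore          int64 = 100
)

func scaledImageScore(imageSize int64, numNodesWithImage, totalNumNodes int) int64 {
    spread := float64(numNodesWithImage) / float64(totalNumNodes)
    return int64(float64(imageSize) * spread)
}

func calculatePriority(sumScores int64, numContainers int) int64 {
    maxThreshold := maxContainerThreshold * int64(numContainers)
    if sumScores < minThreshold {
        sumScores = minThreshold
    } else if sumScores > maxThreshold {
        sumScores = maxThreshold
    }
    return maxNodeScore * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

func main() {
    image := 500 * mb
    // 2 of 100 nodes cache the image: low spread, the score clamps to minThreshold, priority 0.
    fmt.Println(calculatePriority(scaledImageScore(image, 2, 100), 1))
    // 80 of 100 nodes cache the image: high spread, noticeably higher priority.
    fmt.Println(calculatePriority(scaledImageScore(image, 80, 100), 1))
}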

node heating problem

https://oracle.github.io/weblogic-kubernetes-operator/faq/node-heating/

this often results in Kubernetes running many of the Pods for WebLogic Server instances on the same Node while other Nodes are not fairly utilized. This is commonly known as the “Node heating problem.”

k8s gc unused docker images

https://kubernetes.io/docs/concepts/architecture/garbage-collection/#containers-images

Kubernetes manages the lifecycle of all images through its image manager

The kubelet considers the following disk usage limits when making garbage collection decisions:

  • HighThresholdPercent
  • LowThresholdPercent

Disk usage above the configured HighThresholdPercent value triggers garbage collection, which deletes images in order based on the last time they were used, starting with the oldest first. The kubelet deletes images until disk usage reaches the LowThresholdPercent value.
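A hedged Go sketch of that policy; the names and structure are illustrative, not the kubelet's actual image GC code:

package main

import (
    "fmt"
    "sort"
    "time"
)

type image struct {
    name     string
    sizeByte int64
    lastUsed time.Time
}

// gcImages deletes least-recently-used images while disk usage is above
// highPct, stopping once usage drops to lowPct (percentages of capacity).
func gcImages(images []image, usedBytes, capacityBytes, highPct, lowPct int64) []string {
    if usedBytes*100 < highPct*capacityBytes {
        return nil // below HighThresholdPercent: nothing to do
    }
    sort.Slice(images, func(i, j int) bool { return images[i].lastUsed.Before(images[j].lastUsed) })
    var deleted []string
    for _, img := range images {
        if usedBytes*100 <= lowPct*capacityBytes {
            break // reached LowThresholdPercent
        }
        usedBytes -= img.sizeByte
        deleted = append(deleted, img.name)
    }
    return deleted
}

func main() {
    now := time.Now()
    imgs := []image{
        {"old", 30 << 30, now.Add(-48 * time.Hour)},
        {"new", 10 << 30, now.Add(-1 * time.Hour)},
    }
    // 90 GiB used of 100 GiB, high=85%, low=60%: only the oldest image is deleted.
    fmt.Println(gcImages(imgs, 90<<30, 100<<30, 85, 60)) // [old]
}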

docker image cache management

https://github.com/senthilrch/kube-fledged

kube-fledged is a kubernetes operator for creating and managing a cache of container images directly on the worker nodes of a kubernetes cluster

kube-fledged provides CRUD APIs to manage the lifecycle of the image cache

https://github.com/senthilrch/kube-fledged/blob/master/docs/design-proposal.md

On-demand loading of image files

https://mp.weixin.qq.com/s/0OLdyVwg4Nsw0Xvvg8if5w

Creating a container on a node today requires pulling all of the image data to the node before the container can start. Compare that with booting a virtual machine: even with a VM image of hundreds of GB, the VM usually boots in seconds, and the image size is barely noticeable.

《Slacker: Fast Distribution with Lazy Docker Containers》

The paper finds that pulling the image accounts for 76% of container start time, yet only 6.4% of that data is read during startup. Very little image data is actually needed to start, so it is worth loading the image on demand during startup and changing how images are consumed.

Instead of "the image can only start after all of its layers are downloaded", the container should load image data on demand at start time, much like booting a VM, transferring over the network only the data the startup phase needs.

That said, this is a big change to the existing architecture ...

https://developer.aliyun.com/article/742103

2020/01/08

summary

Container image startup acceleration techniques

Container image

  1. Image download acceleration
    1. docker registry mirror: a local mirror, shorter distance to travel
    2. p2p (Dragonfly, Kraken): distribution acceleration
  2. Image decompression acceleration
    1. container runtime: containerd with unpigz, multi-threaded decompression
  3. On-demand image loading
    1. Slacker: Fast Distribution with Lazy Docker Containers: Our analysis shows that pulling packages accounts for 76% of container start time, but only 6.4% of that data is read.

k8s

  1. imagePullPolicy
    1. Never: shortest path, relies on pre-pulled images
    2. IfNotPresent
    3. Always
  2. schedule imagelocality: a scheduling optimization; when most nodes in the cluster already cache the image, prefer scheduling the pod onto a node that has the cache
  3. docker images pre-pulled + docker image cache management: cluster-level docker image cache management

The k8s items are optimizations at the business-logic layer.

https://www.bilibili.com/video/BV1YA4y197G8?spm_id_from=333.337.search-card.all.click

https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-li_mu.pdf

https://github.com/dmlc/ps-lite

Key implementation points of Mu Li's parameter server (ps-lite) paper, summarized below.

server high availability

  1. Multi-replica replication: every modification or write to a server is replicated to two other replicas before it replies ok. This adds latency and requires clients to retry on failure.
  2. Consistent hashing: the key-value datastore uses it to improve load balancing and recovery (see the sketch below)

https://memcached.org/ adopts a similar high-availability strategy.
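A minimal consistent-hashing sketch in Go, purely illustrative (ps-lite's actual key ranges and ring are implemented differently): keys hash onto a ring of virtual server nodes, so adding or removing a server only remaps a small fraction of keys.

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// ring maps hash points to server names; each server gets several
// virtual nodes so load spreads evenly.
type ring struct {
    points  []uint32
    servers map[uint32]string
}

func hash32(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

func newRing(servers []string, vnodes int) *ring {
    r := &ring{servers: map[uint32]string{}}
    for _, s := range servers {
        for i := 0; i < vnodes; i++ {
            p := hash32(fmt.Sprintf("%s#%d", s, i))
            r.points = append(r.points, p)
            r.servers[p] = s
        }
    }
    sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
    return r
}

// locate returns the server owning key: the first ring point clockwise
// from the key's hash.
func (r *ring) locate(key string) string {
    h := hash32(key)
    i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
    if i == len(r.points) {
        i = 0
    }
    return r.servers[r.points[i]]
}

func main() {
    r := newRing([]string{"server-0", "server-1", "server-2"}, 16)
    fmt.Println(r.locate("weight-shard-42"))
}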

server consistency

  1. Vector clock: record which weights (w) were sent at what time (t); vector clocks make it easy to implement various consistency models, given the potentially complex task dependency graph and the need for fast recovery (a toy sketch follows below)
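A toy vector-clock sketch, my own illustration rather than the paper's data structure: each node keeps one counter per peer, merges on receive, and a clock that is elementwise <= another (and strictly smaller somewhere) happened before it.

package main

import "fmt"

// VClock maps a node id to its logical counter.
type VClock map[string]uint64

// Tick advances this node's own counter before it sends state.
func (v VClock) Tick(node string) { v[node]++ }

// Merge takes the elementwise max, applied when remote state is received.
func (v VClock) Merge(o VClock) {
    for n, c := range o {
        if c > v[n] {
            v[n] = c
        }
    }
}

// Before reports whether v happened before o: v is elementwise <= o and
// o is strictly greater somewhere.
func (v VClock) Before(o VClock) bool {
    for n, c := range v {
        if c > o[n] {
            return false
        }
    }
    strictly := false
    for n, c := range o {
        if c > v[n] {
            strictly = true
        }
    }
    return strictly
}

func main() {
    server, worker := VClock{}, VClock{}
    server.Tick("server-0")            // server pushes weights at t=1
    worker.Merge(server)               // worker pulls them
    worker.Tick("worker-3")            // worker computes an update
    fmt.Println(server.Before(worker)) // true: the push happened before the update
}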

worker high availability

The paper notes:

  • In general a worker only computes over a portion of the data, and losing that portion usually does not hurt the final model much
  • If each worker holds a large amount of data, recovering a worker is expensive, arguably more trouble than recovering a server

So worker high availability is left to the algorithm designer, who may well prefer algorithms that keep training going even when a worker dies.

summary

The parameter server pulls together existing techniques from several fields; it is a domain-specific memcached/redis (both of which are kv datastores). Of course ps is designed for machine learning: by tuning the ML algorithms to fit the ps API, it solves the practical (industrial-scale) problem of large-scale training.

The novelty of the proposed system lies in the synergy achieved by picking the right systems techniques, adapting them to the machine learning algorithms, and modifying the machine learning algorithms to be more systems-friendly. In particular, we can relax a number of otherwise hard systems constraints since the associated machine learning algorithms are quite tolerant to perturbations. The consequence is the first general purpose ML system capable of scaling to industrial scale sizes.

  • conda 4.12.0
  • macOS 10.15.7

The examples below show output from my personal PC environment.

Conda makes environments first-class citizens, making it easy to create independent environments even for C libraries. Conda is written entirely in Python

# conda environments:
#
base                     /Users/huangzhesi/miniconda
conda-dev             *  /Users/huangzhesi/miniconda/envs/conda-dev
pandas                   /Users/huangzhesi/miniconda/envs/pandas
pytorch-1.8              /Users/huangzhesi/miniconda/envs/pytorch-1.8
pytorch-dev              /Users/huangzhesi/miniconda/envs/pytorch-dev
tensorflow-1.x           /Users/huangzhesi/miniconda/envs/tensorflow-1.x

Let's explore what conda actually does for us after we type the conda activate conda-dev command.

Conclusions first:

  1. It puts the conda env's bin directory on the PATH environment variable (replacing the old conda env's entry, e.g. the base conda env)
  2. It does not modify the LD_LIBRARY_PATH environment variable, for the reasons below

https://docs.conda.io/projects/conda-build/en/latest/resources/use-shared-libraries.html#shared-libraries-in-macos-and-linux

https://conda.io/projects/conda-build/en/latest/concepts/recipe.html#prefix-replacement

https://github.com/conda/conda/issues/308#issuecomment-36058087

the problem with activate setting LD_LIBRARY_PATH (even when conda packages themselves don’t need it) is that it might break other things on the users system.

Source code

https://github.com/conda/conda/tree/4.12.0

The conda activate command is implemented in conda/activate.py.

The call sequence is:

  1. activate
  2. build_activate
  3. _build_activate_stack

It ultimately returns a structure:

return {
    'unset_vars': unset_vars,
    'set_vars': set_vars,
    'export_vars': export_vars,
    'deactivate_scripts': deactivate_scripts,
    'activate_scripts': activate_scripts,
}

So the conda activate command actually unsets / sets / exports vars to achieve the effect of activating an environment.

Activation flow

Look up the conda env path

def conda_prefix(self):
    return abspath(sys.prefix)

root_prefix case = conda_prefix = /Users/huangzhesi/miniconda

prefix magic file = {conda_prefix}/conda-meta/history

# path is the prefix magic file
if isfile(path):
    try:
        fh = open(path, 'a+')

Test whether the history file is readable and writable:

ls -alh /Users/huangzhesi/miniconda/conda-meta | grep history
-rw-r--r--  1 huangzhesi  staff   8.2K 12 19 21:44 history

(1) If the history file is readable and writable, the context envs dirs are searched in this order:

  1. /Users/huangzhesi/miniconda/envs
  2. ~/.conda/envs

(2) If the history file is not readable/writable, the context envs dirs are searched in this order:

  1. ~/.conda/envs
  2. /Users/huangzhesi/miniconda/envs

Look up the env to be activated from the context envs dirs:

# name is the conda activate {name}
for envs_dir in envs_dirs:
    if not isdir(envs_dir):
        continue
    prefix = join(envs_dir, name)
    if isdir(prefix):
        return abspath(prefix)

At this point the prefix is determined: prefix = locate_prefix_by_name(env_name_or_prefix)

prefix = /Users/huangzhesi/miniconda/envs/conda-dev

  • CONDA_SHLVL=1
  • CONDA_PREFIX=/Users/huangzhesi/miniconda

Replace old_conda_prefix, e.g. the base conda env:

new_path = self.pathsep_join(
    self._replace_prefix_in_path(old_conda_prefix, prefix))
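Language aside, the effect of _replace_prefix_in_path is roughly the following; a rough Go sketch under that assumption (conda's real implementation handles many more edge cases):

package main

import (
    "fmt"
    "path/filepath"
    "strings"
)

// replacePrefixInPath swaps PATH entries under oldPrefix for the same
// entries under newPrefix, leaving everything else untouched.
func replacePrefixInPath(pathVar, oldPrefix, newPrefix string) string {
    parts := filepath.SplitList(pathVar)
    for i, p := range parts {
        if strings.HasPrefix(p, oldPrefix) {
            parts[i] = newPrefix + strings.TrimPrefix(p, oldPrefix)
        }
    }
    return strings.Join(parts, string(filepath.ListSeparator))
}

func main() {
    path := "/Users/huangzhesi/miniconda/bin:/usr/bin:/bin"
    // base env's bin entry is replaced by the conda-dev env's bin entry
    fmt.Println(replacePrefixInPath(path,
        "/Users/huangzhesi/miniconda",
        "/Users/huangzhesi/miniconda/envs/conda-dev"))
}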

Environment variables that need to be set:

env_vars_to_export = OrderedDict((
    ('path', new_path),
    ('conda_prefix', prefix),
    ('conda_shlvl', new_conda_shlvl),
    ('conda_default_env', conda_default_env),
    ('conda_prompt_modifier', conda_prompt_modifier)))

set ld_library_path inside python

https://stackoverflow.com/questions/6543847/setting-ld-library-path-from-inside-python

https://stackoverflow.com/questions/856116/changing-ld-library-path-at-runtime-for-ctypes

Fairly hacky and not elegant ...

If a non-conda package installed in a conda env depends on shared libraries, there is no great option; just set the LD_LIBRARY_PATH environment variable by hand.

device plugin init and list-watch

init

When the device plugin starts:

func (m *NvidiaDevicePlugin) initialize() {
    m.cachedDevices = m.Devices()
    m.server = grpc.NewServer([]grpc.ServerOption{}...)
    m.health = make(chan *Device)
    m.stop = make(chan interface{})
}

It calls m.Devices() to collect the list of GPU devices on the current node.

list-watch

It returns GPU device details; note that an unhealthy device gets its Health field set to Unhealthy:

for {
    select {
    case <-m.stop:
        return nil
    case d := <-m.health:
        // FIXME: there is no way to recover from the Unhealthy state.
        d.Health = pluginapi.Unhealthy
        log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
        s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
    }
}

device plugin health check

The health check implementation is also fairly direct:

go m.CheckHealth(m.stop, m.cachedDevices, m.health)

Each discovered device is registered with an eventSet through the nvml Go library API; a device that does not support this API is marked Unhealthy right away.

After registration succeeds, a for loop waits for events:

// http://docs.nvidia.com/deploy/xid-errors/index.html#topic_4
// Application errors: the GPU should still be healthy
applicationErrorXids := []uint64{
    13, // Graphics Engine Exception
    31, // GPU memory page fault
    43, // GPU stopped processing
    45, // Preemptive cleanup, due to previous errors
    68, // Video processor exception
}

skippedXids := make(map[uint64]bool)
for _, id := range applicationErrorXids {
    skippedXids[id] = true
}

for {
    select {
    case <-stop:
        return
    default:
    }

    e, err := nvml.WaitForEvent(eventSet, 5000)
    if err != nil && e.Etype != nvml.XidCriticalError {
        continue
    }

    if skippedXids[e.Edata] {
        continue
    }

    if e.UUID == nil || len(*e.UUID) == 0 {
        // All devices are unhealthy
        log.Printf("XidCriticalError: Xid=%d, All devices will go unhealthy.", e.Edata)
        for _, d := range devices {
            unhealthy <- d
        }
        continue
    }

    for _, d := range devices {
        // Please see https://github.com/NVIDIA/gpu-monitoring-tools/blob/148415f505c96052cb3b7fdf443b34ac853139ec/bindings/go/nvml/nvml.h#L1424
        // for the rationale why gi and ci can be set as such when the UUID is a full GPU UUID and not a MIG device UUID.
        gpu, gi, ci, err := nvml.ParseMigDeviceUUID(d.ID)
        if err != nil {
            gpu = d.ID
            gi = 0xFFFFFFFF
            ci = 0xFFFFFFFF
        }

        if gpu == *e.UUID && gi == *e.GpuInstanceId && ci == *e.ComputeInstanceId {
            log.Printf("XidCriticalError: Xid=%d on Device=%s, the device will go unhealthy.", e.Edata, d.ID)
            unhealthy <- d
        }
    }
}

Note that the GPU device plugin skips certain Xids, because those Xids are clearly not hardware faults.

NVIDIA Health & Diagnostic

https://docs.nvidia.com/deploy/index.html

xid

https://docs.nvidia.com/deploy/xid-errors/index.html#topic_4

The Xid message is an error report from the NVIDIA driver that is printed to the operating system’s kernel log or event log. Xid messages indicate that a general GPU error occurred, most often due to the driver programming the GPU incorrectly or to corruption of the commands sent to the GPU. The messages can be indicative of a hardware problem, an NVIDIA software problem, or a user application problem.

Under Linux, the Xid error messages are placed in the location /var/log/messages. Grep for “NVRM: Xid” to find all the Xid messages.

NVVS (NVIDIA Validation Suite)

https://docs.nvidia.com/deploy/nvvs-user-guide/index.html

Easily integrate into Cluster Scheduler and Cluster Management applications

k8s device

type ListAndWatchResponse struct {
    Devices []*Device `protobuf:"bytes,1,rep,name=devices,...`

    ...
}

// E.g:
// struct Device {
//     ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//     Health: "Healthy",
//     Topology:
//       Node:
//         ID: 1

Combined with the Health information, the k8s scheduler can then ignore Unhealthy GPU devices.
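For illustration, a hedged sketch of reporting one healthy and one unhealthy device, assuming the deviceplugin v1beta1 Go package published under k8s.io/kubelet (the second GPU UUID is a made-up placeholder); a real plugin would Send this response on its ListAndWatch stream:

package main

import (
    "fmt"

    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func main() {
    // One healthy and one unhealthy GPU, as the kubelet would receive them
    // in a ListAndWatchResponse; unhealthy devices are excluded from the
    // node's allocatable device count.
    resp := &pluginapi.ListAndWatchResponse{
        Devices: []*pluginapi.Device{
            {ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e", Health: pluginapi.Healthy},
            {ID: "GPU-00000000-dead-beef-0000-000000000000", Health: pluginapi.Unhealthy},
        },
    }
    fmt.Println(resp)
}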

Related issue: https://github.com/kubernetes/kubernetes/issues/72486. When an init container requests device resources, it appears to lock them, so the app container cannot obtain enough devices.

Goal of this discussion: explore how k8s implements GPU mounting for init containers.

The code below is from the k8s release-1.19 branch.

kubelet sync pod

pkg/kubelet/kubelet_pods.go

Rough call sequence:

  1. syncPod
  2. SyncPod
  3. startContainer
  4. generateContainerConfig
  5. GenerateRunContainerOptions
  6. GetResources
  7. GetDeviceRunContainerOptions
  8. Allocate

// syncPod is the transaction script for the sync of a single pod.

SyncPod

// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
    // Start the next init container.
    if err := start("init container", containerStartSpec(container)); err != nil {
        return
    }

    // Successfully started the container; clear the entry in the failure
    klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}

// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
    start("container", containerStartSpec(&pod.Spec.Containers[idx]))
}

When a pod's state changes, the rough call sequence is:

  1. dispatchWork
  2. UpdatePod
  3. managePodLoop (goroutine)

// Creating a new pod worker either means this is a new pod, or that the kubelet just restarted.

managePodLoop keeps reading from the podUpdates channel and calls syncPod:

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            podUID := update.Pod.UID
            // This is a blocking call that would return only if the cache
            // has an entry for the pod that is newer than minRuntimeCache
            // Time. This ensures the worker doesn't start syncing until
            // after the cache is at least newer than the finished time of
            // the previous sync.
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            if err != nil {
                // This is the legacy event thrown by manage pod loop
                // all other events are now dispatched from syncPodFn
                p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                return err
            }
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        // notify the call-back function if the operation succeeded or not
        if update.OnCompleteFunc != nil {
            update.OnCompleteFunc(err)
        }
        if err != nil {
            // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
            klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
        }
        p.wrapUp(update.Pod.UID, err)
    }
}

Putting this together: the kubelet can process change events for multiple pods concurrently (each pod's syncPod runs in its own goroutine), but different events for a single pod are processed serially inside that pod's loop.
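A stripped-down sketch of that per-pod worker pattern, as my own illustration rather than the kubelet's podWorkers type: one goroutine and one channel per pod UID, so pods are synced in parallel while updates to the same pod are serialized.

package main

import (
    "fmt"
    "sync"
    "time"
)

type update struct{ podUID, event string }

type workers struct {
    mu      sync.Mutex
    updates map[string]chan update
}

// dispatch hands an update to the pod's worker goroutine, creating the
// goroutine and its channel on first use. Different pods run in parallel;
// updates for one pod are consumed one at a time.
func (w *workers) dispatch(u update) {
    w.mu.Lock()
    ch, ok := w.updates[u.podUID]
    if !ok {
        ch = make(chan update, 1)
        w.updates[u.podUID] = ch
        go func() {
            for u := range ch {
                fmt.Println("sync", u.podUID, u.event)
                time.Sleep(10 * time.Millisecond) // pretend to sync the pod
            }
        }()
    }
    w.mu.Unlock()
    ch <- u
}

func main() {
    w := &workers{updates: map[string]chan update{}}
    w.dispatch(update{"pod-a", "ADD"})
    w.dispatch(update{"pod-b", "ADD"})    // handled concurrently with pod-a
    w.dispatch(update{"pod-a", "UPDATE"}) // waits until pod-a's ADD has been synced
    time.Sleep(100 * time.Millisecond)    // let the workers drain before exiting
}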

kubelet admit pod

So how does device resource allocation avoid conflicts between different pods?

When admitting a pod, the kubelet calls the deviceManager Allocate API to allocate device resources.

The kubelet handles pod additions roughly in this order:

  1. syncLoopIteration
  2. kubetypes.ADD
  3. HandlePodAdditions
  4. canAdmitPod

It then loops over the pods one by one, calling canAdmitPod for each.

In other words, the kubelet handles pod additions without concurrency, one at a time.

The resourceAllocator admit handler; note the allocation order: init containers first, then containers:

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    pod := attrs.Pod

    for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
        err := m.deviceManager.Allocate(pod, &container)
        if err != nil {
            return lifecycle.PodAdmitResult{
                Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
                Reason:  "UnexpectedAdmissionError",
                Admit:   false,
            }
        }

        if m.cpuManager != nil {
            err = m.cpuManager.Allocate(pod, &container)
            if err != nil {
                return lifecycle.PodAdmitResult{
                    Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
                    Reason:  "UnexpectedAdmissionError",
                    Admit:   false,
                }
            }
        }
    }

    return lifecycle.PodAdmitResult{Admit: true}
}

Drilling down into deviceManager Allocate:

// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
    if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
        m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
    }
    // If pod entries to m.devicesToReuse other than the current pod exist, delete them.
    for podUID := range m.devicesToReuse {
        if podUID != string(pod.UID) {
            delete(m.devicesToReuse, podUID)
        }
    }
    // Allocate resources for init containers first as we know the caller always loops
    // through init containers before looping through app containers. Should the caller
    // ever change those semantics, this logic will need to be amended.
    for _, initContainer := range pod.Spec.InitContainers {
        if container.Name == initContainer.Name {
            if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
                return err
            }
            m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
            return nil
        }
    }
    if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
        return err
    }
    m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
    return nil
}

Note that devices are allocated for the init container first, and the allocated devices are then added to devicesToReuse via addContainerAllocatedResources. If the next loop iteration allocates for an app container, devicesToReuse is consumed first; once that allocation is done, removeContainerAllocatedResources subtracts the newly allocated devices from devicesToReuse.
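A toy illustration of that reuse flow with hypothetical helper names (the real manager works on sets.String plus podDevices bookkeeping): the init container's devices go into a reuse pool, the app container drains that pool first, and whatever it takes is removed from the pool.

package main

import "fmt"

type set map[string]bool

// allocate hands out `want` devices, preferring the reuse pool and then
// falling back to the free pool.
func allocate(want int, reuse, free set) set {
    got := set{}
    take := func(pool set) {
        for id := range pool {
            if len(got) == want {
                return
            }
            got[id] = true
            delete(pool, id)
        }
    }
    take(reuse)
    take(free)
    return got
}

func main() {
    free := set{"GPU-0": true, "GPU-1": true, "GPU-2": true, "GPU-3": true}
    reuse := set{}

    initDevs := allocate(2, reuse, free) // init container gets 2 GPUs
    for id := range initDevs {
        reuse[id] = true // its devices become reusable
    }
    appDevs := allocate(2, reuse, free) // app container drains the reuse pool first
    fmt.Println(initDevs, appDevs)      // the two sets hold the same GPU IDs
}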

devicesToAllocate

// Allocates from reusableDevices list first.
if allocateRemainingFrom(reusableDevices) {
    return allocated, nil
}

summary

Related issue: https://github.com/kubernetes/kubernetes/issues/72486#issuecomment-482554372

Returning to that issue: given the allocation logic above, the bug where an init container's device request blocked the app container from getting devices has been fixed.

The implementation also tells us that, for a pod yaml like the one in the issue:

apiVersion: v1
kind: Pod
metadata:
  name: busybox
spec:
  containers:
  - name: busybox
    image: busybox
    args:
    - sleep
    - "10"
    resources:
      requests:
        alpha.kubernetes.io/nvidia-gpu: 4
        cpu: "2"
        memory: 4Gi
      limits:
        alpha.kubernetes.io/nvidia-gpu: 4
        cpu: "2"
        memory: 4Gi
  initContainers:
  - name: init-myservice
    image: busybox
    command: ['sh', '-c', 'sleep 200']
    resources:
      requests:
        alpha.kubernetes.io/nvidia-gpu: 4
        cpu: "2"
        memory: 4Gi
      limits:
        alpha.kubernetes.io/nvidia-gpu: 4
        cpu: "2"
        memory: 4Gi
  restartPolicy: Never

From the k8s implementation logic, the devices the init container requests will in fact be the same devices the app container gets, because:

  1. Device allocation (admit) proceeds pod by pod, so there is no concurrent allocation across pods
  2. Within a pod, devices are allocated in order: init containers first, then app containers
  3. Devices already allocated to init containers are recorded as devicesToReuse
  4. When app containers are allocated afterwards, devicesToReuse is used first

There is, however, one workaround case inside syncPod:

func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {

    ...

    for k := range container.Resources.Limits {
        ...

        // This is a device plugin resource yet we don't have cached
        // resource state. This is likely due to a race during node
        // restart. We re-issue allocate request to cover this race.
        if m.podDevices.containerDevices(podUID, contName, resource) == nil {
            needsReAllocate = true
        }
    }

    if needsReAllocate {
        klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", podUID, container.Name)
        if err := m.Allocate(pod, container); err != nil {
            return nil, err
        }
    }

    ...

}

A commit from 2019/11/10:

Checks whether we have cached runtime state before starting a container that requests any device plugin resource. If not, re-issue Allocate grpc calls. This allows us to handle the edge case that a pod got assigned to a node even before it populates its extended resource capacity.

The comment explains that this happens when a node restarts and a pod gets assigned to the node before its extended resource capacity has been populated.

Back to the deviceManager Allocate method:

// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
    if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
        m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
    }
    // If pod entries to m.devicesToReuse other than the current pod exist, delete them.
    for podUID := range m.devicesToReuse {
        if podUID != string(pod.UID) {
            delete(m.devicesToReuse, podUID)
        }
    }

    ...

}

Notice that it uses a plain map, which is not safe for concurrent use. So if the workaround code above were ever triggered by more than one pod at a time, there would be a data race. Since it has been in the tree this long without being fixed, I take that as evidence the workaround never runs for multiple pods concurrently ... :) And this is not just guesswork: the discussion on the PR that merged this code also confirms that admission is serialized.

https://github.com/kubernetes/kubernetes/pull/87759

https://github.com/kubernetes/kubernetes/pull/87759#pullrequestreview-364185345

The reviewers actually noticed how odd this implementation is, but left it as is: it predates the PR, and the refactor did not change the original logic.

https://github.com/kubernetes/kubernetes/pull/87759#pullrequestreview-353195106

The design idea is simply that the resources of the init container keep being handed on to the app containers.

I’d need to look closer at this, but is the idea to:

  1. Unconditionally allocate CPUs to the container from the pool of available CPUs

  2. Check if the container we just allocated to is an init container

  3. if it IS an init container, reset the pool of available CPUs to re-include the CPUs just assigned to the init container (but keep them assigned to the init container in the process).

  4. If it is NOT an init container, just return (leaving the CPUs removed from the pool of available CPUs).

https://github.com/kubernetes/kubernetes/pull/87759#discussion_r383888297

This would only work if Pod admission is serialized. @derekwaynecarr can you confirm that this is the case?

In short, it was eventually confirmed to work.
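To make the concern concrete, a tiny standalone demo (not kubelet code) of what would go wrong if two pods ever hit that unsynchronized map at the same time; run it with the race detector, or watch for the runtime's concurrent map writes fault:

package main

import "sync"

func main() {
    // Same shape as the manager's devicesToReuse map, but written from two
    // goroutines at once, which the kubelet's serialized admission never does.
    devicesToReuse := map[string]map[string]bool{}

    var wg sync.WaitGroup
    for _, podUID := range []string{"pod-a", "pod-b"} {
        podUID := podUID
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                devicesToReuse[podUID] = map[string]bool{}
                for uid := range devicesToReuse {
                    if uid != podUID {
                        delete(devicesToReuse, uid)
                    }
                }
            }
        }()
    }
    wg.Wait() // typically faults with "concurrent map writes", and always fails under -race
}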

https://developer.aliyun.com/article/784148

https://go.dev/ref/mem

Note that a read r may observe the value written by a write w that happens concurrently with r. Even if this occurs, it does not imply that reads happening after r will observe writes that happened before w.

var a, b int

func f() {
    a = 1
    b = 2
}

func g() {
    print(b)
    print(a)
}

func main() {
    go f()
    g()
}

it can happen that g prints 2 and then 0.

A send on a channel happens before the corresponding receive from that channel completes.

var c = make(chan int, 10)
var a string

func f() {
    a = "hello, world"
    c <- 0 // send on c
}

func main() {
    go f()
    <-c
    print(a)
}

is guaranteed to print “hello, world”. The write to a happens before the send on c, which happens before the corresponding receive on c completes, which happens before the print.

The closing of a channel happens before a receive that returns a zero value because the channel is closed.

In the previous example, replacing c <- 0 with close(c) yields a program with the same guaranteed behavior.

A receive from an unbuffered channel happens before the send on that channel completes.

var c = make(chan int)
var a string

func f() {
    a = "hello, world"
    <-c
}

func main() {
    go f()
    c <- 0
    print(a)
}

is also guaranteed to print “hello, world”. The write to a happens before the receive on c, which happens before the corresponding send on c completes, which happens before the print.

If the channel were buffered (e.g., c = make(chan int, 1)) then the program would not be guaranteed to print “hello, world”. (It might print the empty string, crash, or do something else.)

The kth receive on a channel with capacity C happens before the k+Cth send from that channel completes.

This program starts a goroutine for every entry in the work list, but the goroutines coordinate using the limit channel to ensure that at most three are running work functions at a time.

var limit = make(chan int, 3)

func main() {
    for _, w := range work {
        go func(w func()) {
            limit <- 1
            w()
            <-limit
        }(w)
    }
    select{}
}

Configure RoCE

https://community.mellanox.com/s/article/howto-configure-roce-on-connectx-4

https://community.mellanox.com/s/article/understanding-show-gids-script

Use ibv_query_gid and ibv_find_gid_index functions defined in libibverbs to get the desired GID index.

Based on the material above, RoCE first of all requires NIC support, e.g. a Mellanox ConnectX-4.

Taking a Mellanox NIC as an example:

  1. Find the network device that the mlnx device's GID maps to

cat /sys/class/infiniband/mlx5_0/ports/1/gid_attrs/ndevs/1

  2. Check the RoCE type of GID 1

cat /sys/class/infiniband/mlx5_0/ports/1/gid_attrs/types/1

  3. Check the address of GID 1

cat /sys/class/infiniband/mlx5_0/ports/1/gids/1

Interface   GID Index   RoCE version   GID Address
ens785f0    1           RoCEv2         fe80:0000:0000:0000:e61d:2dff:fef2:a488

Once the GID to use is determined, ib_send_bw can be pointed at that GID to run RoCE communication.

Also note:

https://community.mellanox.com/s/article/howto-configure-roce-on-connectx-4

A VLAN interface added on top of the network device that the mlnx device maps to also supports RoCE.

RoCE in container

NCCL RoCE failed in container

NCCL WARN Call to ibv_modify_qp failed with error No such device

// IB setup
ibv_context* ctx = ncclIbDevs[lComm->dev].context;
uint8_t ib_port = ncclIbDevs[lComm->dev].port;
struct ibv_port_attr portAttr;
NCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr));
union ibv_gid gid;
NCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, ncclParamIbGidIndex(), &gid));

// QP Creation
NCCLCHECK(ncclIbInitVerbs(ctx, &rComm->verbs));
NCCLCHECK(ncclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_REMOTE_WRITE, &rComm->qp));

// Adjust the MTU
remQpInfo.mtu = (enum ibv_mtu)std::min(remQpInfo.mtu, portAttr.active_mtu);

// Setup QP
struct ibv_qp* qp = rComm->qp;
NCCLCHECK(ncclIbRtrQp(qp, &remQpInfo));
NCCLCHECK(ncclIbRtsQp(qp));

ncclIbRtrQp

ncclResult_t ncclIbRtrQp(ibv_qp* qp, struct ncclIbQpInfo* info) {
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_RTR;
  qpAttr.path_mtu = info->mtu;
  qpAttr.dest_qp_num = info->qpn;
  qpAttr.rq_psn = 0;
  qpAttr.max_dest_rd_atomic = 1;
  qpAttr.min_rnr_timer = 12;
  if (info->lid == 0) {
    qpAttr.ah_attr.is_global = 1;
    qpAttr.ah_attr.grh.dgid.global.subnet_prefix = info->spn;
    qpAttr.ah_attr.grh.dgid.global.interface_id = info->iid;
    qpAttr.ah_attr.grh.flow_label = 0;
    qpAttr.ah_attr.grh.sgid_index = ncclParamIbGidIndex();
    qpAttr.ah_attr.grh.hop_limit = 255;
    qpAttr.ah_attr.grh.traffic_class = ncclParamIbTc();
  } else {
    qpAttr.ah_attr.is_global = 0;
    qpAttr.ah_attr.dlid = info->lid;
  }
  qpAttr.ah_attr.sl = ncclParamIbSl();
  qpAttr.ah_attr.src_path_bits = 0;
  qpAttr.ah_attr.port_num = info->ib_port;
  NCCLCHECK(wrap_ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));
  return ncclSuccess;
}

My guess is that inside the container the mlnx device is discovered, but its corresponding network device (e.g. ens785f0 in the demo above) is not, so no usable GID can be found for RoCE communication.

ib_write_bw failed in container

Failed to modify QP 100 to RTR

ib_write_bw fails as well, and judging from the error message it fails at the same step as NCCL does in ncclIbRtrQp.

multus-cni

https://github.com/k8snetworkplumbingwg/multus-cni

In theory, multus-cni is needed to add the RoCE network device into the container as a macvlan interface.

https://github.com/Mellanox/k8s-rdma-sriov-dev-plugin/issues/18

instead of calico, you should use macvlan cni where those virtual devices are child of enp175s0. RoCE can make use of those netdevices.

Other users are using multus plugin, which allows you to have multiple netdev interfaces in a Pod. Such as first managed default veth interface via your existing plugin, and second macvlan or sriov interface via 2nd cni.
This way you get both of both world for performance and functionality.

According to the multus-cni quick start docs, and assuming multus turns out to be compatible with the cluster's current default CNI plugin, an extra CRD resource needs to be created for the macvlan RoCE network device (if the host has multiple RoCE network devices, create one CRD resource per device):

cat <<EOF | kubectl create -f -
apiVersion: "k8s.cni.cncf.io/v1"
kind: NetworkAttachmentDefinition
metadata:
  name: macvlan-conf
spec:
  config: '{
      "cniVersion": "0.3.0",
      "type": "macvlan",
      "master": "eth0",
      "mode": "bridge",
      "ipam": {
        "type": "host-local",
        "subnet": "192.168.1.0/24",
        "rangeStart": "192.168.1.200",
        "rangeEnd": "192.168.1.216",
        "routes": [
          { "dst": "0.0.0.0/0" }
        ],
        "gateway": "192.168.1.1"
      }
    }'
EOF

This of course assumes the macvlan CNI plugin is already installed in the k8s cluster.

type: This tells CNI which binary to call on disk. Each CNI plugin is a binary that’s called. Typically, these binaries are stored in /opt/cni/bin on each node, and CNI executes this binary. In this case we’ve specified the loopback binary (which create a loopback-type network interface). If this is your first time installing Multus, you might want to verify that the plugins that are in the “type” field are actually on disk in the /opt/cni/bin directory.

https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/network-plugins/

https://www.cni.dev/plugins/current/main/macvlan/

https://docs.docker.com/network/macvlan/

Some applications, especially legacy applications or applications which monitor network traffic, expect to be directly connected to the physical network. In this type of situation, you can use the macvlan network driver to assign a MAC address to each container’s virtual network interface, making it appear to be a physical network interface directly connected to the physical network.

https://docs.docker.com/network/network-tutorial-macvlan/

init_process_group

store TCPStore

rank == 0 acts as the server of the TCPStore rendezvous handler

  • hostname
  • port
  • tcp://
  • rank
  • world_size

TCPStore

When isServer is True, a TCPStoreDaemon is started internally.

When waitWorkerReady is True, it polls every 10 ms to check whether enough workers (workerNumber) have connected.