// Variables for interactive containers, these have very specialized use-cases (e.g. debugging)
// and shouldn't be used for general purpose containers.

// Whether this container should allocate a buffer for stdin in the container runtime. If this
// is not set, reads from stdin in the container will always result in EOF.
// Default is false.
// +optional
Stdin bool `json:"stdin,omitempty" protobuf:"varint,16,opt,name=stdin"`
// Whether the container runtime should close the stdin channel after it has been opened by
// a single attach. When stdin is true the stdin stream will remain open across multiple attach
// sessions. If stdinOnce is set to true, stdin is opened on container start, is empty until the
// first client attaches to stdin, and then remains open and accepts data until the client disconnects,
// at which time stdin is closed and remains closed until the container is restarted. If this
// flag is false, a container process that reads from stdin will never receive an EOF.
// Default is false
// +optional
StdinOnce bool `json:"stdinOnce,omitempty" protobuf:"varint,17,opt,name=stdinOnce"`
// Whether this container should allocate a TTY for itself, also requires 'stdin' to be true.
// Default is false.
// +optional
TTY bool `json:"tty,omitempty" protobuf:"varint,18,opt,name=tty"`
Dragonfly is an intelligent P2P-based image and file distribution system.
Originally it was born to solve all kinds of distribution problems at very large scale, such as application distribution, cache distribution, log distribution, and image distribution.
Kraken is a P2P-powered Docker registry that focuses on scalability and availability. It is designed for Docker image management, replication, and distribution in a hybrid cloud environment.
All participants can reach a minimum of 80% of max upload/download speed in theory (60% with the current implementation), and performance doesn't degrade much as the blob size and cluster size increase.
every time the kubelet launches a container, the kubelet queries the container image registry to resolve the name to an image digest. If the kubelet has a container image with that exact digest cached locally, the kubelet uses its cached image; otherwise, the kubelet pulls the image with the resolved digest, and uses that image to launch the container.
// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how many nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *framework.ImageStateSummary, totalNumNodes int) int64 {
	spread := float64(imageState.NumNodes) / float64(totalNumNodes)
	return int64(float64(imageState.Size) * spread)
}
// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64, numContainers int) int64 {
	maxThreshold := maxContainerThreshold * int64(numContainers)
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}

	return int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}
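To make the scaling concrete, the following self-contained sketch plugs sample numbers into the same formulas. The 23MB/1000MB per-container thresholds and the max node score of 100 are assumptions based on my reading of the ImageLocality plugin's defaults, hard-coded here purely for illustration.

package main

import "fmt"

// Assumed ImageLocality plugin thresholds (not defined in the excerpt above).
const (
	mb                    int64 = 1024 * 1024
	minThreshold                = 23 * mb
	maxContainerThreshold       = 1000 * mb
	maxNodeScore          int64 = 100 // stands in for framework.MaxNodeScore
)

func main() {
	// A 500MB image cached on 3 of 10 nodes: spread = 3/10, so the image contributes
	// 500MB * 0.3 = 150MB to sumScores (see scaledImageScore above).
	sumScores := 500 * mb * 3 / 10

	// The pod has one container, so the thresholds are 23MB and 1000MB (see calculatePriority above).
	numContainers := 1
	maxThreshold := maxContainerThreshold * int64(numContainers)
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}
	score := maxNodeScore * (sumScores - minThreshold) / (maxThreshold - minThreshold)
	fmt.Println(score) // prints 12: the node gets 12 out of 100 for image locality
}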
this often results in Kubernetes running many of the Pods for WebLogic Server instances on the same Node while other Nodes are not fairly utilized. This is commonly known as the “Node heating problem.”
Kubernetes manages the lifecycle of all images through its image manager
The kubelet considers the following disk usage limits when making garbage collection decisions:
HighThresholdPercent
LowThresholdPercent
Disk usage above the configured HighThresholdPercent value triggers garbage collection, which deletes images in order based on the last time they were used, starting with the oldest first. The kubelet deletes images until disk usage reaches the LowThresholdPercent value.
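A minimal sketch of where these two knobs live, assuming the k8s.io/kubelet/config/v1beta1 Go types (the same settings appear as imageGCHighThresholdPercent / imageGCLowThresholdPercent in the kubelet's config file); 85 and 80 are the documented defaults.

package main

import (
	"fmt"

	kubeletv1beta1 "k8s.io/kubelet/config/v1beta1"
)

func main() {
	high, low := int32(85), int32(80)
	cfg := kubeletv1beta1.KubeletConfiguration{
		// Garbage collection starts once image-filesystem usage exceeds this percentage...
		ImageGCHighThresholdPercent: &high,
		// ...and keeps deleting least-recently-used images until usage drops below this one.
		ImageGCLowThresholdPercent: &low,
	}
	fmt.Println(*cfg.ImageGCHighThresholdPercent, *cfg.ImageGCLowThresholdPercent)
}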
Slacker: Fast Distribution with Lazy Docker Containers: Our analysis shows that pulling packages accounts for 76% of container start time, but only 6.4% of that data is read.
k8s
imagePullPolicy
Never: shortest path; relies on images being pre-pulled onto the node (see the sketch after this list)
IfNotPresent
Always
scheduler ImageLocality: a scheduling optimization; when most nodes in the cluster already have the image cached, prefer scheduling the pod onto a node that has it
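For example, a workload that depends on pre-pulled images pins the policy explicitly. This sketch reuses the corev1 alias for k8s.io/api/core/v1 from the earlier example; the image name is hypothetical.

// With PullNever the kubelet never contacts the registry: the image must already
// be present on the node (pre-pulled), which is the shortest start-up path.
// PullIfNotPresent pulls only on a cache miss; PullAlways re-resolves on every start.
c := corev1.Container{
	Name:            "app",                         // hypothetical
	Image:           "registry.example.com/app:v1", // hypothetical
	ImagePullPolicy: corev1.PullNever,
}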
The novelty of the proposed system lies in the synergy achieved by picking the right systems techniques, adapting them to the machine learning algorithms, and modifying the machine learning algorithms to be more systems-friendly. In particular, we can relax a number of otherwise hard systems constraints since the associated machine learning algorithms are quite tolerant to perturbations. The consequence is the first general purpose ML system capable of scaling to industrial scale sizes.
Conda makes environments first-class citizens, making it easy to create independent environments even for C libraries. Conda is written entirely in Python
the problem with activate setting LD_LIBRARY_PATH (even when conda packages themselves don't need it) is that it might break other things on the user's system.
# path is the prefix magic file
if isfile(path):
    try:
        fh = open(path, 'a+')
Check whether the history file is readable and writable:
ls -alh /Users/huangzhesi/miniconda/conda-meta | grep history
-rw-r--r--  1 huangzhesi  staff  8.2K 12 19 21:44 history
(1) If the history file is readable and writable, context envs_dirs is ordered as follows:
/Users/huangzhesi/miniconda/envs
~/.conda/envs
(2) If the history file is not readable and writable, context envs_dirs is ordered as follows:
~/.conda/envs
/Users/huangzhesi/miniconda/envs
Look up the environment to activate from context envs_dirs:
# name is the {name} in `conda activate {name}`
for envs_dir in envs_dirs:
    if not isdir(envs_dir):
        continue
    prefix = join(envs_dir, name)
    if isdir(prefix):
        return abspath(prefix)
for {
	select {
	case <-m.stop:
		return nil
	case d := <-m.health:
		// FIXME: there is no way to recover from the Unhealthy state.
		d.Health = pluginapi.Unhealthy
		log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
		s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
	}
}
device plugin health check
The health-check implementation is also fairly direct:
go m.CheckHealth(m.stop, m.cachedDevices, m.health)
Each discovered device is registered to an eventSet via the nvml Go library API; a device that does not support this API is marked Unhealthy right away.
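Roughly, that registration step looks like the sketch below, adapted from my reading of the plugin's health-check code; it assumes the gpu-monitoring-tools nvml bindings (NewEventSet / RegisterEventForDevice), and the exact error handling may differ from the real source.

eventSet := nvml.NewEventSet()
defer nvml.DeleteEventSet(eventSet)

for _, d := range devices {
	// For MIG devices the event has to be registered against the parent GPU UUID.
	gpu, _, _, err := nvml.ParseMigDeviceUUID(d.ID)
	if err != nil {
		gpu = d.ID
	}

	err = nvml.RegisterEventForDevice(eventSet, nvml.XidCriticalError, gpu)
	if err != nil && strings.HasSuffix(err.Error(), "Not Supported") {
		// Devices too old to support event registration are marked unhealthy up front.
		log.Printf("Warning: %s is too old to support healthchecking: %s. Marking it unhealthy.", d.ID, err)
		unhealthy <- d
		continue
	}
	if err != nil {
		// Other registration errors are treated as fatal here (handling may differ upstream).
		log.Fatalf("failed to register health event for %s: %v", d.ID, err)
	}
}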
// http://docs.nvidia.com/deploy/xid-errors/index.html#topic_4
// Application errors: the GPU should still be healthy
applicationErrorXids := []uint64{
	13, // Graphics Engine Exception
	31, // GPU memory page fault
	43, // GPU stopped processing
	45, // Preemptive cleanup, due to previous errors
	68, // Video processor exception
}

skippedXids := make(map[uint64]bool)
for _, id := range applicationErrorXids {
	skippedXids[id] = true
}

for {
	select {
	case <-stop:
		return
	default:
	}

	e, err := nvml.WaitForEvent(eventSet, 5000)
	if err != nil && e.Etype != nvml.XidCriticalError {
		continue
	}

	if skippedXids[e.Edata] {
		continue
	}

	if e.UUID == nil || len(*e.UUID) == 0 {
		// All devices are unhealthy
		log.Printf("XidCriticalError: Xid=%d, All devices will go unhealthy.", e.Edata)
		for _, d := range devices {
			unhealthy <- d
		}
		continue
	}

	for _, d := range devices {
		// Please see https://github.com/NVIDIA/gpu-monitoring-tools/blob/148415f505c96052cb3b7fdf443b34ac853139ec/bindings/go/nvml/nvml.h#L1424
		// for the rationale why gi and ci can be set as such when the UUID is a full GPU UUID and not a MIG device UUID.
		gpu, gi, ci, err := nvml.ParseMigDeviceUUID(d.ID)
		if err != nil {
			gpu = d.ID
			gi = 0xFFFFFFFF
			ci = 0xFFFFFFFF
		}

		if gpu == *e.UUID && gi == *e.GpuInstanceId && ci == *e.ComputeInstanceId {
			log.Printf("XidCriticalError: Xid=%d on Device=%s, the device will go unhealthy.", e.Edata, d.ID)
			unhealthy <- d
		}
	}
}
The Xid message is an error report from the NVIDIA driver that is printed to the operating system’s kernel log or event log. Xid messages indicate that a general GPU error occurred, most often due to the driver programming the GPU incorrectly or to corruption of the commands sent to the GPU. The messages can be indicative of a hardware problem, an NVIDIA software problem, or a user application problem.
Under Linux, the Xid error messages are placed in the location /var/log/messages. Grep for “NVRM: Xid” to find all the Xid messages.
// syncPod is the transaction script for the sync of a single pod.
SyncPod
// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
	// Start the next init container.
	if err := start("init container", containerStartSpec(container)); err != nil {
		return
	}

	// Successfully started the container; clear the entry in the failure
	klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}

// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
	start("container", containerStartSpec(&pod.Spec.Containers[idx]))
}
When a pod's state changes, the rough call sequence is:
dispatchWork
UpdatePod
managePodLoop (goroutine)
// Creating a new pod worker either means this is a new pod, or that the kubelet just restarted.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// This is a blocking call that would return only if the cache
			// has an entry for the pod that is newer than minRuntimeCache
			// Time. This ensures the worker doesn't start syncing until
			// after the cache is at least newer than the finished time of
			// the previous sync.
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				// This is the legacy event thrown by manage pod loop
				// all other events are now dispatched from syncPodFn
				p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
				return err
			}
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
			klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
		}
		p.wrapUp(update.Pod.UID, err)
	}
}
Putting this together: the kubelet can process change events for multiple pods concurrently (each pod's syncPod runs in its own goroutine), but different events for a single pod (syncPod) are processed serially.
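A toy model of that per-pod worker pattern, with illustrative names only (not the real podWorkers implementation): one channel and one goroutine per pod UID, so different pods sync concurrently while updates for the same pod are consumed one at a time.

package main

import (
	"fmt"
	"sync"
	"time"
)

type podWorkers struct {
	mu      sync.Mutex
	updates map[string]chan string // pod UID -> pending update channel
}

func (p *podWorkers) UpdatePod(uid, update string) {
	p.mu.Lock()
	ch, ok := p.updates[uid]
	if !ok {
		ch = make(chan string, 1)
		p.updates[uid] = ch
		go func() { // one worker goroutine per pod
			for u := range ch {
				fmt.Printf("syncPod %s: %s\n", uid, u)
				time.Sleep(10 * time.Millisecond) // stand-in for syncPodFn
			}
		}()
	}
	p.mu.Unlock()
	ch <- update
}

func main() {
	p := &podWorkers{updates: map[string]chan string{}}
	p.UpdatePod("pod-a", "create")
	p.UpdatePod("pod-b", "create") // handled concurrently with pod-a
	p.UpdatePod("pod-a", "update") // queued; processed only after pod-a's previous sync finishes
	time.Sleep(100 * time.Millisecond)
}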
kubelet admit pod
So how does device resource allocation guarantee there are no conflicts between different pods?
When admitting a pod, the kubelet calls the device manager's Allocate API to allocate device resources:
// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
	if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
		m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
	}
	// If pod entries to m.devicesToReuse other than the current pod exist, delete them.
	for podUID := range m.devicesToReuse {
		if podUID != string(pod.UID) {
			delete(m.devicesToReuse, podUID)
		}
	}
	// Allocate resources for init containers first as we know the caller always loops
	// through init containers before looping through app containers. Should the caller
	// ever change those semantics, this logic will need to be amended.
	for _, initContainer := range pod.Spec.InitContainers {
		if container.Name == initContainer.Name {
			if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
				return err
			}
			m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
			return nil
		}
	}
	if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
		return err
	}
	m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
	return nil
}
	// This is a device plugin resource yet we don't have cached
	// resource state. This is likely due to a race during node
	// restart. We re-issue allocate request to cover this race.
	if m.podDevices.containerDevices(podUID, contName, resource) == nil {
		needsReAllocate = true
	}
}

if needsReAllocate {
	klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", podUID, container.Name)
	if err := m.Allocate(pod, container); err != nil {
		return nil, err
	}
}
...
}
The commit from 19/11/10:
Checks whether we have cached runtime state before starting a container that requests any device plugin resource. If not, re-issue Allocate grpc calls. This allows us to handle the edge case that a pod got assigned to a node even before it populates its extended resource capacity.
// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
	if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
		m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
	}
	// If pod entries to m.devicesToReuse other than the current pod exist, delete them.
	for podUID := range m.devicesToReuse {
		if podUID != string(pod.UID) {
			delete(m.devicesToReuse, podUID)
		}
	}
...
}
Notice that a plain map is used here, which is not safe for concurrent access; so if the workaround above could be triggered by more than a single pod at a time, it would have a concurrency problem. Given how long this code has been merged without being fixed, I take it that the workaround never sees concurrent multi-pod access ... :) That is not just guesswork: the discussion on the PR that merged the code above also confirms the calls are serialized.
I’d need to look closer at this, but is the idea to:
Unconditionally allocate CPUs to the container from the pool of available CPUs
Check if the container we just allocated to is an init container
if it IS an init container, reset the pool of available CPUs to re-include the CPUs just assigned to the init container (but keep them assigned to the init container in the process).
If it is NOT an init container, just return (leaving the CPUs removed from the pool of available CPUs).
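The idea in that list can be shown with a small sketch (hypothetical types, not the actual cpuManager/deviceManager code): resources handed to an init container stay recorded against it, but go back into the pool, so that app containers, which only run after the init container has exited, can be handed the same resources.

package main

import "fmt"

type pool map[string]bool // available resource IDs (hypothetical)

func (p pool) take(n int) []string {
	var out []string
	for id := range p {
		if len(out) == n {
			break
		}
		out = append(out, id)
		delete(p, id)
	}
	return out
}

func (p pool) giveBack(ids []string) {
	for _, id := range ids {
		p[id] = true
	}
}

func allocate(p pool, container string, n int, isInit bool, assignments map[string][]string) {
	ids := p.take(n) // 1) unconditionally allocate from the pool of available resources
	assignments[container] = ids
	if isInit {
		// 2)+3) init container: keep the assignment recorded, but return the
		// resources to the pool so app containers can reuse them.
		p.giveBack(ids)
	}
	// 4) app container: just return, leaving the resources removed from the pool.
}

func main() {
	p := pool{"gpu-0": true, "gpu-1": true}
	assignments := map[string][]string{}
	allocate(p, "init", 2, true, assignments)
	allocate(p, "app", 2, false, assignments)
	fmt.Println(assignments, p) // init and app end up assigned the same two devices
}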