Starting from the root ancestor changeset, extract each layer's filesystem changeset archive, level by level, into a directory that will be used as the root of the container filesystem. After extracting each layer, walk the directory again and apply the whiteouts: remove any file whose name has the `.wh.` prefix, together with the corresponding file or directory named without that prefix.
```go
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size. This is only for retry speed and it's only
		// the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
```
```go
// insert adds the entry to the priority queue, or updates the readyAt if it
// already exists in the queue
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
	// if the entry already exists, update the time only if it would cause the
	// item to be queued sooner
	existing, exists := knownEntries[entry.data]
	if exists {
		if existing.readyAt.After(entry.readyAt) {
			existing.readyAt = entry.readyAt
			heap.Fix(q, existing.index)
		}
		return
	}
	heap.Push(q, entry)
	knownEntries[entry.data] = entry
}
```
```go
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
	entry := waitingForQueue.Peek().(*waitFor)
	nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
}
```
```yaml
apiVersion: serving.knative.dev/v1alpha1  # Current version of Knative
kind: Service
metadata:
  name: helloworld-go   # The name of the app
  namespace: default    # The namespace the app will use
spec:
  runLatest:
    configuration:
      revisionTemplate:
        spec:
          container:
            image: gcr.io/knative-samples/helloworld-go  # The URL to the image of the app
            env:
              - name: TARGET  # The environment variable printed out by the sample app
                value: "Go Sample v1"
```
The reconcile for a service first checks:
- whether the configuration for the service (its name is the same as the service name) exists:
  - if it does not exist, create it
  - if it exists, reconcile it
- whether the routes for the service (their name is the same as the service name) exist
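The create-or-reconcile pattern above can be sketched generically. This is a minimal sketch under stated assumptions: the `reconcileConfiguration`/`get` helpers and the map standing in for the lister/API server are all illustrative, not Knative's actual API:

```go
package main

import "errors"

var errNotFound = errors.New("not found")

// reconcileConfiguration mirrors the flow above: look up the configuration
// that shares the service's name; create it when absent, otherwise reconcile
// the existing object toward the desired state.
func reconcileConfiguration(name string, store map[string]string, desired string) (string, error) {
	_, err := get(store, name)
	switch {
	case errors.Is(err, errNotFound):
		store[name] = desired
		return "created", nil
	case err != nil:
		return "", err
	default:
		store[name] = desired // bring actual state in line with desired state
		return "reconciled", nil
	}
}

func get(store map[string]string, name string) (string, error) {
	v, ok := store[name]
	if !ok {
		return "", errNotFound
	}
	return v, nil
}
```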
A list-watch against kube-apiserver runs in an independent goroutine; whenever there are events about pods, the pod data is put into configCh.
syncLoopIteration -> handle the pod data coming from the list-watch on the kube-apiserver pod resource -> HandlePodAdditions/… -> dispatchWork -> podWorkers.UpdatePod
UpdatePod
If this pod has not been handled before, create a buffered UpdatePodOptions channel of size 1 for it and start a goroutine that calls managePodLoop(podUpdates).
```go
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
err = p.syncPodFn(syncPodOptions{
	mirrorPod:      update.MirrorPod,
	pod:            update.Pod,
	podStatus:      status,
	killPodOptions: update.KillPodOptions,
	updateType:     update.UpdateType,
})
lastSyncTime = time.Now()
```
syncPodFn(kubelet.syncPod)
Create the pod data directory: `/var/lib/kubelet/pods/[podUID]`
volumeManager.WaitForAttachAndMount(pod)
Fetch the imagePullSecrets
```go
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
```
containerRuntime.SyncPod
```go
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
```
createPodSandbox
Create the pod log directory: `/var/log/pods/[podUID]`
Run the init containers
Run the regular containers
PLEG
During relist, first fetch the full set of pod data from docker:
```go
// Get all the pods.
podList, err := g.runtime.GetPods(true)
```
Compare the current state with the previous state and generate a PLE (pod lifecycle event) for each container.
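The diff step can be sketched as comparing each container's state between two relists. A minimal sketch with simplified state and event types; the `generateEvents` name and the string-typed states are illustrative stand-ins for PLEG's real types:

```go
package main

type plegEvent struct {
	podID, containerID, eventType string
}

// generateEvents compares a container's state from the previous relist with
// its current state and emits the corresponding pod lifecycle event, the way
// PLEG's relist does per container.
func generateEvents(podID, cid, oldState, newState string) []plegEvent {
	if oldState == newState {
		return nil // no state change, no event
	}
	switch newState {
	case "running":
		return []plegEvent{{podID, cid, "ContainerStarted"}}
	case "exited":
		return []plegEvent{{podID, cid, "ContainerDied"}}
	default:
		return []plegEvent{{podID, cid, "ContainerChanged"}}
	}
}
```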
```go
// if a request to kill a pod is pending, we do not let anything overwrite that request.
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
	p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
```
That is, it updates lastUndeliveredWorkUpdate.
After HandlePodSyncs finishes (synchronously):
If a container has died, perform the cleanup:
```go
if e.Type == pleg.ContainerDied {
	if containerID, ok := e.Data.(string); ok {
		kl.cleanUpContainersInPod(e.ID, containerID)
	}
}
```
With the default configuration, only the most recent dead container is kept. If the pod has been evicted, or `DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses)`, then all of its containers will be removed.