- kubelet Source Code Analysis (Part 5): syncPod
- 1. managePodLoop
- 2. syncPod
- 2.1. SyncPodKill
- 2.2. SyncPodCreate
- 2.3. Cgroups
- 2.4. Mirror Pod
- 2.5. makePodDataDirs
- 2.6. mount volumes
- 2.7. PullSecretsForPod
- 2.8. containerRuntime.SyncPod
- 3. Runtime.SyncPod
- 3.1. computePodActions
- 3.2. killPodWithSyncResult
- 3.3. killContainer
- 3.4. createPodSandbox
- 3.5. start init container
- 3.6. start containers
- 4. startContainer
- 4.1. pull image
- 4.2. CreateContainer
- 4.3. StartContainer
- 4.4. execute post start hook
- 5. Summary
kubelet Source Code Analysis (Part 5): syncPod
The following code analysis is based on Kubernetes v1.12.0.
This article focuses on the syncPod part of the kubelet.
1. managePodLoop
managePodLoop reads updates from the podUpdates channel and invokes the syncPodFn function. syncPodFn is assigned when newPodWorkers is called, and its actual implementation is kubelet.syncPod.
The complete code of managePodLoop is as follows.
This code is located in pkg/kubelet/pod_workers.go.
```go
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// This is a blocking call that would return only if the cache
			// has an entry for the pod that is newer than minRuntimeCache
			// Time. This ensures the worker doesn't start syncing until
			// after the cache is at least newer than the finished time of
			// the previous sync.
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				// This is the legacy event thrown by manage pod loop
				// all other events are now dispatched from syncPodFn
				p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
				return err
			}
			// syncPodFn here is actually implemented by kubelet.syncPod
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
			glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
		}
		p.wrapUp(update.Pod.UID, err)
	}
}
```
The syncPod logic is analyzed below.
2. syncPod
syncPod can be understood as the transaction script that performs the sync work for a single pod. Its input parameter is syncPodOptions, which records the information about the pod to be synced. It is defined as follows:
```go
// syncPodOptions provides the arguments to a SyncPod operation.
type syncPodOptions struct {
	// the mirror pod for the pod to sync, if it is a static pod
	mirrorPod *v1.Pod
	// pod to sync
	pod *v1.Pod
	// the type of update (create, update, sync)
	updateType kubetypes.SyncPodType
	// the current status
	podStatus *kubecontainer.PodStatus
	// if update type is kill, use the specified options to kill the pod.
	killPodOptions *KillPodOptions
}
```
syncPod mainly executes the following workflow:
- If the pod is being created, record the pod worker start latency.
- Call generateAPIPodStatus to prepare the v1.PodStatus for the pod.
- If the pod is seen as running for the first time, record the pod start latency.
- Update the pod status in the status manager.
- Kill the pod if it should not be running.
- If the pod is a static pod and does not have a corresponding mirror pod, create one.
- Create the pod's data directories if they do not exist.
- Wait for volumes to be attached/mounted.
- Fetch the pull secrets for the pod.
- Call the container runtime's SyncPod function to perform the pod-level operations.
- Update the pod's ingress and egress traffic limits.
If any step of this workflow returns an error, syncPod returns that error and the whole workflow runs again on the next sync. Error information is recorded as events to make debugging easier.
The execution of syncPod is analyzed below.
The syncPod code is located in pkg/kubelet/kubelet.go.
2.1. SyncPodKill
First, extract the pod information from syncPodOptions.
```go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// pull out the required options
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType
	...
}
```
If the pod needs to be killed, killPod is executed, which kills the pod within the specified grace period.
```go
	// if we want to kill a pod, do it now!
	if updateType == kubetypes.SyncPodKill {
		killPodOptions := o.killPodOptions
		if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
			return fmt.Errorf("kill pod options are required if update type is kill")
		}
		apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		// we kill the pod with the specified grace period since this is a termination
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			// there was an error killing the pod, so we return that error directly
			utilruntime.HandleError(err)
			return err
		}
		return nil
	}
```
2.2. SyncPodCreate
If the pod is being created, record the pod worker start latency; the latency is measured relative to the time the pod was first seen by the API server.
```go
	// Latency measurements for the main workflow are relative to the
	// first time the pod was seen by the API server.
	var firstSeenTime time.Time
	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
	}

	// Record pod worker start latency if being created
	// TODO: make pod workers record their own latencies
	if updateType == kubetypes.SyncPodCreate {
		if !firstSeenTime.IsZero() {
			// This is the first time we are syncing the pod. Record the latency
			// since kubelet first saw the pod if firstSeenTime is set.
			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		} else {
			glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
		}
	}
```
Generate the final API pod status from the pod and its runtime status, and set the pod IP.
```go
	// Generate final API pod status with pod and status manager status
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
	// set pod IP to hostIP directly in runtime.GetPodStatus
	podStatus.IP = apiPodStatus.PodIP
```
Record the time it takes for the pod to reach the running state.
```go
	// Record the time it takes for the pod to become running.
	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
		!firstSeenTime.IsZero() {
		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	}
```
If the pod is not runnable, update the pod and container statuses with the corresponding reason.
```go
	runnable := kl.canRunPod(pod)
	if !runnable.Admit {
		// Pod is not runnable; update the Pod and Container statuses to why.
		apiPodStatus.Reason = runnable.Reason
		apiPodStatus.Message = runnable.Message
		// Waiting containers are not creating.
		const waitingReason = "Blocked"
		for _, cs := range apiPodStatus.InitContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
		for _, cs := range apiPodStatus.ContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
	}
```
Then update the status in the status manager and kill the pod if it should not be running.
```go
	// Update status in the status manager
	kl.statusManager.SetPodStatus(pod, apiPodStatus)

	// Kill pod if it should not be running
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			syncErr = fmt.Errorf("error killing pod: %v", err)
			utilruntime.HandleError(syncErr)
		} else {
			if !runnable.Admit {
				// There was no error killing the pod, but the pod cannot be run.
				// Return an error to signal that the sync loop should back off.
				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
			}
		}
		return syncErr
	}
```
If the network plugin is not yet ready, only start the pod if it uses the host network.
```go
	// If the network plugin is not ready, only start the pod if it uses the host network
	if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
	}
```
2.3. Cgroups
Create cgroups for the pod and apply the corresponding resource parameters to them if the cgroups-per-qos flag is enabled. For terminated pods there is no need to create or update the pod's cgroups.
When the kubelet is restarted with cgroups-per-qos enabled, all of a pod's running containers should be killed and brought back up under the QoS cgroup hierarchy.
If the pod's cgroup already exists or the pod is running for the first time, the containers in the pod are not killed.
```go
	// Create Cgroups for the pod and apply resource parameters
	// to them if cgroups-per-qos flag is enabled.
	pcm := kl.containerManager.NewPodContainerManager()
	// If pod has already been terminated then we need not create
	// or update the pod's cgroup
	if !kl.podIsTerminated(pod) {
		// When the kubelet is restarted with the cgroups-per-qos
		// flag enabled, all the pod's running containers
		// should be killed intermittently and brought back up
		// under the qos cgroup hierarchy.
		// Check if this is the pod's first sync
		firstSync := true
		for _, containerStatus := range apiPodStatus.ContainerStatuses {
			if containerStatus.State.Running != nil {
				firstSync = false
				break
			}
		}
		// Don't kill containers in pod if pod's cgroups already
		// exists or the pod is running for the first time
		podKilled := false
		if !pcm.Exists(pod) && !firstSync {
			if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
				podKilled = true
			}
		}
		...
```
If the pod was killed and its restart policy is Never, do not create or update its cgroups; otherwise create and update the pod's cgroups.
```go
		// Create and Update pod's Cgroups
		// Don't create cgroups for run once pod if it was killed above
		// The current policy is not to restart the run once pods when
		// the kubelet is restarted with the new flag as run once pods are
		// expected to run only once and if the kubelet is restarted then
		// they are not expected to run again.
		// We don't create and apply updates to cgroup if its a run once pod and was killed above
		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
			if !pcm.Exists(pod) {
				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
					glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
				}
				if err := pcm.EnsureExists(pod); err != nil {
					kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
				}
			}
		}
```
Here the QoS-level cgroups are updated through the containerManager's UpdateQOSCgroups, while the pod-level cgroup itself is ensured by pcm.EnsureExists.
```go
	if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
		glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
	}
```
2.4. Mirror Pod
If the pod is a static pod and has no corresponding mirror pod, create one; if a mirror pod exists but is outdated (being deleted or no longer matching the static pod), delete it and create a new one.
```go
	// Create Mirror Pod for Static Pod if it doesn't already exist
	if kubepod.IsStaticPod(pod) {
		podFullName := kubecontainer.GetPodFullName(pod)
		deleted := false
		if mirrorPod != nil {
			if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
				// The mirror pod is semantically different from the static pod. Remove
				// it. The mirror pod will get recreated later.
				glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
				if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
					glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
				} else {
					deleted = true
				}
			}
		}
		if mirrorPod == nil || deleted {
			node, err := kl.GetNode()
			if err != nil || node.DeletionTimestamp != nil {
				glog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
			} else {
				glog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
				if err := kl.podManager.CreateMirrorPod(pod); err != nil {
					glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
				}
			}
		}
	}
```
2.5. makePodDataDirs
Create the data directories for the pod.
```go
	// Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
		glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
		return err
	}
```
The data directories include:

- PodDir: {kubelet.rootDirectory}/pods/{podUID}
- PodVolumesDir: {PodDir}/volumes
- PodPluginsDir: {PodDir}/plugins
```go
// makePodDataDirs creates the dirs for the pod datas.
func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
	uid := pod.UID
	if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	return nil
}
```
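As a rough illustration of the resulting layout, the sketch below prints the three paths for an assumed default root directory (/var/lib/kubelet) and a hypothetical pod UID. It only mirrors the path structure described above; it is not the kubelet's actual getPodDir/getPodVolumesDir/getPodPluginsDir implementation.

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	rootDirectory := "/var/lib/kubelet"                  // assumed default --root-dir
	podUID := "7c8d3e9a-1234-5678-9abc-def012345678"     // hypothetical pod UID

	podDir := filepath.Join(rootDirectory, "pods", podUID)
	fmt.Println("PodDir:        ", podDir)                           // {root}/pods/{podUID}
	fmt.Println("PodVolumesDir: ", filepath.Join(podDir, "volumes")) // {PodDir}/volumes
	fmt.Println("PodPluginsDir: ", filepath.Join(podDir, "plugins")) // {PodDir}/plugins
}
```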
2.6. mount volumes
Mount volumes for pods that are not in the terminated state.
```go
	// Volume manager will not mount volumes for terminated pods
	if !kl.podIsTerminated(pod) {
		// Wait for volumes to attach/mount
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
			glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
			return err
		}
	}
```
2.7. PullSecretsForPod
Fetch the pull secrets for the pod.
```go
	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)
```
getPullSecretsForPod is implemented as follows:
```go
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull
// secrets.
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
	pullSecrets := []v1.Secret{}

	for _, secretRef := range pod.Spec.ImagePullSecrets {
		secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
		if err != nil {
			glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
			continue
		}

		pullSecrets = append(pullSecrets, *secret)
	}

	return pullSecrets
}
```
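The secrets resolved here come from the pod spec's imagePullSecrets field. As a minimal sketch (the secret name "regcred" and the image are hypothetical), the snippet below builds a pod spec using the k8s.io/api/core/v1 types and iterates over the same list that getPullSecretsForPod walks:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Illustrative pod spec only; the secret name and image are placeholders.
	pod := v1.Pod{
		Spec: v1.PodSpec{
			ImagePullSecrets: []v1.LocalObjectReference{
				{Name: "regcred"},
			},
			Containers: []v1.Container{
				{Name: "app", Image: "registry.example.com/app:1.0"},
			},
		},
	}
	// getPullSecretsForPod iterates over exactly this list and resolves each
	// referenced Secret through the kubelet's secretManager.
	for _, ref := range pod.Spec.ImagePullSecrets {
		fmt.Println("pull secret referenced:", ref.Name)
	}
}
```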
2.8. containerRuntime.SyncPod
Call the container runtime's SyncPod function to perform the pod operations; from here, the logic of kubelet.syncPod hands over to containerRuntime.SyncPod.
```go
	// Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err := result.Error(); err != nil {
		// Do not return error if the only failures were pods in backoff
		for _, r := range result.SyncResults {
			if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
				// Do not record an event here, as we keep all event logging for sync pod failures
				// local to container runtime so we get better errors
				return err
			}
		}

		return nil
	}
```
3. Runtime.SyncPod
SyncPod syncs the running pod toward the desired pod state. It mainly performs the following steps:
- Compute sandbox and container changes.
- Kill the pod if necessary.
- Kill any containers that should not be running.
- Create the sandbox if necessary.
- Start the init container.
- Start the regular containers.
The Runtime.SyncPod code is located in pkg/kubelet/kuberuntime/kuberuntime_manager.go.
3.1. computePodActions
Compute the sandbox and container changes.
```go
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
		}
	}
```
3.2. killPodWithSyncResult
Kill the pod if necessary.
```go
	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		if !podContainerChanges.CreateSandbox {
			glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
		} else {
			glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
		}

		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
			return
		}

		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(pod, podStatus)
		}
	}
```
3.3. killContainer
Kill any running containers in the pod that should not be kept.
```go
	// Step 3: kill any running containers in this pod which are not to keep.
	for containerID, containerInfo := range podContainerChanges.ContainersToKill {
		glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
		killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
		result.AddSyncResult(killContainerResult)
		if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
			killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
			glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
			return
		}
	}
```
3.4. createPodSandbox
Create the sandbox if necessary.
```go
	// Step 4: Create a sandbox for the pod if necessary.
	...
	glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
	createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
	result.AddSyncResult(createSandboxResult)
	podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
	if err != nil {
		createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
		glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
		ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
		if referr != nil {
			glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
		}
		m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox: %v", err)
		return
	}
	glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
```
3.5. start init container
Start the init container.
```go
	// Step 5: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)
		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
			return
		}

		glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
			startContainerResult.Fail(err, msg)
			utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
			return
		}

		// Successfully started the container; clear the entry in the failure
		glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
	}
```
3.6. start containers
Start the regular containers.
```go
	// Step 6: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
			continue
		}

		glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
		// Containers are started via startContainer.
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				glog.V(3).Infof("container start failed: %v: %s", err, msg)
			default:
				utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
			}
			continue
		}
	}
```
4. startContainer
startContainer starts a container and, on failure, returns a message explaining why it failed.
It consists of the following steps:
- Pull the image.
- Create the container.
- Start the container.
- Run the post start lifecycle hooks (if applicable).
The complete code of startContainer is as follows.
The startContainer code is located in pkg/kubelet/kuberuntime/kuberuntime_container.go.
```go
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return msg, err
	}

	// Step 2: create the container.
	ref, err := kubecontainer.GenerateContainerRef(pod, container)
	if err != nil {
		glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
	}
	glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)

	// For a new container, the RestartCount should be 0
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	}

	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
	if cleanupAction != nil {
		defer cleanupAction()
	}
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainerConfig
	}

	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainer
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrPreStartHook
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")

	if ref != nil {
		m.containerRefManager.SetRef(kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}, ref)
	}

	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")

	// Symlink container logs to the legacy container log location for cluster logging
	// support.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
		sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
	// Because if containerLog path does not exist, only dangling legacySymlink is created.
	// This dangling legacySymlink is later removed by container gc, so it does not make sense
	// to create it in the first place. it happens when journald logging driver is used with docker.
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
			glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
				legacySymlink, containerID, containerLog, err)
		}
	}

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
			m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
			if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
				glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
					container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
			}
			return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
		}
	}

	return "", nil
}
```
startContainer is analyzed section by section below.
4.1. pull image
The EnsureImageExists method pulls the image for the specified pod container and returns the image reference and any error.
```go
	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return msg, err
	}
```
4.2. CreateContainer
First, generate the container's *v1.ObjectReference, which contains information about the container.
```go
	// Step 2: create the container.
	ref, err := kubecontainer.GenerateContainerRef(pod, container)
	if err != nil {
		glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
	}
	glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
```
Compute the container's restart count; for a new container, the restart count starts at 0.
```go
	// For a new container, the RestartCount should be 0
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	}
```
Generate the container configuration.
```go
	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
	if cleanupAction != nil {
		defer cleanupAction()
	}
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainerConfig
	}
```
Call the runtimeService to perform the CreateContainer operation.
```go
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainer
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrPreStartHook
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")

	if ref != nil {
		m.containerRefManager.SetRef(kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}, ref)
	}
```
4.3. StartContainer
Call the runtimeService's StartContainer method to start the container.
```go
	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")

	// Symlink container logs to the legacy container log location for cluster logging
	// support.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
		sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
	// Because if containerLog path does not exist, only dangling legacySymlink is created.
	// This dangling legacySymlink is later removed by container gc, so it does not make sense
	// to create it in the first place. it happens when journald logging driver is used with docker.
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
			glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
				legacySymlink, containerID, containerLog, err)
		}
	}
```
4.4. execute post start hook
If Lifecycle.PostStart is specified, the PostStart hook is executed. If the PostStart hook fails, the container is killed and restarted according to its restart policy.
```go
	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
			m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
			if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
				glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
					container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
			}
			return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
		}
	}
```
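For reference, a minimal sketch of a container spec that would make container.Lifecycle.PostStart non-nil (and therefore trigger the branch above) might look like the following. The container name, image, and command are purely illustrative, and the v1.Handler type is the one used by the API version of this era (renamed LifecycleHandler in newer releases).

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Illustrative container spec only; name, image, and command are hypothetical.
	container := v1.Container{
		Name:  "app",
		Image: "nginx:1.15",
		Lifecycle: &v1.Lifecycle{
			PostStart: &v1.Handler{
				Exec: &v1.ExecAction{
					Command: []string{"/bin/sh", "-c", "echo started > /tmp/started"},
				},
			},
		},
	}
	// startContainer runs this handler right after StartContainer succeeds;
	// if the handler fails, the container is killed and restarted per the pod's restart policy.
	fmt.Println("post start hook set:", container.Lifecycle.PostStart != nil)
}
```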
5. Summary
The kubelet's job is to manage the lifecycle of pods on the node (create, delete, update, and query). It does this through various managers that work asynchronously, each performing its own task, and it uses several channels to propagate state changes. One of the more important channels is podUpdates <-chan UpdatePodOptions, which conveys pod changes to the per-pod worker.
The call chain for creating a pod is:
syncLoopIteration --> kubetypes.ADD --> HandlePodAdditions(u.Pods) --> dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) --> podWorkers.UpdatePod --> managePodLoop(podUpdates) --> syncPod(o syncPodOptions) --> containerRuntime.SyncPod --> startContainer
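To make the channel-per-pod dispatch pattern above more concrete, here is a simplified, self-contained sketch. It is not the actual kubelet code: the PodUpdate and workers types and their fields are invented for illustration, and the real podWorkers additionally tracks pending updates and worker teardown.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// PodUpdate is a stand-in for the kubelet's UpdatePodOptions; names are illustrative only.
type PodUpdate struct {
	PodUID     string
	UpdateType string
}

// workers maps each pod UID to its own update channel, mirroring the
// one-goroutine-per-pod pattern of podWorkers.managePodLoop.
type workers struct {
	mu       sync.Mutex
	channels map[string]chan PodUpdate
}

func (w *workers) UpdatePod(u PodUpdate) {
	w.mu.Lock()
	ch, ok := w.channels[u.PodUID]
	if !ok {
		ch = make(chan PodUpdate, 1)
		w.channels[u.PodUID] = ch
		go func() {
			// Simplified managePodLoop: each update triggers one "sync".
			for update := range ch {
				fmt.Printf("syncing pod %s (%s)\n", update.PodUID, update.UpdateType)
			}
		}()
	}
	w.mu.Unlock()
	ch <- u
}

func main() {
	w := &workers{channels: map[string]chan PodUpdate{}}
	w.UpdatePod(PodUpdate{PodUID: "pod-a", UpdateType: "create"})
	w.UpdatePod(PodUpdate{PodUID: "pod-a", UpdateType: "sync"})
	time.Sleep(100 * time.Millisecond) // let the worker goroutine drain the channel
}
```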
References:
- https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go
- https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/pod_workers.go
- https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kuberuntime/kuberuntime_container.go
