- kubelet源码分析(一)之 NewKubeletCommand
- 1. Main 函数
- 2. NewKubeletCommand
- 2.1. 参数解析
- 2.1.1. 初始化参数和配置
- 2.1.2. 打印帮助信息和版本信息
- 2.1.3. kubelet config
- 2.1.4. dynamic kubelet config
- 2.2. 初始化kubeletServer和kubeletDeps
- 2.2.1. kubeletServer
- 2.2.2. kubeletDeps
- 2.2.3. docker shim
- 2.3. AddFlags
- 2.4. 运行kubelet
- 2.1. 参数解析
- 3. Run
- 3.1. 构造kubeDeps
- 3.1.1. clientConfig
- 3.1.2. kubeClient
- 3.1.3. dynamicKubeClient
- 3.1.4. eventClient
- 3.1.5. heartbeatClient
- 3.1.6. csiClient
- 3.1.7. CAdvisorInterface
- 3.1.8. ContainerManager
- 3.1.9. oomAdjuster
- 3.2. Health check
- 3.3. RunKubelet
- 3.1. 构造kubeDeps
- 4. RunKubelet
- 4.1. CreateAndInitKubelet
- 4.1.1. NewMainKubelet
- 4.1.2. PodConfig
- 4.2. startKubelet
- 4.2.1. k.Run
- 4.1. CreateAndInitKubelet
- 5. 总结
kubelet源码分析(一)之 NewKubeletCommand
以下代码分析基于
kubernetes v1.12.0版本。本文主要分析 https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kubelet 部分的代码。
本文主要分析 kubernetes/cmd/kubelet部分,该部分主要涉及kubelet的参数解析,及初始化和构造相关的依赖组件(主要在kubeDeps结构体中),并没有kubelet运行的详细逻辑,该部分位于kubernetes/pkg/kubelet模块,待后续文章分析。
kubelet的cmd代码目录结构如下:
kubelet├── app│ ├── auth.go│ ├── init_others.go│ ├── init_windows.go│ ├── options # 包括kubelet使用到的option│ │ ├── container_runtime.go│ │ ├── globalflags.go│ │ ├── globalflags_linux.go│ │ ├── globalflags_other.go│ │ ├── options.go # 包括KubeletFlags、AddFlags、AddKubeletConfigFlags等│ │ ├── osflags_others.go│ │ └── osflags_windows.go│ ├── plugins.go│ ├── server.go # 包括NewKubeletCommand、Run、RunKubelet、CreateAndInitKubelet、startKubelet等│ ├── server_linux.go│ └── server_unsupported.go└── kubelet.go # kubelet的main入口函数
1. Main 函数
kubelet的入口函数Main 函数,具体代码参考:https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kubelet/kubelet.go。
func main() {rand.Seed(time.Now().UTC().UnixNano())command := app.NewKubeletCommand(server.SetupSignalHandler())logs.InitLogs()defer logs.FlushLogs()if err := command.Execute(); err != nil {fmt.Fprintf(os.Stderr, "%v\n", err)os.Exit(1)}}
kubelet代码主要采用了Cobra命令行框架,核心代码如下:
// 初始化命令行command := app.NewKubeletCommand(server.SetupSignalHandler())// 执行Executeerr := command.Execute()
2. NewKubeletCommand
NewKubeletCommand基于参数创建了一个*cobra.Command对象。其中核心部分代码为参数解析部分和Run函数。
// NewKubeletCommand creates a *cobra.Command object with default parametersfunc NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {...cmd := &cobra.Command{Use: componentKubelet,Long: `...`,// The Kubelet has special flag parsing requirements to enforce flag precedence rules,// so we do all our parsing manually in Run, below.// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the// `args` arg to Run, without Cobra's interference.DisableFlagParsing: true,Run: func(cmd *cobra.Command, args []string) {...// run the kubeletglog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {glog.Fatal(err)}},}...return cmd}
2.1. 参数解析
kubelet开启了DisableFlagParsing参数,没有使用Cobra框架中的默认参数解析,而是自定义参数解析。
2.1.1. 初始化参数和配置
初始化参数解析,初始化cleanFlagSet,kubeletFlags,kubeletConfig。
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc)kubeletFlags := options.NewKubeletFlags()kubeletConfig, err := options.NewKubeletConfiguration()
2.1.2. 打印帮助信息和版本信息
如果输入非法参数则打印使用帮助信息。
// initial flag parse, since we disable cobra's flag parsingif err := cleanFlagSet.Parse(args); err != nil {cmd.Usage()glog.Fatal(err)}// check if there are non-flag arguments in the command linecmds := cleanFlagSet.Args()if len(cmds) > 0 {cmd.Usage()glog.Fatalf("unknown command: %s", cmds[0])}
遇到help和version参数则打印相关内容并退出。
// short-circuit on helphelp, err := cleanFlagSet.GetBool("help")if err != nil {glog.Fatal(`"help" flag is non-bool, programmer error, please correct`)}if help {cmd.Help()return}// short-circuit on verflagverflag.PrintAndExitIfRequested()utilflag.PrintFlags(cleanFlagSet)
2.1.3. kubelet config
加载并校验kubelet config。其中包括校验初始化的kubeletFlags,并从kubeletFlags的KubeletConfigFile参数获取kubelet config的内容。
// set feature gates from initial flags-based configif err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {glog.Fatal(err)}// validate the initial KubeletFlagsif err := options.ValidateKubeletFlags(kubeletFlags); err != nil {glog.Fatal(err)}if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {glog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")}// load kubelet config file, if providedif configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {kubeletConfig, err = loadConfigFile(configFile)if err != nil {glog.Fatal(err)}// We must enforce flag precedence by re-parsing the command line into the new object.// This is necessary to preserve backwards-compatibility across binary upgrades.// See issue #56171 for more details.if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {glog.Fatal(err)}// update feature gates based on new configif err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {glog.Fatal(err)}}// We always validate the local configuration (command line + config file).// This is the default "last-known-good" config for dynamic config, and must always remain valid.if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {glog.Fatal(err)}
2.1.4. dynamic kubelet config
如果开启使用动态kubelet的配置,则由动态配置文件替换kubelet配置文件。
// use dynamic kubelet config, if enabledvar kubeletConfigController *dynamickubeletconfig.Controllerif dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfigurationdynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,func(kc *kubeletconfiginternal.KubeletConfiguration) error {// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,// so that we get a complete validation at the same point where we can decide to reject dynamic config.// This fixes the flag-precedence component of issue #63305.// See issue #56171 for general details on flag precedence.return kubeletConfigFlagPrecedence(kc, args)})if err != nil {glog.Fatal(err)}// If we should just use our existing, local config, the controller will return a nil configif dynamicKubeletConfig != nil {kubeletConfig = dynamicKubeletConfig// Note: flag precedence was already enforced in the controller, prior to validation,// by our above transform function. Now we simply update feature gates from the new config.if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {glog.Fatal(err)}}}
总结:以上通过对各种特定参数的解析,最终生成kubeletFlags和kubeletConfig两个重要的参数对象,用来构造kubeletServer和其他需求。
2.2. 初始化kubeletServer和kubeletDeps
2.2.1. kubeletServer
// construct a KubeletServer from kubeletFlags and kubeletConfigkubeletServer := &options.KubeletServer{KubeletFlags: *kubeletFlags,KubeletConfiguration: *kubeletConfig,}
2.2.2. kubeletDeps
// use kubeletServer to construct the default KubeletDepskubeletDeps, err := UnsecuredDependencies(kubeletServer)if err != nil {glog.Fatal(err)}// add the kubelet config controller to kubeletDepskubeletDeps.KubeletConfigController = kubeletConfigController
2.2.3. docker shim
如果开启了docker shim参数,则执行RunDockershim。
// start the experimental docker shim, if enabledif kubeletServer.KubeletFlags.ExperimentalDockershim {if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {glog.Fatal(err)}return}
2.3. AddFlags
// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flagskubeletFlags.AddFlags(cleanFlagSet)options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)options.AddGlobalFlags(cleanFlagSet)cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flagsconst usageFmt = "Usage:\n %s\n\nFlags:\n%s"cmd.SetUsageFunc(func(cmd *cobra.Command) error {fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))return nil})cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))})
其中:
AddFlags代码可参考:kubernetes/cmd/kubelet/app/options/options.go#L323AddKubeletConfigFlags代码可参考:kubernetes/cmd/kubelet/app/options/options.go#L424
2.4. 运行kubelet
运行kubelet并且不退出。由Run函数进入后续的操作。
// run the kubeletglog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {glog.Fatal(err)}
3. Run
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will// not be generated.func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {// To help debugging, immediately log versionglog.Infof("Version: %+v", version.Get())if err := initForOS(s.KubeletFlags.WindowsService); err != nil {return fmt.Errorf("failed OS init: %v", err)}if err := run(s, kubeDeps, stopCh); err != nil {return fmt.Errorf("failed to run Kubelet: %v", err)}return nil}
当运行环境是Windows的时候,初始化操作,但是该操作为空,只是预留。具体执行run(s, kubeDeps, stopCh)函数。
3.1. 构造kubeDeps
3.1.1. clientConfig
创建clientConfig,该对象用来创建各种的kubeDeps属性中包含的client。
clientConfig, err := createAPIServerClientConfig(s)if err != nil {return fmt.Errorf("invalid kubeconfig: %v", err)}
3.1.2. kubeClient
kubeClient, err = clientset.NewForConfig(clientConfig)if err != nil {glog.Warningf("New kubeClient from clientConfig error: %v", err)} else if kubeClient.CertificatesV1beta1() != nil && clientCertificateManager != nil {glog.V(2).Info("Starting client certificate rotation.")clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests())clientCertificateManager.Start()}
3.1.3. dynamicKubeClient
dynamicKubeClient, err = dynamic.NewForConfig(clientConfig)if err != nil {glog.Warningf("Failed to initialize dynamic KubeClient: %v", err)}
3.1.4. eventClient
// make a separate client for eventseventClientConfig := *clientConfigeventClientConfig.QPS = float32(s.EventRecordQPS)eventClientConfig.Burst = int(s.EventBurst)eventClient, err = v1core.NewForConfig(&eventClientConfig)if err != nil {glog.Warningf("Failed to create API Server client for Events: %v", err)}
3.1.5. heartbeatClient
// make a separate client for heartbeat with throttling disabled and a timeout attachedheartbeatClientConfig := *clientConfigheartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration// if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequencyif utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Secondif heartbeatClientConfig.Timeout > leaseTimeout {heartbeatClientConfig.Timeout = leaseTimeout}}heartbeatClientConfig.QPS = float32(-1)heartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)if err != nil {glog.Warningf("Failed to create API Server client for heartbeat: %v", err)}
3.1.6. csiClient
// csiClient works with CRDs that support json onlyclientConfig.ContentType = "application/json"csiClient, err := csiclientset.NewForConfig(clientConfig)if err != nil {glog.Warningf("Failed to create CSI API client: %v", err)}
client赋值
kubeDeps.KubeClient = kubeClientkubeDeps.DynamicKubeClient = dynamicKubeClientif heartbeatClient != nil {kubeDeps.HeartbeatClient = heartbeatClientkubeDeps.OnHeartbeatFailure = closeAllConns}if eventClient != nil {kubeDeps.EventClient = eventClient}kubeDeps.CSIClient = csiClient
3.1.7. CAdvisorInterface
if kubeDeps.CAdvisorInterface == nil {imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))if err != nil {return err}}
3.1.8. ContainerManager
if kubeDeps.ContainerManager == nil {if s.CgroupsPerQOS && s.CgroupRoot == "" {glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")s.CgroupRoot = "/"}kubeReserved, err := parseResourceList(s.KubeReserved)if err != nil {return err}systemReserved, err := parseResourceList(s.SystemReserved)if err != nil {return err}var hardEvictionThresholds []evictionapi.Threshold// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)if err != nil {return err}}experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)if err != nil {return err}devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter,kubeDeps.CAdvisorInterface,cm.NodeConfig{RuntimeCgroupsName: s.RuntimeCgroups,SystemCgroupsName: s.SystemCgroups,KubeletCgroupsName: s.KubeletCgroups,ContainerRuntime: s.ContainerRuntime,CgroupsPerQOS: s.CgroupsPerQOS,CgroupRoot: s.CgroupRoot,CgroupDriver: s.CgroupDriver,KubeletRootDir: s.RootDirectory,ProtectKernelDefaults: s.ProtectKernelDefaults,NodeAllocatableConfig: cm.NodeAllocatableConfig{KubeReservedCgroupName: s.KubeReservedCgroup,SystemReservedCgroupName: s.SystemReservedCgroup,EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),KubeReserved: kubeReserved,SystemReserved: systemReserved,HardEvictionThresholds: hardEvictionThresholds,},QOSReserved: *experimentalQOSReserved,ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,ExperimentalPodPidsLimit: s.PodPidsLimit,EnforceCPULimits: s.CPUCFSQuota,CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,},s.FailSwapOn,devicePluginEnabled,kubeDeps.Recorder)if err != nil {return err}}
3.1.9. oomAdjuster
// TODO(vmarmol): Do this through container config.oomAdjuster := kubeDeps.OOMAdjusterif err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {glog.Warning(err)}
3.2. Health check
if s.HealthzPort > 0 {healthz.DefaultHealthz()go wait.Until(func() {err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)if err != nil {glog.Errorf("Starting health server failed: %v", err)}}, 5*time.Second, wait.NeverStop)}
3.3. RunKubelet
通过各种赋值构造了完整的kubeDeps结构体,最后再执行RunKubelet转入后续的kubelet执行流程。
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {return err}
4. RunKubelet
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:// 1 Integration tests// 2 Kubelet binary// 3 Standalone 'kubernetes' binary// Eventually, #2 will be replaced with instances of #3func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {...k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration,...kubeServer.NodeStatusMaxImages)if err != nil {return fmt.Errorf("failed to create kubelet: %v", err)}// NewMainKubelet should have set up a pod source config if one didn't exist// when the builder was run. This is just a precaution.if kubeDeps.PodConfig == nil {return fmt.Errorf("failed to create kubelet, pod source config was nil")}podCfg := kubeDeps.PodConfigrlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))// process pods and exit.if runOnce {if _, err := k.RunOnce(podCfg.Updates()); err != nil {return fmt.Errorf("runonce failed: %v", err)}glog.Infof("Started kubelet as runonce")} else {startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)glog.Infof("Started kubelet")}return nil}
RunKubelet函数核心代码为执行了CreateAndInitKubelet和startKubelet两个函数的操作,以下对这两个函数进行分析。
4.1. CreateAndInitKubelet
通过传入kubeDeps调用CreateAndInitKubelet初始化Kubelet。
k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration,kubeDeps,&kubeServer.ContainerRuntimeOptions,kubeServer.ContainerRuntime,kubeServer.RuntimeCgroups,kubeServer.HostnameOverride,kubeServer.NodeIP,kubeServer.ProviderID,kubeServer.CloudProvider,kubeServer.CertDirectory,kubeServer.RootDirectory,kubeServer.RegisterNode,kubeServer.RegisterWithTaints,kubeServer.AllowedUnsafeSysctls,kubeServer.RemoteRuntimeEndpoint,kubeServer.RemoteImageEndpoint,kubeServer.ExperimentalMounterPath,kubeServer.ExperimentalKernelMemcgNotification,kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,kubeServer.MinimumGCAge,kubeServer.MaxPerPodContainerCount,kubeServer.MaxContainerCount,kubeServer.MasterServiceNamespace,kubeServer.RegisterSchedulable,kubeServer.NonMasqueradeCIDR,kubeServer.KeepTerminatedPodVolumes,kubeServer.NodeLabels,kubeServer.SeccompProfileRoot,kubeServer.BootstrapCheckpointPath,kubeServer.NodeStatusMaxImages)if err != nil {return fmt.Errorf("failed to create kubelet: %v", err)}
4.1.1. NewMainKubelet
CreateAndInitKubelet方法中执行的核心函数是NewMainKubelet,NewMainKubelet实例化一个kubelet对象,该部分的具体代码在kubernetes/pkg/kubelet中,具体参考:kubernetes/pkg/kubelet/kubelet.go#L325。
func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,...nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop// up into "per source" synchronizationsk, err = kubelet.NewMainKubelet(kubeCfg,kubeDeps,crOptions,containerRuntime,runtimeCgroups,hostnameOverride,nodeIP,providerID,cloudProvider,certDirectory,rootDirectory,registerNode,registerWithTaints,allowedUnsafeSysctls,remoteRuntimeEndpoint,remoteImageEndpoint,experimentalMounterPath,experimentalKernelMemcgNotification,experimentalCheckNodeCapabilitiesBeforeMount,experimentalNodeAllocatableIgnoreEvictionThreshold,minimumGCAge,maxPerPodContainerCount,maxContainerCount,masterServiceNamespace,registerSchedulable,nonMasqueradeCIDR,keepTerminatedPodVolumes,nodeLabels,seccompProfileRoot,bootstrapCheckpointPath,nodeStatusMaxImages)if err != nil {return nil, err}k.BirthCry()k.StartGarbageCollection()return k, nil}
4.1.2. PodConfig
if kubeDeps.PodConfig == nil {var err errorkubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)if err != nil {return nil, err}}
NewMainKubelet-->PodConfig-->NewPodConfig-->kubetypes.PodUpdate。会生成一个podUpdate的channel来监听pod的变化,该channel会在k.Run(podCfg.Updates())中作为关键入参。
4.2. startKubelet
// process pods and exit.if runOnce {if _, err := k.RunOnce(podCfg.Updates()); err != nil {return fmt.Errorf("runonce failed: %v", err)}glog.Infof("Started kubelet as runonce")} else {startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)glog.Infof("Started kubelet")}
如果设置了只运行一次的参数,则执行k.RunOnce,否则执行核心函数startKubelet。具体实现如下:
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {// start the kubeletgo wait.Until(func() {k.Run(podCfg.Updates())}, 0, wait.NeverStop)// start the kubelet serverif enableServer {go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)}if kubeCfg.ReadOnlyPort > 0 {go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))}}
4.2.1. k.Run
// start the kubeletgo wait.Until(func() {k.Run(podCfg.Updates())}, 0, wait.NeverStop)
通过长驻进程的方式运行k.Run,不退出,将kubelet的运行逻辑引入kubernetes/pkg/kubelet/kubelet.go部分,kubernetes/pkg/kubelet部分的运行逻辑待后续文章分析。
5. 总结
kubelet采用Cobra命令行框架和pflag参数解析框架,和apiserver、scheduler、controller-manager形成统一的代码风格。
kubernetes/cmd/kubelet部分主要对运行参数进行定义和解析,初始化和构造相关的依赖组件(主要在kubeDeps结构体中),并没有kubelet运行的详细逻辑,该部分位于kubernetes/pkg/kubelet模块。cmd部分调用流程如下:
Main-->NewKubeletCommand-->Run(kubeletServer, kubeletDeps, stopCh)-->run(s *options.KubeletServer, kubeDeps ..., stopCh ...)--> RunKubelet(s, kubeDeps, s.RunOnce)-->startKubelet-->k.Run(podCfg.Updates())-->pkg/kubelet。同时
RunKubelet(s, kubeDeps, s.RunOnce)-->CreateAndInitKubelet-->kubelet.NewMainKubelet-->pkg/kubelet。
参考文章:
- https://github.com/kubernetes/kubernetes/tree/v1.12.0
