介绍

Kubelet 是 Kubernetes 的主要组件之一,在每个节点运行,基于 PodSpec 来工作。

作用

  1. 节点管理
  2. Pod 管理
    1. 文件
    2. HTTP
    3. API Server
  3. 容器健康检查
  4. cAdvisor 资源监控

核心代码

监听 Pod 变化

对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理。它会从以下管道中获取消息:

  • configChkubeDeps 对象中的 PodConfig 提供 file,http,apiserver 3 个不同来源的信息的变化;
  • syncCh:定时器管道,每隔一秒去同步最新保存的 Pod 状态;
  • houseKeepingCh:housekeeping 事件管道,做 Pod 清理工作;
  • plegCh:kubelet pleg 子模块周期性地向 container runtime 查询当前所有容器状态,如果状态发生变化则产生事件;
  • livenessManager.Updates():健康检查发现某个 Pod 不可用,kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作。

代码位置:pkg/kubelet/kubelet.go syncLoopIteration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
		case kubetypes.REMOVE:
		case kubetypes.RECONCILE:
		case kubetypes.DELETE:
		case kubetypes.SET:
		default:
		}
		kl.sourcesReady.AddSource(u.Source)
	case e := <-plegCh:
		...
	case <-syncCh:
		...
	case update := <-kl.livenessManager.Updates():
		...
	case <-housekeepingCh:
        ...
	}
	return true
}

handler 对象是个 interface。

1
2
3
4
5
6
7
8
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}

创建 Pod

  1. 用户请求 APIServer 组件,经过一系列的校验,然后 APIServer 将 Pod 定义存储到 ETCD 后端;
  2. Scheduler watch 到 Pod 的更新,然后根据策略调度到某个 Node,将信息更新回 APIServer;
  3. 然后 Kubelet watch 到 Pod 的时间更新会按照如下详细步骤进行同步操作。

流程

1
HandlePodAdditions -> dispatchWork -> podWorkers

podWorkers 子模块主要负责为每一个 Pod 的启动,并且创建单独的 Goroutine 去对这个 Pod 完成相应的操作,比如创建,更新,删除等等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    ...
    // Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			...
		},
	})
    ...
}

Goroutine 会阻塞等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool

	p.podLock.Lock()
	defer p.podLock.Unlock()
	if podUpdates, exists = p.podUpdates[uid]; !exists {
        // 如果当前 pod 还没有启动过 goroutine ,则启动 goroutine,并且创建 channel

		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates

		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
        // 如果一个杀死 Pod 的请求正在等待,我们不会让任何东西覆盖该请求
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if !found || update.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[pod.UID] = *options
		}
	}
}

managePodLoop 负责消费,调用 syncPodFn 匿名函数进行处理,匿名函数是 kubelet 对象的 func (kl *Kubelet) syncPod(o syncPodOptions) error

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// pkg/kubelet/kubelet.go:1388
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	...
	// 如果是删除 pod,立即执行并返回。
	if updateType == kubetypes.SyncPodKill {
		...
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			...
			return err
		}
		return nil
	}

	...

	runnable := kl.canRunPod(pod)
	// 检查 Pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 Pod 并返回错误信息;

	// 如果是 static Pod,就创建或者更新对应的 mirrorPod,如果存在先删除
	if kubetypes.IsStaticPod(pod) {
		...
		if err := kl.podManager.CreateMirrorPod(pod); err != nil {
			...
		}
		...
	}

	// 创建 Pod 的数据目录,存放 volume 和 plugin 信息,即目录 /var/lib/kubelet/uuid
	if err := kl.makePodDataDirs(pod); err != nil {
		...
		return err
	}

	// 如果定义了 PV,等待所有该 Pod 依赖的 Volume mount 完成(volumeManager 会在后台做这些事情)
	if !kl.podIsTerminated(pod) {
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			...
			return err
		}
	}

	// 如果有 Image secrets,去 apiserver 获取对应的 Secrets 数据
	pullSecrets := kl.getPullSecretsForPod(pod)

	// 调用容器运行时的 SyncPod 回调
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	...

	return nil
}

Container Runtime

目前实现了 CRI 的主流项目有:Docker、containerd、CRI-O、Frakti、Pouch,它们衔接 Kubelet 与运行时方式对比如下:

以 Docker 运行时为例:

  1. kubelet 通过 CRI 接口 gRPC 调用 dockershim, 请求创建一个容器;
  2. dockershim 将请求转化成 Docker Daemon 能听懂的请求, 发到 Docker Daemon;
  3. Docker Daemon 将请求转移到另一个守护进程 containerd 创建一个容器;
  4. containerd 收到请求后,创建一个叫做 containerd-shim 的进程, 让 containerd-shim 去操作容器。 这是因为容器进程需要一个父进程来做诸如收集状态,维持 stdin 等 fd 打开等工作。 而假如这个父进程就是 containerd,那每次 containerd 挂掉或升级,整个宿主机上所有的容器都得退出了。 而引入了 containerd-shim 就规避了这个问题(containerd 和 shim 并不是父子进程关系);
  5. 创建容器需要做一些设置 namespaces 和 cgroups,挂载 root filesystem 等等操作,而这些事该怎么做已经有了公开的规范了,那就是 OCI。它的一个参考实现叫做 runc。于是 containerd-shim 在这一步调用 runc 这个命令行工具,来启动容器;
  6. runc 启动完容器后本身会直接退出,containerd-shim 则会成为容器进程的父进程,负责收集容器进程的状态,上报给 containerd,并在容器中 pid 为 1 的进程退出后接管容器中的子进程进行清理,确保不会出现僵尸进程;

不同 cri 的对比