介绍
Kubelet 是 Kubernetes 的主要组件之一,在每个节点运行,基于 PodSpec 来工作。
作用
- 节点管理
- Pod 管理
- 文件
- HTTP
- API Server
- 容器健康检查
- cAdvisor 资源监控
核心代码
监听 Pod 变化
对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理。它会从以下管道中获取消息:
configCh
:kubeDeps
对象中的 PodConfig
提供 file,http,apiserver 3 个不同来源的信息的变化;
syncCh
:定时器管道,每隔一秒去同步最新保存的 Pod 状态;
houseKeepingCh
:housekeeping 事件管道,做 Pod 清理工作;
plegCh
:kubelet pleg 子模块周期性地向 container runtime 查询当前所有容器状态,如果状态发生变化则产生事件;
livenessManager.Updates()
:健康检查发现某个 Pod 不可用,kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作。
代码位置:pkg/kubelet/kubelet.go syncLoopIteration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
case kubetypes.REMOVE:
case kubetypes.RECONCILE:
case kubetypes.DELETE:
case kubetypes.SET:
default:
}
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
...
case <-syncCh:
...
case update := <-kl.livenessManager.Updates():
...
case <-housekeepingCh:
...
}
return true
}
|
handler 对象是个 interface。
1
2
3
4
5
6
7
8
|
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups() error
}
|
创建 Pod
- 用户请求 APIServer 组件,经过一系列的校验,然后 APIServer 将 Pod 定义存储到 ETCD 后端;
- Scheduler watch 到 Pod 的更新,然后根据策略调度到某个 Node,将信息更新回 APIServer;
- 然后 Kubelet watch 到 Pod 的时间更新会按照如下详细步骤进行同步操作。
流程
1
|
HandlePodAdditions -> dispatchWork -> podWorkers
|
podWorkers 子模块主要负责为每一个 Pod 的启动,并且创建单独的 Goroutine 去对这个 Pod 完成相应的操作,比如创建,更新,删除等等。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
...
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
...
},
})
...
}
|
Goroutine 会阻塞等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// 如果当前 pod 还没有启动过 goroutine ,则启动 goroutine,并且创建 channel
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
// 如果一个杀死 Pod 的请求正在等待,我们不会让任何东西覆盖该请求
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}
|
managePodLoop 负责消费,调用 syncPodFn
匿名函数进行处理,匿名函数是 kubelet 对象的 func (kl *Kubelet) syncPod(o syncPodOptions) error
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// pkg/kubelet/kubelet.go:1388
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// 如果是删除 pod,立即执行并返回。
if updateType == kubetypes.SyncPodKill {
...
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
...
return err
}
return nil
}
...
runnable := kl.canRunPod(pod)
// 检查 Pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 Pod 并返回错误信息;
// 如果是 static Pod,就创建或者更新对应的 mirrorPod,如果存在先删除
if kubetypes.IsStaticPod(pod) {
...
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
...
}
...
}
// 创建 Pod 的数据目录,存放 volume 和 plugin 信息,即目录 /var/lib/kubelet/uuid
if err := kl.makePodDataDirs(pod); err != nil {
...
return err
}
// 如果定义了 PV,等待所有该 Pod 依赖的 Volume mount 完成(volumeManager 会在后台做这些事情)
if !kl.podIsTerminated(pod) {
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
...
return err
}
}
// 如果有 Image secrets,去 apiserver 获取对应的 Secrets 数据
pullSecrets := kl.getPullSecretsForPod(pod)
// 调用容器运行时的 SyncPod 回调
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
...
return nil
}
|
Container Runtime
目前实现了 CRI 的主流项目有:Docker、containerd、CRI-O、Frakti、Pouch,它们衔接 Kubelet 与运行时方式对比如下:
以 Docker 运行时为例:
- kubelet 通过 CRI 接口 gRPC 调用 dockershim, 请求创建一个容器;
- dockershim 将请求转化成 Docker Daemon 能听懂的请求, 发到 Docker Daemon;
- Docker Daemon 将请求转移到另一个守护进程 containerd 创建一个容器;
- containerd 收到请求后,创建一个叫做 containerd-shim 的进程, 让 containerd-shim 去操作容器。
这是因为容器进程需要一个父进程来做诸如收集状态,维持 stdin 等 fd 打开等工作。
而假如这个父进程就是 containerd,那每次 containerd 挂掉或升级,整个宿主机上所有的容器都得退出了。
而引入了 containerd-shim 就规避了这个问题(containerd 和 shim 并不是父子进程关系);
- 创建容器需要做一些设置 namespaces 和 cgroups,挂载 root filesystem 等等操作,而这些事该怎么做已经有了公开的规范了,那就是 OCI。它的一个参考实现叫做 runc。于是 containerd-shim 在这一步调用 runc 这个命令行工具,来启动容器;
- runc 启动完容器后本身会直接退出,containerd-shim 则会成为容器进程的父进程,负责收集容器进程的状态,上报给 containerd,并在容器中 pid 为 1 的进程退出后接管容器中的子进程进行清理,确保不会出现僵尸进程;
不同 cri 的对比