05. kube-scheduler 事件处理器
前面我们介绍了调度队列和调度缓存。从使用角度来看,它们无非涉及读和写操作:读操作自然发生在调度流程中,而写操作则是本文的主角——事件处理器。
pkg/scheduler/eventhandlers.go 是 kube-scheduler 事件处理器的核心实现文件,其中定义了 Pod、Node、PVC 等关键资源的事件处理逻辑。在资源发生变更时,调度器通过注册的回调函数更新调度队列或调度缓存,确保集群状态及时反映到调度流程中。例如:
- 新 Pod 创建时,将其添加到调度队列
- Node 状态变更时,更新调度缓存,并触发重新评估待调度 Pod 是否满足条件
- ……
根据功能描述,不难推测,事件处理器的实现依赖于 client-go 的 SharedInformerFactory 。
事件处理器的注册入口
addAllEventHandlers 函数是所有事件处理逻辑的入口,调度器在初始化时会调用此函数,将各个资源 Informer 与对应的处理函数(Handler)绑定起来:
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
gvkMap map[framework.GVK]framework.ActionType,
) {
// --- 已调度 Pod 的缓存事件处理器 ---
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
// FilterFunc 用于过滤只对 已调度 的 Pod 感兴趣的事件。
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
// 如果 pod.Spec.NodeName 不为空,则 assignedPod 返回 true ,代表已调度 Pod。
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if _, ok := t.Obj.(*v1.Pod); ok {
// 对于删除事件,对象可能已经过时,我们无法准确判断它是否被分配。
// 为了安全起见,我们假设它可能已被分配并尝试进行清理。
return true
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
// 处理器回调函数
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToCache, // 添加已调度 Pod 到缓存
UpdateFunc: sched.updatePodInCache, // 更新缓存中的 Pod
DeleteFunc: sched.deletePodFromCache, // 从缓存中删除 Pod
},
},
)
// --- 未调度 Pod 的队列事件处理器 ---
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
// FilterFunc 用于过滤只对 未调度 且 归当前调度器负责 的 Pod 感兴趣的事件。
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
// 必须是未调度且由当前调度器负责。
return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
// 对于删除事件,对象可能过时,我们无法检查 assigned 状态,
// 但仍然可以检查它是否由我们负责。
return responsibleForPod(pod, sched.Profiles)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
// 处理器回调函数
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue, // 添加未调度 Pod 到队列
UpdateFunc: sched.updatePodInSchedulingQueue, // 更新队列中的 Pod
DeleteFunc: sched.deletePodFromSchedulingQueue, // 从队列中删除 Pod
},
},
)
// --- Node 资源的事件处理器 ---
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache, // 添加节点到缓存
UpdateFunc: sched.updateNodeInCache, // 更新缓存中的节点
DeleteFunc: sched.deleteNodeFromCache, // 从缓存中删除节点
},
)
// buildEvtResHandler 是一个构建通用事件处理器的工厂函数。
// 对于大多数资源类型,我们不需要复杂的逻辑,只需要在资源发生变化时,
// 简单地将所有不可调度的 Pod 移回活动队列即可。
buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
funcs := cache.ResourceEventHandlerFuncs{}
// 检查是否需要处理 Add 事件
if at&framework.Add != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)}
funcs.AddFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
// 检查是否需要处理 Update 事件
if at&framework.Update != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)}
funcs.UpdateFunc = func(_, _ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
// 检查是否需要处理 Delete 事件
if at&framework.Delete != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)}
funcs.DeleteFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
return funcs
}
// 遍历 gvkMap,为插件注册的各种资源类型添加事件处理器。
// gvkMap 定义了调度器关心哪些资源的哪些类型的事件(Add, Update, Delete)。
for gvk, at := range gvkMap {
switch gvk {
case framework.Node, framework.Pod:
// Pod 和 Node 有专门的、更复杂的处理器(见上方),所以这里什么都不做。
case framework.CSINode: // CSINode 资源
informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSINode, "CSINode"),
)
case framework.CSIDriver: // CSIDriver 资源
informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSIDriver, "CSIDriver"),
)
case framework.CSIStorageCapacity: // CSIStorageCapacity 资源
informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSIStorageCapacity, "CSIStorageCapacity"),
)
case framework.PersistentVolume:
// 处理 PV 事件的原因:
// PvAdd: 当没有可用 PV 时创建的 Pod 会停留在不可调度队列。如果之后手动创建了 PV,
// 需要一个事件来触发重新调度。
// PvUpdate: 调度器在绑定卷时可能会因为冲突而失败,导致 Pod 回到不可调度队列。
// PV 的更新可能解决了这个冲突,因此需要触发重新调度。
informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
)
case framework.PersistentVolumeClaim:
// 处理 PVC 事件的原因:
// PVC 的增删改可能会影响 PV 的绑定状态。
informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
)
case framework.PodSchedulingContext:
// 如果启用了 DynamicResourceAllocation 特性门控
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
_, _ = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PodSchedulingContext, "PodSchedulingContext"),
)
}
case framework.ResourceClaim:
// 如果启用了 DynamicResourceAllocation 特性门控
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
_, _ = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
)
}
case framework.StorageClass:
// StorageClass 有特殊的处理逻辑。
if at&framework.Add != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onStorageClassAdd, // 使用专门的 Add 处理器
},
)
}
if at&framework.Update != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// Update 事件使用通用的重新入队逻辑
UpdateFunc: func(_, _ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassUpdate, nil)
},
},
)
}
default:
// 对于不在上面 case 中的、可能是自定义资源(CRD)的 GVK。
// 测试环境可能没有初始化 dynInformerFactory。
if dynInformerFactory == nil {
continue
}
// GVK 格式应该是 <kind in plural>.<version>.<group>,例如:foos.v1.example.com
if strings.Count(string(gvk), ".") < 2 {
klog.ErrorS(nil, "incorrect event registration", "gvk", gvk)
continue
}
// 降级使用动态 informer 来处理这些资源。
gvr, _ := schema.ParseResourceArg(string(gvk))
dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
dynInformer.AddEventHandler(
buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
)
}
}
}除了 Pod 和 Node 外的其它资源,其事件处理逻辑基本都是调用 MoveAllToActiveOrBackoffQueue,根据资源变化将 unschedulablePods 中的 Pod 批量移到 activeQ 或 backoffQ(这一流程已在调度队列章节中详述)。
接下来,我们重点来看最核心的资源——Pod 和 Node 的事件处理逻辑。
Pod 事件处理
Pod 是调度的基本单位,其事件处理逻辑也最为精细。代码巧妙地使用了 cache.FilteringResourceEventHandler,将 Pod 事件流一分为二:已调度 Pod 和 未调度 Pod,分别进行处理。
1、未调度 Pod -> 影响调度队列
这部分处理器负责将等待调度的 Pod 加入、更新或移出调度队列,回调函数如下:
AddFunc: sched.addPodToSchedulingQueue: 当一个符合条件的、新的未调度 Pod 被创建时,调用此函数将其添加到调度队列中,等待被调度。UpdateFunc: sched.updatePodInSchedulingQueue: 当一个未调度的 Pod 规格(如标签、资源请求)发生变化时,更新其在调度队列中的信息。DeleteFunc: sched.deletePodFromSchedulingQueue: 当一个未调度的 Pod 被删除时(例如,用户手动删除或被其他控制器删除),将其从调度队列中移除。
代码实现相对简单:
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
pod := obj.(*v1.Pod)
klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
// 将新创建的、未调度的 Pod 添加到调度队列中,等待调度。
if err := sched.SchedulingQueue.Add(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}
func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
// 如果新旧对象的 ResourceVersion 相同,说明对象内容没有实际变化,直接忽略,
// 避免重复处理,防止出现意外行为(见 #96071)。
if oldPod.ResourceVersion == newPod.ResourceVersion {
return
}
// 检查这个 Pod 是否是“已假定”(assumed)的 Pod。
// "Assumed Pod" 是调度器已为其分配了节点,但还未收到 kubelet 确认的 Pod。
// 这类 Pod 的状态更新由 pod cache 的事件处理器负责,因此在这里直接返回。
isAssumed, err := sched.Cache.IsAssumedPod(newPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
}
if isAssumed {
return
}
// 更新 Pod 在调度队列中的信息。
if err := sched.SchedulingQueue.Update(oldPod, newPod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
}
}
func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = obj.(*v1.Pod)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return
}
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return
}
klog.V(3).InfoS("Delete event for unscheduled pod", "pod", klog.KObj(pod))
// 从调度队列中删除该 Pod。
if err := sched.SchedulingQueue.Delete(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
}
// 下面的内容等后面讲到调度框架后会比较清晰。
// 获取该 Pod 对应的调度框架(framework)。
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// 这理论上不应该发生,因为只有调度器负责的 Pod 才会被加入队列。
klog.ErrorS(err, "Unable to get profile", "pod", klog.KObj(pod))
return
}
// 如果一个正在等待的 Pod(例如,一个被提名的 Pod,等待抢占完成)被拒绝(或删除),
// 这意味着它之前可能是一个 "assumed" 的 Pod,现在我们正在从调度器缓存中移除它。
// 在这种情况下,需要触发一个 AssignedPodDelete 事件,立即尝试重新调度其他一些不可调度的 Pods,
// 因为可能有资源被释放了。
if fwk.RejectWaitingPod(pod.UID) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}
}2、已调度 Pod -> 影响调度缓存
这部分处理器负责维护集群节点资源视图的准确性,回调函数如下:
AddFunc: sched.addPodToCache: 当调度器启动或与 apiserver 重连时,会收到已调度 Pod 的 Add 事件。此函数将 Pod 信息添加到调度缓存中,并更新对应节点的资源占用情况。UpdateFunc: sched.updatePodInCache: 当一个已调度的 Pod 状态更新时(例如,进入 Running 状态),更新其在缓存中的信息。DeleteFunc: sched.deletePodFromCache: 当一个已调度的 Pod 被删除时,它所占用的节点资源被释放。此函数在从缓存中移除该 Pod 后,会立即触发一次全局性的检查。
代码如下:
func (sched *Scheduler) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
return
}
klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))
// 将一个已调度的 Pod 添加到调度缓存中。
// 这会更新其所在节点的资源使用情况等信息。
if err := sched.Cache.AddPod(pod); err != nil {
klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
}
// 通知调度队列,一个已分配的 Pod 被添加了。
// 这对于某些调度逻辑(如 PodAffinity)可能很重要。
sched.SchedulingQueue.AssignedPodAdded(pod)
}
func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj)
return
}
newPod, ok := newObj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
return
}
klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod))
// 在调度器缓存中更新 Pod 信息。
if err := sched.Cache.UpdatePod(oldPod, newPod); err != nil {
klog.ErrorS(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
}
// 通知调度队列,一个已分配的 Pod 被更新了。
sched.SchedulingQueue.AssignedPodUpdated(newPod)
}
func (sched *Scheduler) deletePodFromCache(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
return
}
default:
klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t)
return
}
klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod))
// 从调度器缓存中移除 Pod。这会释放该 Pod 在其节点上占用的资源。
if err := sched.Cache.RemovePod(pod); err != nil {
klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
}
// 一个已调度的 Pod 被删除,意味着其占用的资源被释放了。
// 这很可能会让一些之前不可调度的 Pod 变得可调度。
// 因此,尝试将所有不可调度的 Pod 移回活动队列,以便重新调度。
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}Node 事件处理
Node 的状态直接决定了 Pod 是否可以被调度。因此,Node 的事件处理对于唤醒等待中的 Pod 至关重要。相应的回调函数如下:
AddFunc: sched.addNodeToCache: 当一个新 Node 加入集群时,集群的总资源增加。此函数将 Node 添加到缓存,并立即触发MoveAllToActiveOrBackoffQueue,尝试调度那些因为资源不足而等待的 Pod。DeleteFunc: sched.deleteNodeFromCache: 当一个 Node 被删除时,从缓存中移除它。这会减少可用资源,但通常不需要主动唤醒 Pod(因为没有新的调度机会产生)。UpdateFunc: sched.updateNodeInCache: 这是 Node 事件处理的精髓所在,它包含了一个重要的优化。并非所有 Node 的更新都对调度有意义(例如LastHeartbeatTime的变化)。只有当 Node 变得“更适合”调度时,才有必要唤醒等待的 Pod。这个判断由nodeSchedulingPropertiesChange函数完成。
代码实现如下:
func (sched *Scheduler) addNodeToCache(obj interface{}) {
// 将接收到的对象转换为 *v1.Node 类型。
node, ok := obj.(*v1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", obj)
return
}
// 将新节点添加到调度器的缓存中。缓存中维护了集群中所有节点的状态信息。
nodeInfo := sched.Cache.AddNode(node)
klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
// 一个新节点的加入意味着可能会有之前不可调度的 Pod 现在可以被调度了。
// 因此,将所有不可调度的 Pod 移回活动队列。
// "NodeAdd" 事件表示由于节点的添加而触发的移动。
// preCheckForNode(nodeInfo) 是一个预检查函数,用于过滤掉那些即使有了新节点也明显无法调度的 Pod,以提高效率。
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
var node *v1.Node
// 处理对象删除的两种可能情况:
// 1. 直接收到 *v1.Node 对象。
// 2. 收到 cache.DeletedFinalStateUnknown 对象,这通常发生在 informer 和 apiserver 连接断开时,
// informer 不确定对象是被删除了还是仅仅是失联,此时真实对象被包装在 t.Obj 中。
switch t := obj.(type) {
case *v1.Node:
node = t
case cache.DeletedFinalStateUnknown:
var ok bool
node, ok = t.Obj.(*v1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", t.Obj)
return
}
default:
klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", t)
return
}
klog.V(3).InfoS("Delete event for node", "node", klog.KObj(node))
// 从调度器缓存中移除该节点。
if err := sched.Cache.RemoveNode(node); err != nil {
klog.ErrorS(err, "Scheduler cache RemoveNode failed")
}
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
oldNode, ok := oldObj.(*v1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj)
return
}
newNode, ok := newObj.(*v1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj)
return
}
// 在调度器缓存中更新节点信息。
nodeInfo := sched.Cache.UpdateNode(oldNode, newNode)
// 检查节点的调度相关属性是否发生了变化,使得节点变得“更可调度”。
// 例如:污点被移除、资源增加、标签变化等。
// 这是一个优化:只有当节点变得更适合调度时,才需要重新评估那些不可调度的 Pod。
if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
// 如果节点确实变得更可调度,则将所有不可调度的 Pod 移回活动队列。
// 同样使用 preCheckForNode 进行预过滤。
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
}
}其中 nodeSchedulingPropertiesChange 函数用来判断节点是否变得“更可调度”,它会检查以下几项关键属性的变化:
nodeSpecUnschedulableChanged: 节点的.spec.unschedulable字段是否从true变为false。nodeAllocatableChanged: 节点的可分配资源(CPU, memory 等)是否发生变化。nodeLabelsChanged: 节点标签的变化可能满足某些 Pod 的nodeAffinity要求。nodeTaintsChanged: 节点污点(Taints)的移除可能让某些 Pod 可以容忍(Tolerate)该节点。nodeConditionsChanged: 节点状态(如Ready,MemoryPressure)的变化。
代码实现如下:
// nodeSchedulingPropertiesChange 检查节点更新时,调度相关属性是否发生了有利于调度的变化。
// 如果是,则返回相应的事件类型;否则返回 nil。
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
// 检查节点的 `Unschedulable` 状态是否从 true 变为 false。
if nodeSpecUnschedulableChanged(newNode, oldNode) {
return &queue.NodeSpecUnschedulableChange
}
// 检查节点的可分配资源(Allocatable)是否发生变化。
if nodeAllocatableChanged(newNode, oldNode) {
return &queue.NodeAllocatableChange
}
// 检查节点的标签(Labels)是否发生变化。
if nodeLabelsChanged(newNode, oldNode) {
return &queue.NodeLabelChange
}
// 检查节点的污点(Taints)是否发生变化。
if nodeTaintsChanged(newNode, oldNode) {
return &queue.NodeTaintChange
}
// 检查节点的状态条件(Conditions)是否发生变化。
if nodeConditionsChanged(newNode, oldNode) {
return &queue.NodeConditionChange
}
return nil
}
func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
}
// 其它的就不一一贴出了
只有当这些变化发生时,才会认为这是一个有意义的事件。
而当 Node 事件触发 MoveAllToActiveOrBackoffQueue 时,如果将所有 unschedulablePods 的 Pod 全部移回 activeQ 队列,可能会造成巨大的性能浪费。因为很多 Pod 可能依然无法调度到这个新变化(比如资源仍然不够)的 Node 上。
preCheckForNode 函数就是为了解决这个问题。它返回一个预检查闭包函数,在移动 Pod 之前会进行一次快速过滤:
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
// 注意:以下检查不考虑抢占。在极少数情况下(如节点扩容),一个 Pod 可能检查失败但通过抢占仍能成功。
// 我们有意忽略这些情况,因为不可调度的 Pod 最终还是会被重新排队。
return func(pod *v1.Pod) bool {
// 运行一组基本的准入检查(资源、节点亲和性等)。
admissionResults := AdmissionCheck(pod, nodeInfo, false)
if len(admissionResults) != 0 {
// 如果有任何检查失败,则不移动该 Pod。
return false
}
// 检查 Pod 是否能容忍该节点的 NoSchedule 污点。
_, isUntolerated := corev1helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoSchedule
})
// 如果没有不能容忍的 NoSchedule 污点,则移动该 Pod。
return !isUntolerated
}
}
// AdmissionCheck 调用 noderesources/nodeport/nodeAffinity/nodename 等插件的过滤逻辑
// 并返回失败原因。它被 kubelet 和调度器内部(如 preCheckForNode)使用。
// 如果 `includeAllFailures` 为 false,它在遇到第一个失败时就返回;否则返回所有失败。
func AdmissionCheck(pod *v1.Pod, nodeInfo *framework.NodeInfo, includeAllFailures bool) []AdmissionResult {
var admissionResults []AdmissionResult
// 1. 检查节点资源是否满足 Pod 请求。
insufficientResources := noderesources.Fits(pod, nodeInfo)
if len(insufficientResources) != 0 {
for i := range insufficientResources {
admissionResults = append(admissionResults, AdmissionResult{InsufficientResource: &insufficientResources[i]})
}
if !includeAllFailures {
return admissionResults
}
}
// 2. 检查节点是否满足 Pod 的必需节点亲和性(RequiredNodeAffinity)。
if matches, _ := corev1nodeaffinity.GetRequiredNodeAffinity(pod).Match(nodeInfo.Node()); !matches {
admissionResults = append(admissionResults, AdmissionResult{Name: nodeaffinity.Name, Reason: nodeaffinity.ErrReasonPod})
if !includeAllFailures {
return admissionResults
}
}
// 3. 检查 Pod 的 spec.nodeName 是否匹配该节点。
if !nodename.Fits(pod, nodeInfo) {
admissionResults = append(admissionResults, AdmissionResult{Name: nodename.Name, Reason: nodename.ErrReason})
if !includeAllFailures {
return admissionResults
}
}
// 4. 检查 Pod 请求的主机端口(HostPorts)在该节点上是否可用。
if !nodeports.Fits(pod, nodeInfo) {
admissionResults = append(admissionResults, AdmissionResult{Name: nodeports.Name, Reason: nodeports.ErrReason})
if !includeAllFailures {
return admissionResults
}
}
return admissionResults
}
// AdmissionResult 描述了调度器无法接纳 Pod 的原因。
// 如果原因是资源不足,InsufficientResource 字段会包含详细信息。
type AdmissionResult struct {
Name string
Reason string
InsufficientResource *noderesources.InsufficientResource
}只有通过了资源、亲和性、污点等基本检查的 Pod,才会被认为“有希望”调度到这个变化的 Node 上,从而被移回 activeQ 队列。
微信公众号
更多内容请关注微信公众号:gopher云原生