04. kube-scheduler 调度缓存

04. kube-scheduler 调度缓存

调度队列管理的是尚未调度的 Pod(三个子队列中的 Pod 均处于 “Pending” 状态),调度缓存则是一个以 Pod 为中心的本地缓存,主要维护 Pod 和 Node 的聚合信息,管理着已调度(或假定调度)的 Pod 及其节点绑定关系与资源占用情况等信息。

与存储在 etcd 中的原始对象相比,调度缓存不仅引入了“假定”状态,用于支持调度过程中的资源预占,还提供了快照机制,使调度器在同一个调度周期内能够基于一致且高效的节点视图进行决策。

在调度成功之后、绑定完成之前,调度器会通过 AssumePod 将 Pod 标记为“假设绑定”,提前在缓存中占用目标 Node 的资源,当 API Server 返回 Add 事件说明绑定结果已确认后,该 Pod 就会转为正式绑定。若迟迟未收到确认,则视为调度失败或异常,调度器会将 Pod 标记为过期并从缓存中移除(过期功能在当前版本可视为“已废弃”,具体后面会提到)。

$ tree pkg/scheduler/internal/
internal
├── cache # 调度缓存
├── heap  # 堆结构,作为调度队列的底层结构,详见上回
└── queue # 调度队列,详见上回

其中 cache 模块便是本文的主角,结构如下:

$ tree pkg/scheduler/internal/cache/
cache
├── cache.go         # 调度缓存的具体实现
├── interface.go     # 调度缓存的抽象接口定义
├── node_tree.go     # nodeTree 结构的实现,按拓扑域(Region+Zone)组织节点
└── snapshot.go      # 缓存快照的实现,提供调度周期内一致的缓存视图

调度缓存的抽象接口

调度缓存通过一个 Cache 接口定义了所有能力:

type Cache interface {
	// 假定一个 Pod 已被调度,并将其信息聚合到其所在的节点中。
	AssumePod(pod *v1.Pod) error
	// 当 Pod 绑定完成(无论成功/失败)后调用这个接口通知调度缓存做出相应操作。
	// 主要是为假定 Pod 设定过期状态,在当前版本可视为“已废弃”,具体后面会提到。
	// 当前主要依靠 ForgetPod 来移除绑定失败的假定 Pod。
	FinishBinding(pod *v1.Pod) error
	// 从缓存中移除一个假定的 Pod ,例如 AssumePod 后遇到失败就需要调用该方法。
	ForgetPod(pod *v1.Pod) error
	// 收到 Pod Add 事件时触发,有两种情况
	// 1. 如果 Pod 当前是假定状态,就更新
	// 2. 否则添加
	AddPod(pod *v1.Pod) error
	// 收到 Pod Update 事件时触发,移除 oldPod 的信息并添加 newPod 的信息。
	UpdatePod(oldPod, newPod *v1.Pod) error
	// 收到 Pod Delete 事件时触发,移除一个 Pod ,该 Pod 的信息将从其分配的节点上减去。
	RemovePod(pod *v1.Pod) error
	// 从缓存中返回对应的 Pod。
	GetPod(pod *v1.Pod) (*v1.Pod, error)
	// 如果 Pod 是假定的且未过期,则返回 true。
	IsAssumedPod(pod *v1.Pod) (bool, error)
	// 收到 Node Add 事件时触发,添加节点的信息,并返回对应的 NodeInfo 对象。
	AddNode(node *v1.Node) *framework.NodeInfo
	// 收到 Node Update 事件时触发,更新节点的信息,并返回对应的 NodeInfo 对象。
	UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo
	// 收到 Node Delete 事件时触发,移除节点的信息。
	RemoveNode(node *v1.Node) error

	// UpdateSnapshot 是调度缓存最关键的功能,将传入的 nodeSnapshot 更新为缓存的当前最新内容。
	// 之所以是“更新快照”,是因为快照本身数据量较大,完整生成开销不小,通过在上一次快照的基础上仅更新增量变化,能够显著降低开销。
	UpdateSnapshot(nodeSnapshot *Snapshot) error
	
	// 返回缓存中的节点数量,仅用于单元测试,略过。
	NodeCount() int
	// 返回缓存中 Pod 的数量(包括已删除的节点的 Pod),仅用于单元测试,略过。
	PodCount() (int, error)
	// Dump 生成当前缓存的转储(dump),用于调试,略过。
	Dump() *Dump
}

调度缓存的核心是维护 Pod 与 Node 的聚合信息,最终体现为 NodeInfo

节点级聚合信息:NodeInfo

节点级聚合信息,即 NodeInfo ,不仅包含 Node 本身的数据,还整合了与该 Node 调度相关的各类信息,调度器的很多决策都依赖于它:

type NodeInfo struct {
    // 节点对象本身。
    node *v1.Node
    // 运行在该节点上的所有 Pod(包括已绑定和假定绑定)。
    Pods []*PodInfo
    // 声明了 Pod 亲和性的 Pod 子集。
    PodsWithAffinity []*PodInfo
    // 声明了必需反亲和性的 Pod 子集。
    PodsWithRequiredAntiAffinity []*PodInfo
    // 该节点上已分配的端口。
    UsedPorts HostPortInfo
    // 该节点上所有 Pod 请求的资源总量(包括假定绑定的 Pod)。
    Requested *Resource
    // 为防止“零资源请求 Pod 堆积”问题,记录每个容器请求的 CPU/Memory 最小值后的资源总量。
    NonZeroRequested *Resource
    // 节点可分配的资源(Node.Status.Allocatable)。
    Allocatable *Resource
    // 节点上已存在的镜像及其状态,用于镜像本地性调度。
    ImageStates map[string]*ImageStateSummary
    // 记录该节点上被多少个 Pod 使用的 PVC(namespace/name → count)。
    PVCRefCounts map[string]int
    // 是一个全局的计数,每当有任何的 NodeInfo 更新时,都会调用 nextGeneration 更新,用于快照增量更新判断。
    Generation int64
}

var generation int64

func nextGeneration() int64 {
	return atomic.AddInt64(&generation, 1)
}

内部字段的结构就不展开了。我们只要知道一旦有任何 NodeInfo 更新(调用 AddPodRemovePodSetNodeRemoveNode 方法),其 Generation 字段就会被更新为最新的全局的计数,以 SetNode 为例:

func (n *NodeInfo) SetNode(node *v1.Node) {
	n.node = node
	n.Allocatable = NewResource(node.Status.Allocatable)
	// 设置为最新的全局计数
	n.Generation = nextGeneration()
}

这个字段在后续快照的增量更新中起到关键作用。

缓存快照的实现

为了避免在每个调度周期内直接操作实时缓存,调度器会在每次调度周期开始时生成一个缓存快照供调度器使用(调用 UpdateSnapshot 更新)。如果说调度缓存是调度器的“实时数据中心”,那么缓存快照就是每个调度周期内的“静态只读副本”。

快照主要承担了两个职责:

  1. 提供调度周期内的一致性视图,避免在调度过程中(比如过滤和打分)因缓存实时变化导致的结果不确定。
  2. 提升性能,快照意味着只读,可以代替频繁的直接缓存访问,减少锁竞争与计算开销。

实现上,所谓的快照只是一组节点级聚合信息 NodeInfo 的集合,再加上一些辅助的派生列表(亲和性列表、PVC 集合等),形成一个完整的调度视图,如下:

type Snapshot struct {
	// nodeInfoMap 是一个从 Node 名称到其 NodeInfo 快照的映射。
	nodeInfoMap map[string]*framework.NodeInfo
	// nodeInfoList 是按照 nodeTree 结构顺序排列的 NodeInfo 快照列表。
	nodeInfoList []*framework.NodeInfo
	// havePodsWithAffinityNodeInfoList 是至少有一个声明了亲和性(affinity)条款的 Pod 的 NodeInfo 快照列表。
	havePodsWithAffinityNodeInfoList []*framework.NodeInfo
	// havePodsWithRequiredAntiAffinityNodeInfoList 是至少有一个声明了
	// 必需反亲和性(required anti-affinity)条款的 Pod 的 NodeInfo 快照列表。
	havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
	// usedPVCSet 包含一个 PVC 名称的集合,这些 PVC 被一个或多个已调度的 Pod 使用,
	// 键的格式为 "namespace/name"。
	usedPVCSet sets.String
	// generation 是当前快照的代数,对应当前最近更新的 NodeInfo 的 Generation,用于增量更新。
	generation int64
}

快照中的 NodeInfo 集合除了通过 map 存储(便于根据 Node 名称快速检索单个 NodeInfo 信息),还维护了一个按照 nodeTree 结构顺序排列的切片,使调度器能够有序遍历节点,该作用后面再提。

generation 字段表示当前快照的代数,对应的是最近一次更新的 NodeInfo 的 Generation。我们之前提到过,NodeInfo 只要发生更新,其 Generation 就会被设置为最新(即当前最大的计数值)。因此,在快照中可以通过对比 generation 的大小来判断缓存快照是否最新,并据此实现增量更新。后面会对此进行更详细的分析。

为了给快照的使用者(调度插件)提供统一的查询入口,实现了 framework.SharedLister 接口:

// SharedLister 聚合调度器相关的多个 lister。
type SharedLister interface {
	NodeInfos() NodeInfoLister
	StorageInfos() StorageInfoLister
}

// NodeInfoLister 接口表示可以根据 Node 名称列出或获取 NodeInfo 对象的能力。
type NodeInfoLister interface {
	// List 返回所有 NodeInfo 列表。
	List() ([]*NodeInfo, error)
	// HavePodsWithAffinityList 返回至少有一个 Pod 声明了亲和性的节点的 NodeInfo 列表。
	HavePodsWithAffinityList() ([]*NodeInfo, error)
	// HavePodsWithRequiredAntiAffinityList 返回至少有一个 Pod 声明了必需反亲和性的节点的 NodeInfo 列表。
	HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error)
	// Get 根据 Node 名称返回对应的 NodeInfo。
	Get(nodeName string) (*NodeInfo, error)
}

// StorageInfoLister 接口表示可以处理存储相关操作和资源的能力。
type StorageInfoLister interface {
	// IsPVCUsedByPods 判断指定 PVC 是否已被一个或多个已调度的 Pod 使用。
	// key 的格式为 "namespace/name"。
	IsPVCUsedByPods(key string) bool
}

具体方法的实现非常简洁,基本都是直接返回 Snapshot 内部对应的对象,无需进行复杂计算(因为每次调用 UpdateSnapshot 就会更新):

// List 返回快照中所有的 NodeInfo 列表。
func (s *Snapshot) List() ([]*framework.NodeInfo, error) {
	return s.nodeInfoList, nil
}

// HavePodsWithAffinityList 返回至少有一个 Pod 声明了亲和性的 NodeInfo 列表。
func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
	return s.havePodsWithAffinityNodeInfoList, nil
}

// HavePodsWithRequiredAntiAffinityList 返回至少有一个 Pod 声明了必需反亲和性的 NodeInfo 列表。
func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
	return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil
}

// Get 根据 Node 名称返回对应的 NodeInfo。
// 如果节点不存在或 NodeInfo.Node 为 nil,则返回错误。
func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
	if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil {
		return v, nil
	}
	return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
}

// IsPVCUsedByPods 判断指定 PVC 是否已被快照中的 Pod 使用。
func (s *Snapshot) IsPVCUsedByPods(key string) bool {
	return s.usedPVCSet.Has(key)
}

拓扑感知的节点组织结构:nodeTree

当调用缓存快照的 List 方法时,会返回包含所有节点信息的 NodeInfo 列表,即 nodeInfoList 。我们提到过,这个列表是按照 nodeTree 结构的顺序组织的。

下面就正式讲解 nodeTree 结构及其顺序。

nodeTree 会按拓扑域(区域 Region + 可用区 Zone)来管理节点,为调度器提供一个稳定且均衡的遍历顺序。这样,当调度器遍历节点时,不仅能避免因 map 无序性导致的结果抖动,还能在无额外约束条件下实现跨区域的负载均衡。

举个例子,在如下三个可用区的节点分布状态下:

ZoneA: nodeA1, nodeA2
ZoneB: nodeB1, nodeB2, nodeB3
ZoneC: nodeC1

调度器调用缓存快照获取的节点列表顺序将会是:

nodeA1, nodeB1, nodeC1, nodeA2, nodeB2, nodeB3

这种顺序使调度器在遍历节点时能够自然地跨 Zone 轮询,而不是从某个随机节点开始。

回到其源码实现上,nodeTree 结构的定义如下:

type nodeTree struct {
    tree     map[string][]string // region-zone(key) -> 节点列表(val) 的映射
    zones    []string            // 存放所有 tree 的 key,即 region-zone 列表
    numNodes int                 // 总节点数
}

由于 nodeTree 只是负责组织拓扑顺序,所以只存储节点名称,不需要包含节点的详细信息(NodeInfo)。

要添加一个 Node 到 nodeTree ,通过 addNode() 方法,比较简单:

func (nt *nodeTree) addNode(n *v1.Node) {
	// 根据 region 和 zone 获取 key ,允许为空
	// 默认不为节点设置相应标签(topology.kubernetes.io/region 和 topology.kubernetes.io/zone)的情况下就是空的
	zone := utilnode.GetZoneKey(n)
	if na, ok := nt.tree[zone]; ok {
		for _, nodeName := range na {
			if nodeName == n.Name {
				klog.InfoS("Node already exists in the NodeTree", "node", klog.KObj(n))
				return
			}
		}
		// 非首次,添加到 tree 中
		nt.tree[zone] = append(na, n.Name)
	} else {
		// 首次,添加到 zones 和 tree 中
		nt.zones = append(nt.zones, zone)
		nt.tree[zone] = []string{n.Name}
	}
	klog.V(2).InfoS("Added node in listed group to NodeTree", "node", klog.KObj(n), "zone", zone)
	// 总节点数+1
	nt.numNodes++
}

list() 方法则会以轮询的方式遍历各个 zones 下的节点列表,就和例子讲的那样:

func (nt *nodeTree) list() ([]string, error) {
	if len(nt.zones) == 0 {
		// 没有区域,也就没有节点
		return nil, nil
	}
	nodesList := make([]string, 0, nt.numNodes) // 用于存放最终结果
	numExhaustedZones := 0                      // 记录已经遍历完节点的区域数量
	nodeIndex := 0                              // 当前轮询的节点索引

	// 外层循环直到收集到所有节点
	for len(nodesList) < nt.numNodes {
		if numExhaustedZones >= len(nt.zones) { 
			// 所有区域都已遍历完,但节点数量仍未达到预期,说明存在异常
			return nodesList, errors.New("all zones exhausted before reaching count of nodes expected")
		}
		// 遍历每个区域
		for zoneIndex := 0; zoneIndex < len(nt.zones); zoneIndex++ {
			na := nt.tree[nt.zones[zoneIndex]] // 获取该区域的节点列表
			if nodeIndex >= len(na) {          // 如果该区域节点已经遍历完,跳过
				if nodeIndex == len(na) {        // 记录遍历完该区域
					numExhaustedZones++
				}
				continue
			}
			// 添加当前索引对应的节点到最终列表
			nodesList = append(nodesList, na[nodeIndex])
		}
		// 轮询索引加 1,进入下一轮节点
		nodeIndex++
	}

	return nodesList, nil
}

其它的 removeNoderemoveZoneupdateNode 就不展开了。

调度缓存的具体实现

调度缓存的实现是 cacheImpl ,以下是与之相关的所有结构定义:

// nodeInfoListItem 是一个双向链表结构
// 内部持有一个 NodeInfo 指针,并通过 prev 和 next 与前后节点连接起来
// 多个 NodeInfo 会串联成一个按“最近更新”顺序排列的链表
type nodeInfoListItem struct {
	info *framework.NodeInfo
	next *nodeInfoListItem
	prev *nodeInfoListItem
}

type cacheImpl struct {
	stop   <-chan struct{}
	ttl    time.Duration
	period time.Duration
	mu sync.RWMutex
	
	// 存储假定 Pod 的 key(uid) 的集合
	// 这个 key 可以进一步用来在 podStates 中获取 podState
	assumedPods sets.String
	// 存储 Pod key(uid) 到 podState 的映射,这里存储的 Pod 既有假定的,也有已绑定的
	podStates map[string]*podState
	
	// 存储所有节点的映射,key 是 Node 名称,val 是所指向的 NodeInfo
	nodes     map[string]*nodeInfoListItem
	// headNode 指向 nodes 中最近更新的一个 NodeInfo,也就是链表的头部
	headNode *nodeInfoListItem
	
	// nodeTree 结构,用于前面提到的拓扑域排序
	nodeTree *nodeTree
	
	// 存储镜像名称对应 imageState 的映射,和本文关系不大,略过
	imageStates map[string]*imageState
}

type podState struct {
	pod *v1.Pod
	// 被假定的 Pod 用来判断是否已过期的字段
	// 如果 deadline 是 nil,这个假定的 Pod 将永不过期
	// 在当前版本实现中,deadline 永远是 nil,后面提到
	deadline *time.Time
	// 标识是否已绑定结束的状态
	bindingFinished bool
}

type imageState struct {
	// 镜像的大小(以字节为单位)
	size int64
	// 拥有此镜像的 Node 名称集合
	nodes sets.String
}

cacheImpl 用双向链表 + map 的组合来管理所有 NodeInfo 信息,主要是因为通过 map 可以实现节点的 O(1) 定位,而双向链表则允许在 NodeInfo 更新时快速将其移动到链表头(如此一来,链表头节点的 Generation 一定是最大值),这样最近更新过的节点就都会集中在前端,等调用 UpdateSnapshot 更新快照时,仅扫描前端这部分节点就可以高效完成增量同步,这是一种 LRU 思想的体现,具体后面分析完就会了然。

AssumePod 假定 Pod

func (cache *cacheImpl) AssumePod(pod *v1.Pod) error {
	// 获取 Pod 对应的 key,实际返回的是 Pod UID,不展开
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()
	// 判断该 Pod 是否已经在 podStates 映射中了
	if _, ok := cache.podStates[key]; ok {
		return fmt.Errorf("pod %v(%v) is in the cache, so can't be assumed", key, klog.KObj(pod))
	}

	// 如果未在,则添加到缓存
	return cache.addPod(pod, true)
}

addPod 是添加 Pod 到缓存的具体逻辑,其中 assumePod 参数用来标识该 Pod 是否为假定状态:

func (cache *cacheImpl) addPod(pod *v1.Pod, assumePod bool) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}
	
	// 拿到节点指向的 NodeInfo
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		// 如果没有,则初始化创建一个链表节点
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[pod.Spec.NodeName] = n
	}
	// 将 Pod 添加到当前 NodeInfo 中
	n.info.AddPod(pod)
	// 由于更新了 NodeInfo ,所以将该 NodeInfo 移动到链表头去
	cache.moveNodeInfoToHead(pod.Spec.NodeName)
	ps := &podState{
		pod: pod,
	}
	// 加入到 podState 映射中
	cache.podStates[key] = ps
	if assumePod {
		// Pod 是假定状态的,添加到 assumedPods 集合
		cache.assumedPods.Insert(key)
	}
	return nil
}

前面提到过 NodeInfo 一旦更新,就会将其移动到链表头,这里就体现到了,对应的 moveNodeInfoToHead 方法如下:

func (cache *cacheImpl) moveNodeInfoToHead(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
		return
	}
	// 如果已经是头节点了,不需要再做什么
	if ni == cache.headNode {
		return
	}

	// 这里就是调整一下节点的前驱和后继的指向而已,最终目的就是使我们当前节点作为头节点
	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	if cache.headNode != nil {
		cache.headNode.prev = ni
	}
	ni.next = cache.headNode
	ni.prev = nil
	
	// 最后更新一下 headNode
	cache.headNode = ni
}

利用双向链表的特性,直接操作链表指针,时间复杂度是 O(1),非常高效。

FinishBinding 结束绑定

func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error {
	return cache.finishBinding(pod, time.Now())
}

func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	klog.V(5).InfoS("Finished binding for pod, can be expired", "podKey", key, "pod", klog.KObj(pod))
	currState, ok := cache.podStates[key]
	if ok && cache.assumedPods.Has(key) {
		// 如果 podStates 映射中存在 Pod 并且该 Pod 是假定状态的
		if cache.ttl == time.Duration(0) {
			// 如果没有配置 ttl ,那么假定 Pod 是永不过期的
			currState.deadline = nil
		} else {
			// 否则配置假定 Pod 的过期时间
			dl := now.Add(cache.ttl)
			currState.deadline = &dl
		}
		// 标识一下绑定结束的状态
		currState.bindingFinished = true
	}
	return nil
}

之前多次提到,在当前版本实现中,假定 Pod 的过期逻辑是视为“已废弃”的,是因为默认 ttl 为 0 ,也就是永不过期,具体原因可以参考 https://github.com/kubernetes/kubernetes/issues/106361 ,大致就是原始 ttl 默认是 30s ,但是发现有可能因为超时等原因(例如因为接收 apiserver 事件超时,Pod 实际绑定成功,但缓存中假定却过期了),从而导致一些竞争条件的发生,最终结论是希望完全删除 ttl 的逻辑(可以主要依靠 ForgetPod ),但考虑版本发布影响等因素,暂时就先设为 0 了:

const (
	// Duration the scheduler will wait before expiring an assumed pod.
	// See issue #106361 for more details about this parameter and its value.
	durationToExpireAssumedPod time.Duration = 0
)

因此本文不再展开讨论调度缓存中假定 Pod 的过期逻辑,因为已无实际意义。

ForgetPod 移除假定 Pod

func (cache *cacheImpl) ForgetPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	// 如果假定 Pod 的 NodeName 和实际调度的不一样,则返回错误
	if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
		return fmt.Errorf("pod %v(%v) was assumed on %v but assigned to %v", key, klog.KObj(pod), pod.Spec.NodeName, currState.pod.Spec.NodeName)
	}

	// 只有被假定的 Pod 才能被移除
	if ok && cache.assumedPods.Has(key) {
		return cache.removePod(pod)
	}
	return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod))
}

其中 removePod 的实现也比较简单:

func (cache *cacheImpl) removePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	// 找到链表中对应的节点
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod))

	} else {
		// 移除该 Pod
		if err := n.info.RemovePod(pod); err != nil {
			return err
		}
		if len(n.info.Pods) == 0 && n.info.Node() == nil {
			// 如果移除 Pod 后,该链表节点的 Pod 和 Node 都为空了,则将该链表节点也移除掉
			cache.removeNodeInfoFromList(pod.Spec.NodeName)
		} else {
			// 一样的操作,更新了 NodeInfo ,就要将该 NodeInfo 移动到链表头去
			cache.moveNodeInfoToHead(pod.Spec.NodeName)
		}
	}

	// 清理对应假定 Pod 的映射和集合
	delete(cache.podStates, key)
	delete(cache.assumedPods, key)
	return nil
}

不过在移除 Pod 后,会做一个额外的判断:检查对应的链表节点(即 NodeInfo)中的 Pod 列表和 Node 对象是否都为空。如果两者都为空,则将该 NodeInfo 从链表中移除。

这是因为在 RemoveNode 的实现中(后面会提到),即便 Node 被删除,缓存也不会立即移除 NodeInfo,而是要等到 Node 上所有的 Pod 都被清空后才能删除。因此这里额外的判断可以防止遗漏,确保缓存的状态正确。

AddPod 添加 Pod

AddPod 用在事件处理器中对 Pod Add 事件的监听上:

func (cache *cacheImpl) AddPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	case ok && cache.assumedPods.Has(key):
		// 当执行假定操作时,我们已经将 Pod 添加到了缓存中,
		// 这里只需要更新以确保 Pod 的状态是最新的。
		if err = cache.updatePod(currState.pod, pod); err != nil {
			klog.ErrorS(err, "Error occurred while updating pod")
		}
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			// 如果 Pod 被添加到了一个与它被假定时不同的节点上,则作出提示。
			klog.InfoS("Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
			return nil
		}
	case !ok:
		// podStates 映射中找不到,就加进来,但此时不是假定状态的了,所以传值 false
		// 比如 Pod 创建时就指定了 Node
		if err = cache.addPod(pod, false); err != nil {
			klog.ErrorS(err, "Error occurred while adding pod")
		}
	default:
		return fmt.Errorf("pod %v(%v) was already in added state", key, klog.KObj(pod))
	}
	return nil
}

其中 updatePod 会先进行 removePod 再进行 addPod ,这两个方法都分析过了:

func (cache *cacheImpl) updatePod(oldPod, newPod *v1.Pod) error {
	if err := cache.removePod(oldPod); err != nil {
		return err
	}
	return cache.addPod(newPod, false)
}

UpdatePod 更新 Pod

UpdatePod 用在事件处理器中对 Pod Update 事件的监听上,调用的就是上面刚提到的 updatePod (先进行 removePod 再进行 addPod):

func (cache *cacheImpl) UpdatePod(oldPod, newPod *v1.Pod) error {
	key, err := framework.GetPodKey(oldPod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	if !ok {
		return fmt.Errorf("pod %v(%v) is not added to scheduler cache, so cannot be updated", key, klog.KObj(oldPod))
	}

	// 一个被假定的 Pod 不会有 Update/Delete 事件。它在 Update 事件之前必须有一个 Add 事件
	// 所以这种情况下返回错误
	if cache.assumedPods.Has(key) {
		return fmt.Errorf("assumed pod %v(%v) should not be updated", key, klog.KObj(oldPod))
	}

	if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
		klog.ErrorS(nil, "Pod updated on a different node than previously added to", "podKey", key, "pod", klog.KObj(oldPod))
		klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	// 调用 updatePod 方法
	return cache.updatePod(oldPod, newPod)
}

RemovePod 移除 Pod

RemovePod 用在事件处理器中对 Pod Delete 事件的监听上:

func (cache *cacheImpl) RemovePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	if !ok {
		return fmt.Errorf("pod %v(%v) is not found in scheduler cache, so cannot be removed from it", key, klog.KObj(pod))
	}
	if currState.pod.Spec.NodeName != pod.Spec.NodeName {
		klog.ErrorS(nil, "Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
		if pod.Spec.NodeName != "" {
			// 当调度器错过一个 Delete 事件并从 informer 缓存中获取到最后一个已知状态时,
			// NodeName 可能为空。
			klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}
	// 调用 removePod 方法,也已提过
	return cache.removePod(currState.pod)
}

GetPod 获取 Pod

func (cache *cacheImpl) GetPod(pod *v1.Pod) (*v1.Pod, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return nil, err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	// 直接从映射获取就行了
	podState, ok := cache.podStates[key]
	if !ok {
		return nil, fmt.Errorf("pod %v(%v) does not exist in scheduler cache", key, klog.KObj(pod))
	}

	return podState.pod, nil
}

IsAssumedPod 判断 Pod 是否是假定且未过期

func (cache *cacheImpl) IsAssumedPod(pod *v1.Pod) (bool, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return false, err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()
	
	// 就是判断 Pod 是否在假定集合中
	return cache.assumedPods.Has(key), nil
}

AddNode 添加节点

AddNode 用在事件处理器中对 Node Add 事件的监听上:

func (cache *cacheImpl) AddNode(node *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[node.Name] = n
	} else {
		// 如果节点已存在,先移除该节点原有的镜像状态(image states)
		cache.removeNodeImageStates(n.info.Node())
	}
	// 老朋友,将该节点移动到链表头部,表示最近被更新
	cache.moveNodeInfoToHead(node.Name)

	// 将节点加入 nodeTree,用于后续调度中快速按拓扑结构查找节点,这个作用前面已经说过了
	cache.nodeTree.addNode(node)
	// 添加节点的镜像状态信息(镜像大小、镜像存在情况等)
	cache.addNodeImageStates(node, n.info)
	// 更新 NodeInfo 中保存的节点对象
	n.info.SetNode(node)
	// 返回 NodeInfo 的克隆对象(避免外部修改内部缓存数据)
	return n.info.Clone()
}

用于按拓扑域排序的 nodeTree 结构,也是在这一阶段将节点添加到其中的。

UpdateNode 更新节点

UpdateNode 用在事件处理器中对 Node Update 事件的监听上:

func (cache *cacheImpl) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[newNode.Name]
	if !ok {
		// 如果不存在,创建新的 NodeInfoListItem 并加入 map 和 nodeTree
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[newNode.Name] = n
		cache.nodeTree.addNode(newNode)
	} else {
		// 如果已存在,移除原有镜像状态
		cache.removeNodeImageStates(n.info.Node())
	}
	// 将节点移动到链表头部,表示最近更新
	cache.moveNodeInfoToHead(newNode.Name)

	// 更新 nodeTree 中该节点的拓扑结构信息
	cache.nodeTree.updateNode(oldNode, newNode)
	// 添加新的镜像状态
	cache.addNodeImageStates(newNode, n.info)
	// 更新 NodeInfo 中的节点对象
	n.info.SetNode(newNode)
	// 返回 NodeInfo 的克隆
	return n.info.Clone()
}

RemoveNode 移除节点

RemoveNode 用在事件处理器中对 Node Delete 事件的监听上:

// RemoveNode 从缓存中移除一个节点
// 注意:节点上可能还有 Pod 存在,因为 Pod 删除事件可能尚未到达。
// 因此:
// - nodeTree 中会立即移除该节点,使后续调度不会出现在快照中
// - 但缓存中会保留一个“幽灵节点”(ghost node)直到所有 Pod 删除事件都到达。
func (cache *cacheImpl) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		return fmt.Errorf("node %v is not found", node.Name)
	}
	// 从 NodeInfo 中移除 Node 对象,但保留 Pod 信息
	n.info.RemoveNode()

	// 仅当该节点上已经没有 Pod 时,才真正从链表和 map 中移除 NodeInfo
	if len(n.info.Pods) == 0 {
		cache.removeNodeInfoFromList(node.Name)
	} else {
		// 否则将该 NodeInfo 移动到链表头部,等待 Pod 删除事件
		cache.moveNodeInfoToHead(node.Name)
	}

	// 从 nodeTree 中移除该节点,使后续调度不可见
	if err := cache.nodeTree.removeNode(node); err != nil {
		return err
	}
	// 移除节点镜像状态
	cache.removeNodeImageStates(node)
	return nil
}

当 Node 被删除时,会立即从调度的 nodeTree 数据结构中剔除以保证后续调度安全(快照的节点列表是通过 nodeTree 来组织的),但为保证事件完整性与最终一致性,在缓存层仍然会保留该节点的 Pod 列表(但会将 Node 置空,形成幽灵节点)直到所有关联的 Pod 删除事件处理完毕。

这里就对照了前面提到的 removePod 的一个额外判断:在移除 Pod 后,需要检查对应的 NodeInfo 中的 Pod 列表和 Node 对象是否都为空,如果两者都为空,说明该 Node 已然是一个幽灵节点,就可以从链表中安全移除了。

UpdateSnapshot 更新快照

最后,我们来到调度缓存最为关键的一个方法,快照的增量更新逻辑,该逻辑也是一种 LRU 思想的体现:

func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	// 获取当前的快照的 generation ,缓存中只有比该 generation 大的 NodeInfo 才需要更新。
	snapshotGeneration := nodeSnapshot.generation

	// updateAllLists 代表全量更新,需要根据 nodeTree 重建快照的 nodeInfoList
	// 否则沿用旧的 nodeInfoList 即可
	updateAllLists := false
	// 三个状态,用来判断快照中哪些辅助的派生列表(详见前文)需要更新
	updateNodesHavePodsWithAffinity := false
	updateNodesHavePodsWithRequiredAntiAffinity := false
	updateUsedPVCSet := false

	// 从 NodeInfo 双向链表的头节点 headNode 开始遍历。
	// 因为 headNode 肯定是最新的,其 Generation 肯定是最大的。
	// 这里就是增量更新的逻辑,也就是 LRU 思想的体现。
	for node := cache.headNode; node != nil; node = node.next {
		if node.info.Generation <= snapshotGeneration {
			// 遇到比 snapshotGeneration 小或相等的节点,说明后面的都不需要处理了,可以退出了。
			// 增量更新完成。
			break
		}
		if np := node.info.Node(); np != nil {
			existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
			if !ok {
				// 如果快照中没有该节点,则说明是新增节点或快照之前没有记录
				// 则需要重建 nodeInfoList(因为节点集合发生变化)
				updateAllLists = true
				// 在快照的 map 中创建一个占位的 NodeInfo 指针
				existing = &framework.NodeInfo{}
				nodeSnapshot.nodeInfoMap[np.Name] = existing
			}
			// 拷贝当前 cache 中的 NodeInfo(Clone 返回 *framework.NodeInfo)
			clone := node.info.Clone()
			
			// 一些状态比较,用于判断哪些辅助的派生列表需要更新
			if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
				updateNodesHavePodsWithAffinity = true
			}
			if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
				updateNodesHavePodsWithRequiredAntiAffinity = true
			}
			if !updateUsedPVCSet {
				if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) {
					updateUsedPVCSet = true
				} else {
					for pvcKey := range clone.PVCRefCounts {
						if _, found := existing.PVCRefCounts[pvcKey]; !found {
							updateUsedPVCSet = true
							break
						}
					}
				}
			}

			// 关键操作:不是直接替换 existing 指针,而是把 clone 的内容拷贝到 existing 指向的内存中
			// 这样可以保留 existing 的地址(因为 nodeInfoList 里保存的是指针,外部也可能持有该指针)
			// 到这里循环内,就完成了快照中的 nodeInfoMap 的增量更新。
			// 这个时候 nodeInfoList 和其它的辅助的派生列表仍未更新,等下面继续处理。
			*existing = *clone
		}
	}
	
	// 用最新的 NodeInfo Generation 更新到快照中。
	if cache.headNode != nil {
		nodeSnapshot.generation = cache.headNode.info.Generation
	}

	// 如果 snapshot.nodeInfoMap 长度大于 nodeTree 中的节点数,需要尝试移除已删除节点(与 nodeTree 同步)
	if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
		cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
		// 同样的,节点集合发生了变化,nodeInfoList 需要重建了
		updateAllLists = true
	}

	// 如果需要重建任一快照列表,就调用更新函数
	// 只有 updateAllLists 用来判断 nodeInfoList 是否需要重建
	// 其它三个辅助的派生列表任意其一需要重建,则三个是一起重建的
	if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet {
		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
	}
	
	// 更新完后,发现快照的 nodeInfoList (用 nodeTree 组织的顺序列表)与缓存中的 nodeTree 的节点数量不对等,说明有错误发生
	if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
			", length of NodeInfoMap=%v, length of nodes in cache=%v"+
			", trying to recover",
			len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
			len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
		klog.ErrorS(nil, errMsg)
		// 尝试通过为下一个调度周期重新创建所有列表,期望可以在下一个调度周期中恢复,但仍然返回一个错误暴露问题,
		// 这个错误很可能会导致当前调度周期失败。
		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
		return fmt.Errorf(errMsg)
	}

	return nil
}

这个方法略微复杂,但大致流程通过阅读注释应该也七七八八了,对于利用 nodeTree 来组织快照中 nodeInfoList 的列表顺序,继续看到 updateNodeInfoSnapshotList 方法:

func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
	// 重建这些辅助的派生列表(先清空)
	snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	snapshot.usedPVCSet = sets.NewString()

	if updateAll {
		// 重建 nodeInfoList(按照 nodeTree 的节点顺序)
		snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
		// 调用 nodeTree 的 list 方法,这里便是按拓扑域排序好的 Node 列表了
		nodesList, err := cache.nodeTree.list()
		if err != nil {
			klog.ErrorS(err, "Error occurred while retrieving the list of names of the nodes from node tree")
		}
		for _, nodeName := range nodesList {
			if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
				// 将 nodeInfo 按照 nodeTree 的顺序追加到 nodeInfoList
				snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
				// 收集有 affinity 的节点
				if len(nodeInfo.PodsWithAffinity) > 0 {
					snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
				}
				// 收集有 required anti-affinity 的节点
				if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
					snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
				}
				// 收集所有被引用的 PVC key 到 usedPVCSet
				for key := range nodeInfo.PVCRefCounts {
					snapshot.usedPVCSet.Insert(key)
				}
			} else {
				// 如果 nodeTree 中存在但 snapshot.nodeInfoMap 中没有对应的 nodeInfo,这是不应该发生的情况,记录日志
				klog.ErrorS(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName))
			}
		}
	} else {
		// 利用已有的 nodeInfoList(旧的顺序,不重建),只重建辅助的派生列表,同上
		for _, nodeInfo := range snapshot.nodeInfoList {
			if len(nodeInfo.PodsWithAffinity) > 0 {
				snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
			}
			if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
				snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
			}
			for key := range nodeInfo.PVCRefCounts {
				snapshot.usedPVCSet.Insert(key)
			}
		}
	}
}

到这里,就和前文的都串联起来了。

往后,调度器每从调度队列 Pop 出一个 Pod 进行调度,执行调度周期时,就可以调用调度缓存的该方法将缓存快照(初始时是空的)更新为最新的一份(按拓扑顺序排列),供所有的调度框架的插件使用。

微信公众号

更多内容请关注微信公众号:gopher云原生