02. client-go 的 Informer 机制

02. client-go 的 Informer 机制

上一回讲到 client-go 的四种客户端:RESTClient、ClientSet、DynamicClient、DiscoveryClient 。

Kubernetes 的内部组件,如 kube-scheduler 、kube-controller-manager 等都需要通过 client-go 和 kube-apiserver 交互,但过程中为了保证 HTTP 消息的实时性、可靠性、顺序性,还得借助 client-go 的 Informer 机制。

本文先浅浅的以 kube-controller-manager 为例,看看其是如何使用 Informer 机制的(不会涉及到组件的其它逻辑)。

在 kube-controller-manager 的启动过程中,会创建一个 SharedInformerFactory 实例并保存起来:

// cmd/kube-controller-manager/app/controllermanager.go

func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
	// versionedClient 是一个 ClientSet 客户端
	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
	// 创建一个 SharedInformerFactory 实例,需要传入一个 ClientSet ,以及同步周期
	sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
	// ...
	ctx := ControllerContext{
		// 保存 SharedInformer
		InformerFactory:                 sharedInformers,
	}
	// ...
}

在后续启动控制器的时候,就会使用保存的 SharedInformerFactory 实例创建出各个资源的 Informer :

// cmd/kube-controller-manager/app/apps.go

func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	dc, err := deployment.NewDeploymentController(
		ctx,
		// 使用 SharedInformerFactory 实例创建出各个资源的 Informer
		controllerContext.InformerFactory.Apps().V1().Deployments(),
		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	// ...
}

然后根据各个资源的 Informer 为资源添加监听事件,在里面实现控制器的逻辑:

// pkg/controller/deployment/deployment_controller.go

func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	// ...

	// 为 Deployment 资源添加监听事件
	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDeployment(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDeployment(logger, oldObj, newObj)
		},
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: func(obj interface{}) {
			dc.deleteDeployment(logger, obj)
		},
	})
	// 为 ReplicaSet 资源添加监听事件
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addReplicaSet(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateReplicaSet(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			dc.deleteReplicaSet(logger, obj)
		},
	})
	// 为 Pod 资源添加监听事件
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			dc.deletePod(logger, obj)
		},
	})
	// ...
}

将上面流程整合成一个例子:

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// 通过 kubeconfig 生成配置
	config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
	if err != nil {
		panic(err)
	}

	// 创建 ClientSet
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// 上面就是我们上一回的 ClientSet 创建例子

	// 借助 ClientSet 创建 SharedInformerFactory 实例
	sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 10*time.Second)

	// 使用 SharedInformerFactory 实例创建 Pod 资源的 Informer
	podInformer := sharedInformerFactory.Core().V1().Pods()

	// 为 Pod 资源添加监听事件
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Println("Add", pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod := oldObj.(*corev1.Pod)
			newPod := newObj.(*corev1.Pod)
			fmt.Println("Update", oldPod.Name, newPod.Status)
		},
		DeleteFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Println("Delete", pod.Name)
		},
	})

	// 启动 SharedInformerFactory 创建的所有 Informer
	stopCh := make(chan struct{})
	defer close(stopCh)
	sharedInformerFactory.Start(stopCh)

	select {}
}

运行该程序,就可以对集群的 Pod 资源进行监听,在资源发生变更时打印出对应的日志:

$ go run main.go
Add nginx
Update nginx {Pending []      [] <nil> [] [] BestEffort [] }
Update nginx {Pending []      [] <nil> [] [] BestEffort [] }
Delete nginx

Untitled

明白了如何使用,正式进入正文,分析 Informer 机制的原理。

首先看到 Informer 共享工厂(SharedInformerFactory),其接口定义如下:

// k8s.io/client-go/informers/factory.go

type SharedInformerFactory interface {
	internalinterfaces.SharedInformerFactory
	// 启动所有创建的 Informer
	Start(stopCh <-chan struct{})
	// 停止运行
	Shutdown()
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer

	// 各种实现了 Informer 机制的内置资源,Informer 工厂可以调用以下的接口创建各种资源的 Informer 实例
	Admissionregistration() admissionregistration.Interface
	Internal() apiserverinternal.Interface
	Apps() apps.Interface
	Autoscaling() autoscaling.Interface
	Batch() batch.Interface
	Certificates() certificates.Interface
	Coordination() coordination.Interface
	Core() core.Interface
	Discovery() discovery.Interface
	Events() events.Interface
	Extensions() extensions.Interface
	Flowcontrol() flowcontrol.Interface
	Networking() networking.Interface
	Node() node.Interface
	Policy() policy.Interface
	Rbac() rbac.Interface
	Resource() resource.Interface
	Scheduling() scheduling.Interface
	Storage() storage.Interface
}

在我们例子中调用的 informers.NewSharedInformerFactory 实际就是创建这个 Informer 共享工厂对象 :

// k8s.io/client-go/informers/factory.go

// SharedInformerFactory 接口的实现
type sharedInformerFactory struct {
	// ClientSet 客户端,用于和 kube-apiserver 交互
	client           kubernetes.Interface
	// ...
	// 缓存所有创建的 Informer 【注意】类型是 cache.SharedIndexInformer
	informers map[reflect.Type]cache.SharedIndexInformer
	// ...
}

// 创建 SharedInformerFactory
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

// 启动所有创建的 Informer
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.shuttingDown {
		return
	}

	// 遍历所有创建的 Informer
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			f.wg.Add(1)
			informer := informer
			go func() {
				defer f.wg.Done()
				// 运行 Informer,即【调用 cache.SharedIndexInformer 的 Run 方法】
				informer.Run(stopCh)
			}()
			f.startedInformers[informerType] = true
		}
	}
}

SharedInformerFactory 的创建、启动方法的代码都比较简单,一目了然。

当调用 SharedInformerFactory 的 Core().V1().Pods() 方法时,返回的是 PodInformer 接口:

// k8s.io/client-go/informers/core/v1/interface.go

func (v *version) Pods() PodInformer {
	return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

接口的定义如下:

// k8s.io/client-go/informers/core/v1/pod.go

type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}

// PodInformer 接口的实现
type podInformer struct {
	// SharedInformerFactory
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}

// 获取 PodInformer 的 Informer 对象
func (f *podInformer) Informer() cache.SharedIndexInformer {
	// 调用 SharedInformerFactory 的 InformerFor 方法
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

当调用 PodInformer 的 Informer 方法创建 Pod 资源的 Informer 对象时,会调用 SharedInformerFactory 的 InformerFor 方法,又回到 factory 中:

// k8s.io/client-go/informers/factory.go

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	// 判断该资源的 Informer 是否已缓存过
	informer, exists := f.informers[informerType]
	if exists {
		// 如果已缓存过,则直接返回
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	// 调用 newFunc 创建资源的 Informer 对象
	informer = newFunc(f.client, resyncPeriod)
	// 缓存起来
	f.informers[informerType] = informer

	return informer
}

从 InformerFor 方法中,就可以明白为什么我们的 InformerFactory 是 Shared 的,对于同一类资源,InformerFactory 是共享的,只会初始化一次,这可以避免在 Start 的时候运行重复的 Informer ,导致 kube-apiserver 负载过重。

继续看到关键逻辑 newFunc 函数,对于 PodInformer ,也就是参数所传递的 defaultInformer 方法:

// k8s.io/client-go/informers/core/v1/pod.go

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		// ListerWatcher 对象,借助 ClientSet 客户端实现对资源的 List 和 Watch 操作
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				// 通过 ClientSet 客户端获取 Pod 资源列表
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				// 通过 ClientSet 客户端 Watch Pod 资源列表
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

进一步跟踪,来到 NewSharedIndexInformer 方法:


// k8s.io/client-go/tools/cache/shared_informer.go

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	return NewSharedIndexInformerWithOptions(
		lw,
		exampleObject,
		SharedIndexInformerOptions{
			ResyncPeriod: defaultEventHandlerResyncPeriod,
			Indexers:     indexers,
		},
	)
}

func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
	realClock := &clock.RealClock{}

	// sharedIndexInformer 实例
	return &sharedIndexInformer{
		// 创建 Indexer 实例,该 【Indexer】 很重要,待会会提及
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
		processor:                       &sharedProcessor{clock: realClock},
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		objectDescription:               options.ObjectDescription,
		resyncCheckPeriod:               options.ResyncPeriod,
		defaultEventHandlerResyncPeriod: options.ResyncPeriod,
		clock:                           realClock,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
	}
}

最后,就是整个 Informer 机制最为核心且复杂的地方,也就是 sharedIndexInformer 的 Run 方法:

// k8s.io/client-go/tools/cache/shared_informer.go

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller
	listerWatcher ListerWatcher
	// ...
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// ...
}

在开始进入 Run 方法前,先了解下 Informer 的架构设计里面,有以下三个重要组件:

  • DeltaFIFO:是一个用来存储 K8s 资源对象及其类型的先进先出的队列
  • Reflector:用于监控 kube-apiserver 中指定的资源,当资源变化时,更新到 DeltaFIFO 中(充当生产者)
  • Indexer:存储资源对象并自带索引功能的本地存储,Informer 会从 DeltaFIFO 中将消费出来的资源存储到 Indexer 中,后续 client-go 获取资源就可以直接从 Indexer 中获取,减少服务器压力

三个组件的交互流程如下图的上半部分所示:

Untitled

图源:https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md

开始分析 Run 方法:

// k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// ...

	// 首先创建 【 DeltaFIFO 】 队列
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
		Transformer:           s.transform,
	})

	// cfg 是 sharedIndexInformer controller 的配置
	cfg := &Config{
		// DeltaFIFO 队列
		Queue:             fifo,
		// ListerWatcher 实现对资源的 List 和 Watch 操作
		ListerWatcher:     s.listerWatcher,
		// ...
		// 对 DeltaFIFO 队列的处理方法(消费)
		Process:           s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		// 初始化 controller
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// ...
	// 运行 controller
	s.controller.Run(stopCh)
}

看到 controller 的 Run 方法:

// k8s.io/client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
	// 创建 【 Reflector 】实例
	r := NewReflectorWithOptions(
		// ListerWatcher 实现对资源的 List 和 Watch 操作
		c.config.ListerWatcher,
		// DeltaFIFO 队列
		c.config.Queue,
	)
	// ...

	// r.Run 运行 Reflector 实例
	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

Reflector 对象的 Run 方法:

// k8s.io/client-go/tools/cache/reflector.go

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		// 进入 ListAndWatch 方法
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
	var err error
	var w watch.Interface

	// ...

	// 启动一个协程定时同步,这里不展开,同步周期就是创建 SharedInformerFactory 时传递的参数
	go r.startResync(stopCh, cancelCh, resyncerrc)

	// 执行 watch 操作
	return r.watch(w, stopCh, resyncerrc)
}

func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
	// ...
	for {
		// ...

		if w == nil {
			// ...
			// 调用 ListerWatcher 的 Watch 方法
			w, err = r.listerWatcher.Watch(options)
			// ...
		}

		// 开始处理 watch ,其中 r.store 是 【 DeltaFIFO 】 队列
		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
		// ...
}

func watchHandler(start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedGVK *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	exitOnInitialEventsEndBookmark *bool,
	clock clock.Clock,
	errc chan error,
	stopCh <-chan struct{},
) error {
	// ...

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			// ... 收到了资源更新通知
			switch event.Type {
			case watch.Added:
				// 将资源对象以 Add 的动作添加到 DeltaFIFO 队列中
				err := store.Add(event.Object)
				// ...
			case watch.Modified:
				// 将资源对象以 Update 的动作添加到 DeltaFIFO 队列中
				err := store.Update(event.Object)
				// ...
			case watch.Deleted:
				// 将资源对象以 Delete 的动作添加到 DeltaFIFO 队列中
				err := store.Delete(event.Object)
				// ...
			// ...
		}
	}

	// ...
}

可以看到,Reflector 组件在这里完成了它的功能:通过 ListerWatcher 的 Watch 功能监听 kube-apiserver 的资源变化,当发生变化时,写入到 DeltaFIFO 队列中,如图所示的第 1、2 步骤:

Untitled

回到 controller 的 Run 方法:

// k8s.io/client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
	// 创建 【 Reflector 】
	r := NewReflectorWithOptions(
		// ListerWatcher 实现对资源的 List 和 Watch 操作
		c.config.ListerWatcher,
		// DeltaFIFO 队列
		c.config.Queue,
	)
	// ...

	// r.Run 运行 Reflector 实例(完成第1、2步骤)
	wg.StartWithChannel(stopCh, r.Run)

	// 调用 processLoop 处理 DeltaFIFO 队列(继续完成剩下的步骤)
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

func (c *controller) processLoop() {
	for {
		// 从 DeltaFIFO 队列中 Pop 出资源对象,资源对象的处理方法为 c.config.Process
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

其中资源对象的处理方法 c.config.Process 实际是之前 sharedIndexInformer 的 Run 方法创建 controller 配置时所传递的 HandleDeltas 方法:

// k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// ...

	// cfg 是 sharedIndexInformer controller 的配置
	cfg := &Config{
		// ...
		// 对 DeltaFIFO 队列的处理方法(消费)
		Process:           s.HandleDeltas,
	}
	// ...
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// 将 obj 转成 Deltas 类型( DeltaFIFO 队列保存的资源对象类型为 Deltas )
	if deltas, ok := obj.(Deltas); ok {
		// 处理 Deltas 对象
		// s.indexer 是之前创建 sharedIndexInformer 实例时就创建好的 【Indexer】实例
		return processDeltas(s, s.indexer, deltas, isInInitialList)
	}
	return errors.New("object given as Process argument is not Deltas")
}

跳转到 processDeltas 方法:

// k8s.io/client-go/tools/cache/controller.go

func processDeltas(
	handler ResourceEventHandler,
	clientState Store,
	deltas Deltas,
	isInInitialList bool,
) error {
	// handler 是对用户设置 AddEventHandler 资源变化的回调触发
	// clientState 是 【Indexer】实例
	for _, d := range deltas {
		obj := d.Object

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			// 判断 Indexer 本地存储中是否已存在该资源对象
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				// 已存在则更新到 Indexer 本地存储中
				if err := clientState.Update(obj); err != nil {
					return err
				}
				// 触发回调函数,通知用户的 UpdateFunc
				handler.OnUpdate(old, obj)
			} else {
				// 不存在,则添加到 Indexer 本地存储中
				if err := clientState.Add(obj); err != nil {
					return err
				}
				// 触发回调函数,通知用户的 AddFunc
				handler.OnAdd(obj, isInInitialList)
			}
		case Deleted:
			// 删除 Indexer 本地存储中对应的资源对象
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			// 触发回调函数,通知用户的 DeleteFunc
			handler.OnDelete(obj)
		}
	}
	return nil
}

至次,第3、4、5、6步骤也都已完成。Informer 会从 DeltaFIFO 队列中 Pop 出资源对象,然后根据事件类型对 Indexer 本地存储进行增删改查操作,同时还会触发用户设置的回调函数(ResourceEventHandlerFuncs)。

Untitled

Informer 机制的分析就到这里。关于 DeltaFIFO、Reflector、Indexer 的内部实现,篇幅问题就不再展开。

微信公众号

更多内容请关注微信公众号:gopher云原生