02. client-go 的 Informer 机制
上一回讲到 client-go 的四种客户端:RESTClient、ClientSet、DynamicClient、DiscoveryClient 。
Kubernetes 的内部组件,如 kube-scheduler 、kube-controller-manager 等都需要通过 client-go 和 kube-apiserver 交互,但过程中为了保证 HTTP 消息的实时性、可靠性、顺序性,还得借助 client-go 的 Informer 机制。
本文先浅浅的以 kube-controller-manager 为例,看看其是如何使用 Informer 机制的(不会涉及到组件的其它逻辑)。
在 kube-controller-manager 的启动过程中,会创建一个 SharedInformerFactory
实例并保存起来:
// cmd/kube-controller-manager/app/controllermanager.go
func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
// versionedClient 是一个 ClientSet 客户端
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
// 创建一个 SharedInformerFactory 实例,需要传入一个 ClientSet ,以及同步周期
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
// ...
ctx := ControllerContext{
// 保存 SharedInformer
InformerFactory: sharedInformers,
}
// ...
}
在后续启动控制器的时候,就会使用保存的 SharedInformerFactory 实例创建出各个资源的 Informer :
// cmd/kube-controller-manager/app/apps.go
func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
dc, err := deployment.NewDeploymentController(
ctx,
// 使用 SharedInformerFactory 实例创建出各个资源的 Informer
controllerContext.InformerFactory.Apps().V1().Deployments(),
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("deployment-controller"),
)
// ...
}
然后根据各个资源的 Informer 为资源添加监听事件,在里面实现控制器的逻辑:
// pkg/controller/deployment/deployment_controller.go
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
// ...
// 为 Deployment 资源添加监听事件
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dc.addDeployment(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateDeployment(logger, oldObj, newObj)
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: func(obj interface{}) {
dc.deleteDeployment(logger, obj)
},
})
// 为 ReplicaSet 资源添加监听事件
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dc.addReplicaSet(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateReplicaSet(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dc.deleteReplicaSet(logger, obj)
},
})
// 为 Pod 资源添加监听事件
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
dc.deletePod(logger, obj)
},
})
// ...
}
将上面流程整合成一个例子:
package main
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 通过 kubeconfig 生成配置
config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
if err != nil {
panic(err)
}
// 创建 ClientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// 上面就是我们上一回的 ClientSet 创建例子
// 借助 ClientSet 创建 SharedInformerFactory 实例
sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 10*time.Second)
// 使用 SharedInformerFactory 实例创建 Pod 资源的 Informer
podInformer := sharedInformerFactory.Core().V1().Pods()
// 为 Pod 资源添加监听事件
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Println("Add", pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
fmt.Println("Update", oldPod.Name, newPod.Status)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Println("Delete", pod.Name)
},
})
// 启动 SharedInformerFactory 创建的所有 Informer
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformerFactory.Start(stopCh)
select {}
}
运行该程序,就可以对集群的 Pod 资源进行监听,在资源发生变更时打印出对应的日志:
$ go run main.go
Add nginx
Update nginx {Pending [] [] <nil> [] [] BestEffort [] }
Update nginx {Pending [] [] <nil> [] [] BestEffort [] }
Delete nginx
明白了如何使用,正式进入正文,分析 Informer 机制的原理。
首先看到 Informer 共享工厂(SharedInformerFactory),其接口定义如下:
// k8s.io/client-go/informers/factory.go
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
// 启动所有创建的 Informer
Start(stopCh <-chan struct{})
// 停止运行
Shutdown()
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
// 各种实现了 Informer 机制的内置资源,Informer 工厂可以调用以下的接口创建各种资源的 Informer 实例
Admissionregistration() admissionregistration.Interface
Internal() apiserverinternal.Interface
Apps() apps.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Resource() resource.Interface
Scheduling() scheduling.Interface
Storage() storage.Interface
}
在我们例子中调用的 informers.NewSharedInformerFactory
实际就是创建这个 Informer 共享工厂对象 :
// k8s.io/client-go/informers/factory.go
// SharedInformerFactory 接口的实现
type sharedInformerFactory struct {
// ClientSet 客户端,用于和 kube-apiserver 交互
client kubernetes.Interface
// ...
// 缓存所有创建的 Informer 【注意】类型是 cache.SharedIndexInformer
informers map[reflect.Type]cache.SharedIndexInformer
// ...
}
// 创建 SharedInformerFactory
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
// 启动所有创建的 Informer
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
if f.shuttingDown {
return
}
// 遍历所有创建的 Informer
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
informer := informer
go func() {
defer f.wg.Done()
// 运行 Informer,即【调用 cache.SharedIndexInformer 的 Run 方法】
informer.Run(stopCh)
}()
f.startedInformers[informerType] = true
}
}
}
SharedInformerFactory 的创建、启动方法的代码都比较简单,一目了然。
当调用 SharedInformerFactory 的 Core().V1().Pods()
方法时,返回的是 PodInformer
接口:
// k8s.io/client-go/informers/core/v1/interface.go
func (v *version) Pods() PodInformer {
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
接口的定义如下:
// k8s.io/client-go/informers/core/v1/pod.go
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// PodInformer 接口的实现
type podInformer struct {
// SharedInformerFactory
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// 获取 PodInformer 的 Informer 对象
func (f *podInformer) Informer() cache.SharedIndexInformer {
// 调用 SharedInformerFactory 的 InformerFor 方法
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
当调用 PodInformer 的 Informer 方法创建 Pod 资源的 Informer 对象时,会调用 SharedInformerFactory 的 InformerFor 方法,又回到 factory 中:
// k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
// 判断该资源的 Informer 是否已缓存过
informer, exists := f.informers[informerType]
if exists {
// 如果已缓存过,则直接返回
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 调用 newFunc 创建资源的 Informer 对象
informer = newFunc(f.client, resyncPeriod)
// 缓存起来
f.informers[informerType] = informer
return informer
}
从 InformerFor 方法中,就可以明白为什么我们的 InformerFactory 是 Shared
的,对于同一类资源,InformerFactory 是共享的,只会初始化一次,这可以避免在 Start 的时候运行重复的 Informer ,导致 kube-apiserver 负载过重。
继续看到关键逻辑 newFunc
函数,对于 PodInformer ,也就是参数所传递的 defaultInformer
方法:
// k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// ListerWatcher 对象,借助 ClientSet 客户端实现对资源的 List 和 Watch 操作
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// 通过 ClientSet 客户端获取 Pod 资源列表
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// 通过 ClientSet 客户端 Watch Pod 资源列表
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
进一步跟踪,来到 NewSharedIndexInformer 方法:
// k8s.io/client-go/tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
return NewSharedIndexInformerWithOptions(
lw,
exampleObject,
SharedIndexInformerOptions{
ResyncPeriod: defaultEventHandlerResyncPeriod,
Indexers: indexers,
},
)
}
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{}
// sharedIndexInformer 实例
return &sharedIndexInformer{
// 创建 Indexer 实例,该 【Indexer】 很重要,待会会提及
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: &sharedProcessor{clock: realClock},
listerWatcher: lw,
objectType: exampleObject,
objectDescription: options.ObjectDescription,
resyncCheckPeriod: options.ResyncPeriod,
defaultEventHandlerResyncPeriod: options.ResyncPeriod,
clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
}
}
最后,就是整个 Informer 机制最为核心且复杂的地方,也就是 sharedIndexInformer 的 Run 方法:
// k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
indexer Indexer
controller Controller
listerWatcher ListerWatcher
// ...
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// ...
}
在开始进入 Run 方法前,先了解下 Informer 的架构设计里面,有以下三个重要组件:
- DeltaFIFO:是一个用来存储 K8s 资源对象及其类型的先进先出的队列
- Reflector:用于监控 kube-apiserver 中指定的资源,当资源变化时,更新到 DeltaFIFO 中(充当生产者)
- Indexer:存储资源对象并自带索引功能的本地存储,Informer 会从 DeltaFIFO 中将消费出来的资源存储到 Indexer 中,后续 client-go 获取资源就可以直接从 Indexer 中获取,减少服务器压力
三个组件的交互流程如下图的上半部分所示:
图源:https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
开始分析 Run 方法:
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// ...
// 首先创建 【 DeltaFIFO 】 队列
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
// cfg 是 sharedIndexInformer controller 的配置
cfg := &Config{
// DeltaFIFO 队列
Queue: fifo,
// ListerWatcher 实现对资源的 List 和 Watch 操作
ListerWatcher: s.listerWatcher,
// ...
// 对 DeltaFIFO 队列的处理方法(消费)
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 初始化 controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// ...
// 运行 controller
s.controller.Run(stopCh)
}
看到 controller 的 Run 方法:
// k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
// 创建 【 Reflector 】实例
r := NewReflectorWithOptions(
// ListerWatcher 实现对资源的 List 和 Watch 操作
c.config.ListerWatcher,
// DeltaFIFO 队列
c.config.Queue,
)
// ...
// r.Run 运行 Reflector 实例
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
Reflector 对象的 Run 方法:
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
// 进入 ListAndWatch 方法
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
// ...
// 启动一个协程定时同步,这里不展开,同步周期就是创建 SharedInformerFactory 时传递的参数
go r.startResync(stopCh, cancelCh, resyncerrc)
// 执行 watch 操作
return r.watch(w, stopCh, resyncerrc)
}
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
// ...
for {
// ...
if w == nil {
// ...
// 调用 ListerWatcher 的 Watch 方法
w, err = r.listerWatcher.Watch(options)
// ...
}
// 开始处理 watch ,其中 r.store 是 【 DeltaFIFO 】 队列
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
// ...
}
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnInitialEventsEndBookmark *bool,
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
// ...
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
// ... 收到了资源更新通知
switch event.Type {
case watch.Added:
// 将资源对象以 Add 的动作添加到 DeltaFIFO 队列中
err := store.Add(event.Object)
// ...
case watch.Modified:
// 将资源对象以 Update 的动作添加到 DeltaFIFO 队列中
err := store.Update(event.Object)
// ...
case watch.Deleted:
// 将资源对象以 Delete 的动作添加到 DeltaFIFO 队列中
err := store.Delete(event.Object)
// ...
// ...
}
}
// ...
}
可以看到,Reflector 组件在这里完成了它的功能:通过 ListerWatcher 的 Watch 功能监听 kube-apiserver 的资源变化,当发生变化时,写入到 DeltaFIFO 队列中,如图所示的第 1、2 步骤:
回到 controller 的 Run 方法:
// k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
// 创建 【 Reflector 】
r := NewReflectorWithOptions(
// ListerWatcher 实现对资源的 List 和 Watch 操作
c.config.ListerWatcher,
// DeltaFIFO 队列
c.config.Queue,
)
// ...
// r.Run 运行 Reflector 实例(完成第1、2步骤)
wg.StartWithChannel(stopCh, r.Run)
// 调用 processLoop 处理 DeltaFIFO 队列(继续完成剩下的步骤)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
func (c *controller) processLoop() {
for {
// 从 DeltaFIFO 队列中 Pop 出资源对象,资源对象的处理方法为 c.config.Process
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
其中资源对象的处理方法 c.config.Process 实际是之前 sharedIndexInformer 的 Run 方法创建 controller 配置时所传递的 HandleDeltas 方法:
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// ...
// cfg 是 sharedIndexInformer controller 的配置
cfg := &Config{
// ...
// 对 DeltaFIFO 队列的处理方法(消费)
Process: s.HandleDeltas,
}
// ...
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// 将 obj 转成 Deltas 类型( DeltaFIFO 队列保存的资源对象类型为 Deltas )
if deltas, ok := obj.(Deltas); ok {
// 处理 Deltas 对象
// s.indexer 是之前创建 sharedIndexInformer 实例时就创建好的 【Indexer】实例
return processDeltas(s, s.indexer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
跳转到 processDeltas 方法:
// k8s.io/client-go/tools/cache/controller.go
func processDeltas(
handler ResourceEventHandler,
clientState Store,
deltas Deltas,
isInInitialList bool,
) error {
// handler 是对用户设置 AddEventHandler 资源变化的回调触发
// clientState 是 【Indexer】实例
for _, d := range deltas {
obj := d.Object
switch d.Type {
case Sync, Replaced, Added, Updated:
// 判断 Indexer 本地存储中是否已存在该资源对象
if old, exists, err := clientState.Get(obj); err == nil && exists {
// 已存在则更新到 Indexer 本地存储中
if err := clientState.Update(obj); err != nil {
return err
}
// 触发回调函数,通知用户的 UpdateFunc
handler.OnUpdate(old, obj)
} else {
// 不存在,则添加到 Indexer 本地存储中
if err := clientState.Add(obj); err != nil {
return err
}
// 触发回调函数,通知用户的 AddFunc
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
// 删除 Indexer 本地存储中对应的资源对象
if err := clientState.Delete(obj); err != nil {
return err
}
// 触发回调函数,通知用户的 DeleteFunc
handler.OnDelete(obj)
}
}
return nil
}
至次,第3、4、5、6步骤也都已完成。Informer 会从 DeltaFIFO 队列中 Pop 出资源对象,然后根据事件类型对 Indexer 本地存储进行增删改查操作,同时还会触发用户设置的回调函数(ResourceEventHandlerFuncs)。
Informer 机制的分析就到这里。关于 DeltaFIFO、Reflector、Indexer 的内部实现,篇幅问题就不再展开。
微信公众号
更多内容请关注微信公众号:gopher云原生