06. kube-apiserver handler 处理流程

06. kube-apiserver handler 处理流程

在第3回核心 API 的路由注册 InstallLegacyAPI 过程中,最后会执行 registerResourceHandlers 方法:

// k8s.io/apiserver/pkg/endpoints/installer.go

// path 是资源的请求路径(不包含前缀和版本),例如 pods 、pods/attach 、pods/log 等
// storage 是 path 所对应的 RESTStorage 实现
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	// ...

	// ...
	// 判断 storage 是否实现了 rest.Lister 接口,即支持 LIST 请求
	lister, isLister := storage.(rest.Lister)
	// 判断 storage 是否实现了 rest.Getter 接口,即支持 GET 请求
	getter, isGetter := storage.(rest.Getter)
	// ...

	switch {
	case !namespaceScoped:
		// ...
	default:
		// ...
		// 如果资源支持 LIST 请求,则添加到 actions
		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		// ...
		// 如果资源支持 GET 请求,则添加到 actions
		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		// ...
	}

	// 遍历 actions ,为资源支持的请求类型,设置 handler 并进行路由注册
	for _, action := range actions {
		// ...

		switch action.Verb {
		case "GET": // Get a resource.

			// 初始化资源的 GEI 请求的 handler
			var handler restful.RouteFunction
			if isGetterWithOptions {
				handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
			} else {
				handler = restfulGetResource(getter, reqScope)
			}

			// ...
			// 路由注册,绑定 handler
			route := ws.GET(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			if isGetterWithOptions {
				if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
					return nil, nil, err
				}
			}
			addParams(route, action.Params)
			routes = append(routes, route)
		case "LIST": // List all resources of a kind.
			// ...
			// 初始化资源的 LIST 请求的 handler
			handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
			handler = utilwarning.AddWarningsHandler(handler, warnings)
			// 路由注册,绑定 handler
			route := ws.GET(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("list"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), allMediaTypes...)...).
				Returns(http.StatusOK, "OK", versionedList).
				Writes(versionedList)
			if err := AddObjectParams(ws, route, versionedListOptions); err != nil {
				return nil, nil, err
			}
			switch {
			case isLister && isWatcher:
				doc := "list or watch objects of kind " + kind
				if isSubresource {
					doc = "list or watch " + subresource + " of objects of kind " + kind
				}
				route.Doc(doc)
			case isWatcher:
				doc := "watch objects of kind " + kind
				if isSubresource {
					doc = "watch " + subresource + "of objects of kind " + kind
				}
				route.Doc(doc)
			}
			addParams(route, action.Params)
			routes = append(routes, route)
		case "PUT": // Update a resource.
			// 同上
		case "PATCH": // Partially update a resource
			// 同上
		case "POST": // Create a resource.
			// 同上
		case "DELETE": // Delete a resource.
			// 同上
		case "DELETECOLLECTION":
			// 同上
		// deprecated in 1.11
		case "WATCH": // Watch a resource.
			// 同上
		// deprecated in 1.11
		case "WATCHLIST": // Watch all resources of a kind.
			// 同上
		case "CONNECT":
			// 同上
		default:
			return nil, nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
		}
		// ...
	}

	return &apiResource, resourceInfo, nil
}

其中资源的 RESTStorage 实现即 storage 会进行类型断言来判断是否实现了 rest.Listerrest.Getter 等接口:

// k8s.io/apiserver/pkg/registry/rest/rest.go

// Lister is an object that can retrieve resources that match the provided field and label criteria.
type Lister interface {
	// NewList returns an empty object that can be used with the List call.
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
	NewList() runtime.Object
	// List selects resources in the storage which match to the selector. 'options' can be nil.
	List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error)
	// TableConvertor ensures all list implementers also implement table conversion
	TableConvertor
}

// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
	// Get finds a resource in the storage by name and returns it.
	// Although it can return an arbitrary error value, IsNotFound(err) is true for the
	// returned error value err when the specified resource is not found.
	Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}

以 LIST 请求为例(其它请求类似),当对一个资源发起 LIST 请求时,会执行 metrics.InstrumentRouteFunc 方法:

// k8s.io/apiserver/pkg/endpoints/metrics/metrics.go

func InstrumentRouteFunc(verb, group, version, resource, subresource, scope, component string, deprecated bool, removedRelease string, routeFunc restful.RouteFunction) restful.RouteFunction {
	return restful.RouteFunction(func(req *restful.Request, response *restful.Response) {
		
		requestReceivedTimestamp, ok := request.ReceivedTimestampFrom(req.Request.Context())
		if !ok {
			requestReceivedTimestamp = time.Now()
		}

		// 对 response.ResponseWriter 重新包装,以便在后续的处理中对响应进行更加灵活的控制
		delegate := &ResponseWriterDelegator{ResponseWriter: response.ResponseWriter}

		rw := responsewriter.WrapForHTTP1Or2(delegate)
		response.ResponseWriter = rw

		// 执行 routeFunc 处理请求
		routeFunc(req, response)

		// 对网络请求进行监视和跟踪,不展开
		MonitorRequest(req.Request, verb, group, version, resource, subresource, scope, component, deprecated, removedRelease, delegate.Status(), delegate.ContentLength(), time.Since(requestReceivedTimestamp))
	})
}

跳到 routeFunc 方法,即传入的 restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout) 方法,其中 lister 参数就是 storage 经过类型断言得到的 rest.Lister 接口的实现:

// k8s.io/apiserver/pkg/endpoints/installer.go

func restfulListResource(r rest.Lister, rw rest.Watcher, scope handlers.RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		// 跳转
		handlers.ListResource(r, rw, &scope, forceWatch, minRequestTimeout)(res.ResponseWriter, req.Request)
	}
}

// k8s.io/apiserver/pkg/endpoints/handlers/get.go

func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		// ... 一些请求参数的配置校验(忽略)

		if opts.Watch || forceWatch {
			// ... watch 功能(忽略)
			return
		}

		// Log only long List requests (ignore Watch).
		defer span.End(500 * time.Millisecond)
		span.AddEvent("About to List from storage")

		// 调用 rest.Lister 接口的 List 方法获取数据
		result, err := r.List(ctx, &opts)
		if err != nil {
			scope.err(err, w, req)
			return
		}
		span.AddEvent("Listing from storage done")
		defer span.AddEvent("Writing http response done", attribute.Int("count", meta.LenList(result)))
		transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result)
	}
}

可以看到 handler 处理流程的最后一步是直接调用 rest.Lister 接口的 List 方法来获取数据的。

到这里已经没法继续跟下去了,得回过头看看 RESTStorage 的实现过程中是如何实现 rest.Lister 接口的,以 pods path 为例,来到第4回创建 RESTStorage 的 NewLegacyRESTStorage 方法:

// pkg/registry/core/rest/storage_core.go

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	apiGroupInfo := genericapiserver.APIGroupInfo{
		// ...
		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
	}
	// ...

	// Pod 资源的 RESTStorage 初始化
	podStorage, err := podstore.NewStorage(
		restOptionsGetter,
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)
	if err != nil {
		return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
	}
	// ...

	storage := map[string]rest.Storage{}
	// 添加 Pod 资源到 storage 
	if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		// 这个 podStorage.Pod 就是 path 为 pods 的 rest.Storage
		// 后续经过一系列的调用就会传递到路由注册的 registerResourceHandlers 方法
		storage[resource] = podStorage.Pod
		storage[resource+"/attach"] = podStorage.Attach
		storage[resource+"/status"] = podStorage.Status
		storage[resource+"/log"] = podStorage.Log
		storage[resource+"/exec"] = podStorage.Exec
		storage[resource+"/portforward"] = podStorage.PortForward
		storage[resource+"/proxy"] = podStorage.Proxy
		storage[resource+"/binding"] = podStorage.Binding
		if podStorage.Eviction != nil {
			storage[resource+"/eviction"] = podStorage.Eviction
		}
		storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers

	}
	// ... 添加其它资源到 storage

	if len(storage) > 0 {
		// 将 storage 存到 apiGroupInfo.VersionedResourcesStorageMap
		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
	}

	// 返回 apiGroupInfo
	return restStorage, apiGroupInfo, nil
}

podStorage.Pod 的实现就是我们要找的,看到 podstore.NewStorage 方法:

// pkg/registry/core/pod/storage/storage.go

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {

	store := &genericregistry.Store{
		NewFunc:                   func() runtime.Object { return &api.Pod{} },
		NewListFunc:               func() runtime.Object { return &api.PodList{} },
		PredicateFunc:             registrypod.MatchPod,
		DefaultQualifiedResource:  api.Resource("pods"),
		SingularQualifiedResource: api.Resource("pod"),

		CreateStrategy:      registrypod.Strategy,
		UpdateStrategy:      registrypod.Strategy,
		DeleteStrategy:      registrypod.Strategy,
		ResetFieldsStrategy: registrypod.Strategy,
		ReturnDeletedObject: true,

		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
	}
	// ...
	return PodStorage{
		// Pod 初始化为 REST 对象
		Pod:                 &REST{store, proxyTransport},
		Binding:             &BindingREST{store: store},
		LegacyBinding:       &LegacyBindingREST{bindingREST},
		Eviction:            newEvictionStorage(&statusStore, podDisruptionBudgetClient),
		Status:              &StatusREST{store: &statusStore},
		EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
		Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
		Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
		Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
		PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
	}, nil
}

REST 对象就是 pods path 对应的 RESTStorage 实现,而其中内嵌的 genericregistry.Store 对象实现了 rest.Lister 接口:

// pkg/registry/core/pod/storage/storage.go

type REST struct {
	*genericregistry.Store
	proxyTransport http.RoundTripper
}

// k8s.io/apiserver/pkg/registry/generic/registry/store.go

// REST 内嵌的 Store
// 实际是 Store 实现了 rest.Lister ,相当于 REST 实现了
type Store struct {
	// ...
}

func (e *Store) NewList() runtime.Object {
	return e.NewListFunc()
}

func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	// ...
}

也就是说 path 为 pods 的 LIST 请求的 handler 处理流程会来到 Store.List 方法:

// k8s.io/apiserver/pkg/registry/generic/registry/store.go

func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	// 标签选择器
	label := labels.Everything()
	if options != nil && options.LabelSelector != nil {
		label = options.LabelSelector
	}
	// 字段选择器
	field := fields.Everything()
	if options != nil && options.FieldSelector != nil {
		field = options.FieldSelector
	}
	// 调用 ListPredicate 获取数据
	out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
	if err != nil {
		return nil, err
	}
	if e.Decorator != nil {
		e.Decorator(out)
	}
	return out, nil
}

继续看到 ListPredicate 方法:

// k8s.io/apiserver/pkg/registry/generic/registry/store.go

func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
	if options == nil {
		// 调用 etcd 列表资源的默认参数选项
		options = &metainternalversion.ListOptions{ResourceVersion: ""}
	}
	p.Limit = options.Limit
	p.Continue = options.Continue

	// 调用 NewListFunc 获取具体的资源类型,这里是 &api.PodList{}
	// 返回结果会存到这里
	list := e.NewListFunc()
	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	storageOpts := storage.ListOptions{
		ResourceVersion:      options.ResourceVersion,
		ResourceVersionMatch: options.ResourceVersionMatch,
		Predicate:            p,
		Recursive:            true,
	}

	// ...
	// 如果请求指定了 metadata.name ,则直接获取单个 object ,无需对全量数据做过滤
	if name, ok := p.MatchesSingle(); ok {
		if key, err := e.KeyFunc(ctx, name); err == nil {
			storageOpts.Recursive = false
			err := e.Storage.GetList(ctx, key, storageOpts, list)
			return list, storeerr.InterpretListError(err, qualifiedResource)
		}
		// 如果不行,则跳过优化
	}

	// 调用底层 storage.Interface 接口的 GetList 方法,查询全量数据过滤后写入到 list
	err := e.Storage.GetList(ctx, e.KeyRootFunc(ctx), storageOpts, list)
	return list, storeerr.InterpretListError(err, qualifiedResource)
}

storage.Interface 接口的实现有两种,一种是带缓存的 cacher.Cacher 需要指定 --watch-cache-sizes 参数开启,另一种是默认的不带缓存的 etcd3.store ,在第4回的 GetRESTOptions 方法中进行设置:

// k8s.io/apiserver/pkg/server/options/etcd.go

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	ret := generic.RESTOptions{
		StorageConfig:             storageConfig,
		Decorator:                 generic.UndecoratedStorage,
		DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection:   f.Options.EnableGarbageCollection,
		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
		StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
	}

	if f.Options.EnableWatchCache {
		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
		if err != nil {
			return generic.RESTOptions{}, err
		}
		size, ok := sizes[resource]
		if ok && size > 0 {
			klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
		}
		if ok && size <= 0 {
			klog.V(3).InfoS("Not using watch cache", "resource", resource)
			// 不使用 cache 的 Storage 实现
			ret.Decorator = generic.UndecoratedStorage
		} else {
			klog.V(3).InfoS("Using watch cache", "resource", resource)
			// 使用 cache 的 Storage 实现
			ret.Decorator = genericregistry.StorageWithCacher()
		}
	}

	return ret, nil
}

来看默认的不使用 cache 的 Storage 实现方案,generic.UndecoratedStorage 的跳转过程在第4回讲过了,这里直接看到最终的 newETCD3Storage 方法:

// k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
	// etcd v3 客户端
	client, err := newETCD3Client(c.Transport)
	if err != nil {
		stopCompactor()
		return nil, nil, err
	}
	// 返回 storage.Interface 实现
	return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}

// k8s.io/apiserver/pkg/storage/etcd3/store.go

// store 就是 storage.Interface 接口的实现
type store struct {
	client              *clientv3.Client
	codec               runtime.Codec
	versioner           storage.Versioner
	transformer         value.Transformer
	pathPrefix          string
	groupResource       schema.GroupResource
	groupResourceString string
	watcher             *watcher
	pagingEnabled       bool
	leaseManager        *leaseManager
}

func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
	return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}

func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
	// ...
	result := &store{
		client:              c,
		codec:               codec,
		versioner:           versioner,
		transformer:         transformer,
		pagingEnabled:       pagingEnabled,
		pathPrefix:          pathPrefix,
		groupResource:       groupResource,
		groupResourceString: groupResource.String(),
		watcher:             newWatcher(c, codec, groupResource, newFunc, versioner),
		leaseManager:        newDefaultLeaseManager(c, leaseManagerConfig),
	}
	return result
}

// handler 的处理流程的终点站
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
	// ...
}

也就是说最终 handler 的处理流程是 GetList 方法:

// k8s.io/apiserver/pkg/storage/etcd3/store.go

func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
	// 将 key 解析成对应 etcd 的 preparedKey ,例如 /registry/pods/default
	preparedKey, err := s.prepareKey(key)
	if err != nil {
		return err
	}
	// ...

	// 转换 listObj 为指针类型 listPtr
	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	// 将 listPtr 转化为 reflect.Value 类型,后续直接通过 v 对象操作 listObj 的值
	v, err := conversion.EnforcePtr(listPtr)
	if err != nil || v.Kind() != reflect.Slice {
		return fmt.Errorf("need ptr to slice: %v", err)
	}

	// ... 一些查询数据的 options 配置

	for {
		startTime := time.Now()
		// 根据 preparedKey 和 options 调用 go.etcd.io/etcd/client/v3 库从 etcd 中查询数据
		getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
		
		// ...

		// 对数据进行筛选,只选择符合条件的项目,然后将它们放入到 listObj 中保存
		for i, kv := range getResp.Kvs {
			if paging && int64(v.Len()) >= pred.Limit {
				hasMore = true
				break
			}
			lastKey = kv.Key

			data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(kv.Key))
			if err != nil {
				return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
			}

			// 筛选后调用 v.Set 保存到 listObj 中
			if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
				recordDecodeError(s.groupResourceString, string(kv.Key))
				return err
			}
			numEvald++

			// 清掉数据,减少内存使用
			getResp.Kvs[i] = nil
		}

		// ... 一些退出循环的判断
	}

	// ...
}

到这里,handler 的处理流程就算走完了。

上个调试来结束本回,打个断点:

Untitled

执行 kubectl get pods 命令:

Untitled

Untitled

微信公众号

更多内容请关注微信公众号:gopher云原生