03. KubeAPIServer 路由注册
接上回说到的 AggregatorServer 、KubeAPIServer 、APIExtensionsServer 三个服务。
重点看 API 核心服务,即 KubeAPIServer 的路由注册实现( AggregatorServer 和 APIExtensionsServer 的实现是类似的)。
回到初始化 KubeAPIServer 的 CreateKubeAPIServer 函数:
// cmd/kube-apiserver/app/server.go #230
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
return kubeAPIServerConfig.Complete().New(delegateAPIServer)
}kubeAPIServerConfig.Complete() 方法不展开,主要是用于确保 KubeAPIServer 的配置完整性和合法性,看 New 方法:
// pkg/controlplane/instance.go #352
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
// ...
// 创建 KubeAPIServer 委托对象(见上回)
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
// 待会再看...
}
// k8s.io/apiserver/pkg/server/config.go #665
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// ...
// 用于完整的处理程序链 FullHandlerChain 的构建
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
// ...
// 创建 API Server 的 Handler 处理器(见上回)
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
// 初始化 GenericAPIServer
s := &GenericAPIServer{
// ...
Handler: apiServerHandler,
// ...
}
// ...
// 注册 /、/debug/*、/metrics、/version、/apis 基本路由
installAPI(s, c.Config)
// ...
return s, nil
}首先在 c.GenericConfig.New("kube-apiserver", delegationTarget) 中会调用 installAPI 来注册 /、/debug/*、/metrics、/version 、/apis 基本路由(这是三个服务都一样的流程):
// k8s.io/apiserver/pkg/server/config.go #962
func installAPI(s *GenericAPIServer, c *Config) {
// 如果启用了索引功能,则注册 / 和 /index.html 为索引页面,展示所有的路由列表 listedPathProvider
if c.EnableIndex {
routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
}
// 如果启用了性能分析功能,则注册 /debug/* 路由
if c.EnableProfiling {
// 注册 /debug/pprof 相关路由
routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
if c.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
// 注册 /debug/flags 相关路由
// so far, only logging related endpoints are considered valid to add for these debug flags.
routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
}
// 如果启用了未进行认证授权的用于调试功能的 Socket 接口,同上为其注册 /debug/* 路由
if s.UnprotectedDebugSocket != nil {
s.UnprotectedDebugSocket.InstallProfiling()
s.UnprotectedDebugSocket.InstallDebugFlag("v", routes.StringFlagPutHandler(logs.GlogSetter))
if c.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
}
// 如果启用了指标功能,注册 /metrics 路由
if c.EnableMetrics {
if c.EnableProfiling {
routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
if utilfeature.DefaultFeatureGate.Enabled(features.ComponentSLIs) {
slis.SLIMetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
}
} else {
routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
if utilfeature.DefaultFeatureGate.Enabled(features.ComponentSLIs) {
slis.SLIMetrics{}.Install(s.Handler.NonGoRestfulMux)
}
}
}
// 注册 /version 路由
routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
// 如果启用了服务发现功能,则注册 /apis 路由,用于展示 APIGroupList 信息
if c.EnableDiscovery {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
wrapped := discoveryendpoint.WrapAggregatedDiscoveryToHandler(s.DiscoveryGroupManager, s.AggregatedDiscoveryGroupManager)
s.Handler.GoRestfulContainer.Add(wrapped.GenerateWebService("/apis", metav1.APIGroupList{}))
} else {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
}
}
// 安装流量控制器中间件,用于控制 API 请求优先级和公平性
if c.FlowControl != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) {
c.FlowControl.Install(s.Handler.NonGoRestfulMux)
}
}借助第1回的 kubectl proxy --port=8080 可以验证下效果,例如 / 和 /index.html 路由请求数据如下:
$ curl 127.0.0.1:8080/index.html
{
"paths": [
"/api",
"/api/v1",
"/apis",
"/apis/",
............
"/apis/apps",
"/apis/apps/v1",
......
"/apis/batch",
"/apis/batch/v1",
............
............
"/logs",
"/metrics",
............
"/version"
]
}在 installAPI 过程中,同时用到了 NonGoRestfulMux(非 go-restful 的路由)和 GoRestfulContainer(go-restful 的路由)两种路由的注册方法,都比较的简单。
例如 / 和 /index.html 路由使用 NonGoRestfulMux 注册,实现如下:
// k8s.io/apiserver/pkg/server/routes/index.go
type Index struct{}
func (i Index) Install(pathProvider ListedPathProvider, mux *mux.PathRecorderMux) {
handler := IndexLister{StatusCode: http.StatusOK, PathProvider: pathProvider}
mux.UnlistedHandle("/", handler)
mux.UnlistedHandle("/index.html", handler)
}
type IndexLister struct {
StatusCode int
PathProvider ListedPathProvider
}
func (i IndexLister) ServeHTTP(w http.ResponseWriter, r *http.Request) {
responsewriters.WriteRawJSON(i.StatusCode, metav1.RootPaths{Paths: i.PathProvider.ListedPaths()}, w)
}而 /version 路由使用 GoRestfulContainer 注册,实现如下:
// k8s.io/apiserver/pkg/server/routes/version.go
type Version struct {
Version *version.Info
}
func (v Version) Install(c *restful.Container) {
if v.Version == nil {
return
}
// 使用 go-restful 的路由注册方法
versionWS := new(restful.WebService)
versionWS.Path("/version")
versionWS.Doc("git code version from which this is built")
versionWS.Route(
versionWS.GET("/").To(v.handleVersion).
Doc("get the code version").
Operation("getCodeVersion").
Produces(restful.MIME_JSON).
Consumes(restful.MIME_JSON).
Writes(version.Info{}))
c.Add(versionWS)
}
func (v Version) handleVersion(req *restful.Request, resp *restful.Response) {
responsewriters.WriteRawJSON(http.StatusOK, *v.Version, resp.ResponseWriter)
}到这里,几个基本路由注册完成,跳到本回最开始的 kubeAPIServerConfig.Complete().New(delegateAPIServer) 的 New 方法,继续来看后续:
// pkg/controlplane/instance.go #352
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
// ...
// 创建 KubeAPIServer 委托对象(见上回)
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
// 注册完成了 /、/debug/*、/metrics、/version、/apis 基本路由
// 后续开始
// 相同的方法,使用 GoRestfulContainer 注册 /logs 路由
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
md, err := serviceaccount.NewOpenIDMetadata(
c.ExtraConfig.ServiceAccountIssuerURL,
c.ExtraConfig.ServiceAccountJWKSURI,
c.GenericConfig.ExternalAddress,
c.ExtraConfig.ServiceAccountPublicKeys,
)
if err != nil {
// ...
} else {
// 注册 /openapi 路由
routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
Install(s.Handler.GoRestfulContainer)
}
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
// 注册 LegacyAPI ,即核心 API ,以 /api 开头的路由在这个方法中注册
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter); err != nil {
return nil, err
}
// ...
// 注册以 /apis 开头的路由
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
// ...
return m, nil
}在注册完基本路由后,KubeAPIServer 又继续进行了 /logs 、/openapi 、/api/* 、/apis/* 路由的注册。
继续看 /api/* 路由,跳到 InstallLegacyAPI 方法,主要分成两步,第一步创建 RESTStorage 将后端存储与资源进行绑定(下回分析),第二步才开始进行路由注册:
// pkg/controlplane/instance.go #580
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {
// 这里下回分析
// 主要是为 LegacyAPI 中各个资源创建 RESTStorage
// 将各种资源和对应的后端存储(etcd)的操作绑定
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
if err != nil {
return fmt.Errorf("error building core storage: %v", err)
}
//...
// 注册路由
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}第二步的路由注册:
// k8s.io/apiserver/pkg/server/genericapiserver.go #815
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
// ...
// 跳转到 installAPIResources 看具体实现
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
return err
}
// ...
return nil
}
// k8s.io/apiserver/pkg/server/genericapiserver.go #739
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels map[string]*spec.Schema) error {
// ...
// 遍历获取 API 版本,目前对于 /api 路由只有 v1 一个版本
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
// 根据路由前缀和版本(/api/v1)获取每个路由及其绑定的 RESTStorage 的 ApiGroupVersion 信息
apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
// 组装好了所有路由及其对应的 RESTStorage 后,再次跳转,开始进行路由注册
discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
}
// ...
return nil
}
// k8s.io/apiserver/pkg/server/genericapiserver.go #904
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) (*genericapi.APIGroupVersion, error) {
// 存储 API 资源的信息
storage := make(map[string]rest.Storage)
// groupVersion.Version 这里的值是 v1
for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
// k 是资源(即路由),例如 pods ,需要判断下是否为小写
if strings.ToLower(k) != k {
return nil, fmt.Errorf("resource names must be lowercase only, not %q", k)
}
// v 是资源对应的 RESTStorage
// 将 k:v 存储到 storage 中
storage[k] = v
}
version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
version.Root = apiPrefix
version.Storage = storage
return version, nil
}这里可以通过调试来验证:


继续来到 apiGroupVersion.InstallREST 方法(调用链很深):
// k8s.io/apiserver/pkg/endpoints/groupversion.go #105
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
// 继续跳转
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
// ...
return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
// k8s.io/apiserver/pkg/endpoints/installer.go #184
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
ws := a.newWebService()
// 先对路径进行排序,以确保在不同的环境中生成的 Swagger 规范是相同的
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
// 继续,快到终点了!
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
// ...
}
return apiResources, resourceInfos, ws, errors
}registerResourceHandlers 方法,整整 824 行代码,拆开看,不揪细节。
首先先判断 RESTStorage 支持的 verbs ,然后根据是否支持 namespace 选项来添加对应的 action :
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
// ...
// 判断 RESTStorage 支持的 verbs
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
gvAcceptor, _ := storage.(rest.GroupVersionAcceptor)
// ...
// 根据资源是否支持 namespace 选项来添加对应的 action
switch {
case !namespaceScoped:
// 非 namespace 作用域的资源,例如 nodes
// ...省略
default:
// 支持 namespace 作用域的资源,例如 pods
namespaceParamName := "namespaces"
// Handler for standard REST verbs (GET, PUT, POST and DELETE).
namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
// 支持指定 namespace
// 例如 /api/v1/pods 等同于 /api/v1/namespaces/default/pods
namespacedPath := namespaceParamName + "/{namespace}/" + resource
namespaceParams := []*restful.Parameter{namespaceParam}
resourcePath := namespacedPath
resourceParams := namespaceParams
itemPath := namespacedPath + "/{name}"
nameParams := append(namespaceParams, nameParam)
proxyParams := append(nameParams, pathParam)
itemPathSuffix := ""
if isSubresource {
itemPathSuffix = "/" + subresource
itemPath = itemPath + itemPathSuffix
resourcePath = itemPath
resourceParams = nameParams
}
apiResource.Name = path
apiResource.Namespaced = true
apiResource.Kind = resourceKind
namer := handlers.ContextBasedNaming{
Namer: a.group.Namer,
ClusterScoped: false,
}
// 结合 RESTStorage 支持的 verbs,添加 action
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
// list or post across namespace.
// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
if !isSubresource {
actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
}
}
// ...
return &apiResource, resourceInfo, nil
}以 pods 资源为例,就支持了 11 种 action :

最后就可以根据每个 action 来添加对应的 handler 并注册到路由中,以 GET 为例:
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
// ...
for _, action := range actions {
switch action.Verb {
case "GET": // Get a resource.
// 初始化 handler
var handler restful.RouteFunction
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
if needOverride {
// need change the reported verb
handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
} else {
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
}
handler = utilwarning.AddWarningsHandler(handler, warnings)
doc := "read the specified " + kind
if isSubresource {
doc = "read " + subresource + " of the specified " + kind
}
// 熟悉的方法,使用 go-restful 的路由注册方法
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
if isGetterWithOptions {
if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)
case "LIST": // List all resources of a kind.
// ...
case "PUT": // Update a resource.
// ...
case "PATCH": // Partially update a resource
// ...
case "POST": // Create a resource.
// ...
case "DELETE": // Delete a resource.
// ...
case "DELETECOLLECTION":
// ...
// deprecated in 1.11
case "WATCH": // Watch a resource.
// ...
// deprecated in 1.11
case "WATCHLIST": // Watch all resources of a kind.
// ...
case "CONNECT":
// ...
default:
return nil, nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
}
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
// 使用 go-restful 的路由注册方法
ws.Route(route)
}
// Note: update GetAuthorizerAttributes() when adding a custom handler.
}
// ...
return &apiResource, resourceInfo, nil
}同样可以进行调试,以 pods 资源为例:
# 等价于 127.0.0.1:8080/api/v1/namespaces/default/pods
$ curl http://127.0.0.1:8080/api/v1/pods
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "5210"
},
"items": [
{
"metadata": {
"name": "nginx",
"namespace": "default",
"uid": "65d17c9d-5dc6-437f-81e2-6d31e476417a",
"resourceVersion": "1874",
"creationTimestamp": "2023-06-04T08:56:21Z",
"annotations": {
"kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"v1\",\"kind\":\"Pod\",\"metadata\":{\"annotations\":{},\"name\":\"nginx\",\"namespace\":\"default\"},\"spec\":{\"containers\":[{\"image\":\"nginx:1.14.2\",\"name\":\"nginx\",\"ports\":[{\"containerPort\":80}]}]}}\n"
},
"managedFields": [...]
},
"spec": {
"volumes": [...],
"containers": [...],
"restartPolicy": "Always",
"terminationGracePeriodSeconds": 30,
"dnsPolicy": "ClusterFirst",
"serviceAccountName": "default",
"serviceAccount": "default",
"securityContext": {},
"schedulerName": "default-scheduler",
"tolerations": [...],
"priority": 0,
"enableServiceLinks": true,
"preemptionPolicy": "PreemptLowerPriority"
},
"status": {
"phase": "Pending",
"qosClass": "BestEffort"
}
}
]
}到这里,所有路由注册就完成了。这里就不展开 handler 的初始化了。
至于最后的 /apis/* 路由的注册,即 InstallAPIs 方法,其主要流程和 InstallLegacyAPI 方法是一致的,也不再赘述了。
本回先到这里,下一回再讲解 RESTStorage 的创建过程。
微信公众号
更多内容请关注微信公众号:gopher云原生