02. apiserver 的 HTTP Server 的初始化

02. apiserver 的 HTTP Server 的初始化

接上回,apiserver 对启动参数进行验证后,就会调用 Run() 启动函数,并传递经过验证的选项配置 completeOptions 以及一个停止信号的通道 stopCh ,函数的定义如下:

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	// 1、创建服务调用链
	server, err := CreateServerChain(completeOptions)
	if err != nil {
		return err
	}

	// 2、进行服务启动前的准备工作
	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	// 3、服务启动
	return prepared.Run(stopCh)
}

略过日志打印,可以看到整个启动函数可以分为三个步骤:

  • CreateServerChain :创建服务调用链。该函数负责创建各种不同 API Server 的配置并初始化,最后构建出完整的 API Server 链式结构
  • PrepareRun:服务启动前的准备工作。该函数负责进行健康检查、存活检查和 OpenAPI 路由的注册工作,以便 apiserver 能够顺利地运行
  • Run:服务启动。该函数启动 HTTP Server 实例并开始监听和处理来自客户端的请求

首先看服务调用链的创建,在这里会根据不同功能进行解耦,创建出三个不同的 API Server :

  • AggregatorServer:API 聚合服务。用于实现 Kubernetes API 聚合层 的功能,当 AggregatorServer 接收到请求之后,如果发现对应的是一个 APIService 的请求,则会直接转发到对应的服务上(自行编写和部署的 API 服务器),否则则委托给 KubeAPIServer 进行处理
  • KubeAPIServer:API 核心服务。实现认证、鉴权以及所有 Kubernetes 内置资源的 REST API 接口(诸如 Pod 和 Service 等资源的接口),如果请求未能找到对应的处理,则委托给 APIExtensionsServer 进行处理
  • APIExtensionsServer:API 扩展服务。处理 CustomResourceDefinitions(CRD)和 Custom Resource(CR)的 REST 请求(自定义资源的接口),也是服务链的最后一环,如果请求仍不能被处理则委托给 404 Handler 处理
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
	// 为 KubeAPIServer 创建配置
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
	if err != nil {
		return nil, err
	}

	// 为 APIExtensionsServer 创建配置
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
	if err != nil {
		return nil, err
	}

	// 1、初始化 APIExtensionsServer
	notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	if err != nil {
		return nil, err
	}

	// 2、初始化 KubeAPIServer
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// 为 AggregatorServer 创建配置
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
	if err != nil {
		return nil, err
	}
	// 3、初始化 AggregatorServer
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	// 返回的是最后的 AggregatorServer
	return aggregatorServer, nil
}

这三个服务通过委托模式连接在一起,形成了一个链式结构,函数最后返回的 AggregatorServer 服务为头结点。把这个逻辑搞懂。

这三个服务的类型

  • APIExtensionsServer*apiextensionsapiserver.CustomResourceDefinitions
  • KubeAPIServer*controlplane.Instance
  • AggregatorServer*aggregatorapiserver.APIAggregator
// APIExtensionsServer 类型
type CustomResourceDefinitions struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	// ...
}

// KubeAPIServer 类型
type Instance struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	// ...
}

// AggregatorServer 类型
type APIAggregator struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	// ...
}

都有一个共同点,包含了 GenericAPIServer 成员,而该成员实现了 DelegationTarget 接口:

type DelegationTarget interface {
	// ...

	// 获取委托链中的下一个委托对象
	NextDelegate() DelegationTarget

	// 执行 API Server 安装设置步骤
	PrepareRun() preparedGenericAPIServer

	// ...
}

// 实现了 DelegationTarget 接口
type GenericAPIServer struct {
  // ...
	// delegationTarget是链中的下一个委托对象
	delegationTarget DelegationTarget
  // ...
}

// 实现 NextDelegate 方法
func (s *GenericAPIServer) NextDelegate() DelegationTarget {
	return s.delegationTarget
}

// 实现 PrepareRun 方法,会递归调用委托对象的 PrepareRun 方法,直到最后一个
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
  // 调用下一个委托对象的 PrepareRun 方法
	s.delegationTarget.PrepareRun()

  // OpenAPI 路由的注册
	if s.openAPIConfig != nil && !s.skipOpenAPIInstallation {
		s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
			Config: s.openAPIConfig,
		}.InstallV2(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
	}

	if s.openAPIV3Config != nil && !s.skipOpenAPIInstallation {
		if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
			s.OpenAPIV3VersionedService = routes.OpenAPI{
				Config: s.openAPIV3Config,
			}.InstallV3(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
		}
	}

  // 健康检查
	s.installHealthz()
  // 存活检查
	s.installLivez()

	// as soon as shutdown is initiated, readiness should start failing
	readinessStopCh := s.lifecycleSignals.ShutdownInitiated.Signaled()
	err := s.addReadyzShutdownCheck(readinessStopCh)
	if err != nil {
		klog.Errorf("Failed to install readyz shutdown check %s", err)
	}
  // 启动准备就绪检查
	s.installReadyz()

	return preparedGenericAPIServer{s}
}

基于委托模式,重新看 CreateServerChain 函数,从尾节点开始依次创建 API Server 委托对象:

// 0、初始化 404 Handler Server
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)

// 1、初始化 APIExtensionsServer ,传入 404 Handler Server 作为下一个委托对象
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))

// 2、初始化 KubeAPIServer ,传入 APIExtensionsServer 作为下一个委托对象
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)

// 3、初始化 AggregatorServer ,传入 KubeAPIServer 作为下一个委托对象
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

// 4、返回 AggregatorServer
return aggregatorServer, nil

先看 createAPIExtensionsServer :

func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
	return apiextensionsConfig.Complete().New(delegateAPIServer)
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	// 创建 APIExtensionsServer 委托对象,并传入所指向的下一个委托对象,这里是 404 Handler Server
  genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
	
  // ...
}

再看 CreateKubeAPIServer:

func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	return kubeAPIServerConfig.Complete().New(delegateAPIServer)
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	// ...

	// 创建 KubeAPIServer 委托对象,并传入所指向的下一个委托对象,这里是 APIExtensionsServer
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)

  // ...
}

最后看 createAggregatorServer:

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
	aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)

	// ...
}

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	// 创建 AggregatorServer 委托对象,并传入所指向的下一个委托对象,这里是 KubeAPIServer
	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
	
	// ...
}

可以看到三个服务的初始化过程都是一样,调用 c.GenericConfig.New("server name", delegationTarget) 方法:

func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	// ...

  // 创建 API Server 的 Handler 处理器
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

	s := &GenericAPIServer{
		// 保存下一个委托对象
		delegationTarget:               delegationTarget,
		// 保存当前委托对象 API Server 的 Handler 处理器
		Handler:                        apiServerHandler,
		// ...
	}

	// ...
	return s, nil
}

在继续看 NewAPIServerHandler 函数之前,先了解一下 github.com/emicklei/go-restful ,因为 apiserver 就是使用这个库实现的 RESTful API 。

直接用一个小例子搞懂 apiserver 中 go-restful 的使用:

package main

import (
	"net/http"

	"github.com/emicklei/go-restful/v3"

	"k8s.io/apiserver/pkg/server"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
)

func main() {
	// 创建 API Server 的 Handler 处理器
	handler := server.NewAPIServerHandler(
		"test-server",
		legacyscheme.Codecs,
		func(apiHandler http.Handler) http.Handler {
			return apiHandler
		},
		nil)

	// 注册路由
	testApisV1 := new(restful.WebService).Path("/apis/test/v1")
	{
		testApisV1.Route(testApisV1.GET("hello").To(
			func(req *restful.Request, resp *restful.Response) {
				resp.WriteAsJson(map[string]interface{}{"k": "v"})
			},
		)).Doc("hello endpoint")
	}

	// 路由添加到 GoRestfulContainer
	handler.GoRestfulContainer.Add(testApisV1)

	// 启动监听服务
	panic(http.ListenAndServe(":8080", handler))
}

// $ curl 127.0.0.1:8080/apis/test/v1/hello
// {
// "k": "v"
// }

继续看源码,可以看到 apiserver 实际只是对 go-restful 进行了一些简单的封装,使用了其中的一些基本方法。不过因为 go-restful 对于一些 API 有兼容性问题,因此引入了 Director 机制来选择是使用 GoRestfulContainer 还是 NonGoRestfulMux :


// 处理顺序:FullHandlerChain -> Director(选择) -> {GoRestfulContainer,NonGoRestfulMux}
type APIServerHandler struct {
	// 完整的处理程序链,包含了所有的中间件和处理程序
	FullHandlerChain http.Handler

	// 注册和管理 go-restful 的路由和处理程序
	GoRestfulContainer *restful.Container
	// 注册和管理非 go-restful 的路由和处理程序
	NonGoRestfulMux *mux.PathRecorderMux

	// 根据已注册的 web 服务检查来选择使用哪个处理程序(gorestful 或非 gorestful)
	Director http.Handler
}

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	// 非 go-restful ,以下称 mux 的初始化
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		// 自定义 404 处理器
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	// go-restful 的初始化
	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	// 声明 director ,后续使用 director 来决定是使用 mux 还是 go-restful
	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	// 返回 APIServerHandler
	return &APIServerHandler{
		// 在 director 基础上增加中间件,得到完整的处理程序链
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

至此,HTTP Server 的初始化完成。

微信公众号

更多内容请关注微信公众号:gopher云原生