02. apiserver 的 HTTP Server 的初始化
接上回,apiserver 对启动参数进行验证后,就会调用 Run()
启动函数,并传递经过验证的选项配置 completeOptions
以及一个停止信号的通道 stopCh
,函数的定义如下:
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// 1、创建服务调用链
server, err := CreateServerChain(completeOptions)
if err != nil {
return err
}
// 2、进行服务启动前的准备工作
prepared, err := server.PrepareRun()
if err != nil {
return err
}
// 3、服务启动
return prepared.Run(stopCh)
}
略过日志打印,可以看到整个启动函数可以分为三个步骤:
CreateServerChain
:创建服务调用链。该函数负责创建各种不同 API Server 的配置并初始化,最后构建出完整的 API Server 链式结构PrepareRun
:服务启动前的准备工作。该函数负责进行健康检查、存活检查和 OpenAPI 路由的注册工作,以便 apiserver 能够顺利地运行Run
:服务启动。该函数启动 HTTP Server 实例并开始监听和处理来自客户端的请求
首先看服务调用链的创建,在这里会根据不同功能进行解耦,创建出三个不同的 API Server :
AggregatorServer
:API 聚合服务。用于实现 Kubernetes API 聚合层 的功能,当 AggregatorServer 接收到请求之后,如果发现对应的是一个 APIService 的请求,则会直接转发到对应的服务上(自行编写和部署的 API 服务器),否则则委托给 KubeAPIServer 进行处理KubeAPIServer
:API 核心服务。实现认证、鉴权以及所有 Kubernetes 内置资源的 REST API 接口(诸如 Pod 和 Service 等资源的接口),如果请求未能找到对应的处理,则委托给 APIExtensionsServer 进行处理APIExtensionsServer
:API 扩展服务。处理 CustomResourceDefinitions(CRD)和 Custom Resource(CR)的 REST 请求(自定义资源的接口),也是服务链的最后一环,如果请求仍不能被处理则委托给 404 Handler 处理
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
// 为 KubeAPIServer 创建配置
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err
}
// 为 APIExtensionsServer 创建配置
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
// 1、初始化 APIExtensionsServer
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
// 2、初始化 KubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// 为 AggregatorServer 创建配置
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 3、初始化 AggregatorServer
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
// 返回的是最后的 AggregatorServer
return aggregatorServer, nil
}
这三个服务通过委托模式连接在一起,形成了一个链式结构,函数最后返回的 AggregatorServer
服务为头结点。把这个逻辑搞懂。
这三个服务的类型
APIExtensionsServer
:*apiextensionsapiserver.CustomResourceDefinitions
KubeAPIServer
:*controlplane.Instance
AggregatorServer
:*aggregatorapiserver.APIAggregator
// APIExtensionsServer 类型
type CustomResourceDefinitions struct {
GenericAPIServer *genericapiserver.GenericAPIServer
// ...
}
// KubeAPIServer 类型
type Instance struct {
GenericAPIServer *genericapiserver.GenericAPIServer
// ...
}
// AggregatorServer 类型
type APIAggregator struct {
GenericAPIServer *genericapiserver.GenericAPIServer
// ...
}
都有一个共同点,包含了 GenericAPIServer
成员,而该成员实现了 DelegationTarget
接口:
type DelegationTarget interface {
// ...
// 获取委托链中的下一个委托对象
NextDelegate() DelegationTarget
// 执行 API Server 安装设置步骤
PrepareRun() preparedGenericAPIServer
// ...
}
// 实现了 DelegationTarget 接口
type GenericAPIServer struct {
// ...
// delegationTarget是链中的下一个委托对象
delegationTarget DelegationTarget
// ...
}
// 实现 NextDelegate 方法
func (s *GenericAPIServer) NextDelegate() DelegationTarget {
return s.delegationTarget
}
// 实现 PrepareRun 方法,会递归调用委托对象的 PrepareRun 方法,直到最后一个
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// 调用下一个委托对象的 PrepareRun 方法
s.delegationTarget.PrepareRun()
// OpenAPI 路由的注册
if s.openAPIConfig != nil && !s.skipOpenAPIInstallation {
s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
Config: s.openAPIConfig,
}.InstallV2(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
}
if s.openAPIV3Config != nil && !s.skipOpenAPIInstallation {
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
s.OpenAPIV3VersionedService = routes.OpenAPI{
Config: s.openAPIV3Config,
}.InstallV3(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
}
}
// 健康检查
s.installHealthz()
// 存活检查
s.installLivez()
// as soon as shutdown is initiated, readiness should start failing
readinessStopCh := s.lifecycleSignals.ShutdownInitiated.Signaled()
err := s.addReadyzShutdownCheck(readinessStopCh)
if err != nil {
klog.Errorf("Failed to install readyz shutdown check %s", err)
}
// 启动准备就绪检查
s.installReadyz()
return preparedGenericAPIServer{s}
}
基于委托模式,重新看 CreateServerChain 函数,从尾节点开始依次创建 API Server 委托对象:
// 0、初始化 404 Handler Server
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
// 1、初始化 APIExtensionsServer ,传入 404 Handler Server 作为下一个委托对象
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
// 2、初始化 KubeAPIServer ,传入 APIExtensionsServer 作为下一个委托对象
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
// 3、初始化 AggregatorServer ,传入 KubeAPIServer 作为下一个委托对象
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
// 4、返回 AggregatorServer
return aggregatorServer, nil
先看 createAPIExtensionsServer :
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// 创建 APIExtensionsServer 委托对象,并传入所指向的下一个委托对象,这里是 404 Handler Server
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// ...
}
再看 CreateKubeAPIServer:
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
return kubeAPIServerConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
// ...
// 创建 KubeAPIServer 委托对象,并传入所指向的下一个委托对象,这里是 APIExtensionsServer
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
// ...
}
最后看 createAggregatorServer:
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
// ...
}
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
// 创建 AggregatorServer 委托对象,并传入所指向的下一个委托对象,这里是 KubeAPIServer
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
// ...
}
可以看到三个服务的初始化过程都是一样,调用 c.GenericConfig.New("server name", delegationTarget)
方法:
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// ...
// 创建 API Server 的 Handler 处理器
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
s := &GenericAPIServer{
// 保存下一个委托对象
delegationTarget: delegationTarget,
// 保存当前委托对象 API Server 的 Handler 处理器
Handler: apiServerHandler,
// ...
}
// ...
return s, nil
}
在继续看 NewAPIServerHandler 函数之前,先了解一下 github.com/emicklei/go-restful ,因为 apiserver 就是使用这个库实现的 RESTful API 。
直接用一个小例子搞懂 apiserver 中 go-restful 的使用:
package main
import (
"net/http"
"github.com/emicklei/go-restful/v3"
"k8s.io/apiserver/pkg/server"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
func main() {
// 创建 API Server 的 Handler 处理器
handler := server.NewAPIServerHandler(
"test-server",
legacyscheme.Codecs,
func(apiHandler http.Handler) http.Handler {
return apiHandler
},
nil)
// 注册路由
testApisV1 := new(restful.WebService).Path("/apis/test/v1")
{
testApisV1.Route(testApisV1.GET("hello").To(
func(req *restful.Request, resp *restful.Response) {
resp.WriteAsJson(map[string]interface{}{"k": "v"})
},
)).Doc("hello endpoint")
}
// 路由添加到 GoRestfulContainer
handler.GoRestfulContainer.Add(testApisV1)
// 启动监听服务
panic(http.ListenAndServe(":8080", handler))
}
// $ curl 127.0.0.1:8080/apis/test/v1/hello
// {
// "k": "v"
// }
继续看源码,可以看到 apiserver 实际只是对 go-restful 进行了一些简单的封装,使用了其中的一些基本方法。不过因为 go-restful 对于一些 API 有兼容性问题,因此引入了 Director 机制来选择是使用 GoRestfulContainer 还是 NonGoRestfulMux :
// 处理顺序:FullHandlerChain -> Director(选择) -> {GoRestfulContainer,NonGoRestfulMux}
type APIServerHandler struct {
// 完整的处理程序链,包含了所有的中间件和处理程序
FullHandlerChain http.Handler
// 注册和管理 go-restful 的路由和处理程序
GoRestfulContainer *restful.Container
// 注册和管理非 go-restful 的路由和处理程序
NonGoRestfulMux *mux.PathRecorderMux
// 根据已注册的 web 服务检查来选择使用哪个处理程序(gorestful 或非 gorestful)
Director http.Handler
}
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
// 非 go-restful ,以下称 mux 的初始化
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
// 自定义 404 处理器
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
// go-restful 的初始化
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
// 声明 director ,后续使用 director 来决定是使用 mux 还是 go-restful
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
// 返回 APIServerHandler
return &APIServerHandler{
// 在 director 基础上增加中间件,得到完整的处理程序链
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
至此,HTTP Server 的初始化完成。
微信公众号
更多内容请关注微信公众号:gopher云原生