08. kube-apiserver API 准入控制
接上回, API 经过认证和鉴权后,来到第三阶段:准入控制(Admission Controllers)。
在 kube-apiserver 中,准入控制器实际就是一段代码,它会在请求通过 认证(Authentication)和 鉴权(Authorization)之后、对象被持久化到 etcd 之前拦截到达 API 服务器的请求。
准入控制器按操作类型可以分为 验证(Validating) 和 变更(Mutating) 两种,其中变更准入控制器可以修改用户提交的资源对象信息;验证准入控制器则可以用于验证用户提交的资源对象信息。
还需要注意的一点是,准入控制器是用来限制 创建、删除、修改 对象的请求,也可以阻止自定义动作,例如通过 API 服务器代理连接到 Pod 的请求。 但是不会也不能去阻止 读取(get、watch 或 list)对象的请求。
当前版本支持的准入控制器一共有 36 种:
AlwaysAdmit、AlwaysDeny、AlwaysPullImages、CertificateApproval、CertificateSigning、CertificateSubjectRestriction、ClusterTrustBundleAttest、DefaultIngressClass、DefaultStorageClass、DefaultTolerationSeconds、DenyServiceExternalIPs、EventRateLimit、ExtendedResourceToleration、ImagePolicyWebhook、LimitPodHardAntiAffinityTopology、LimitRanger、MutatingAdmissionWebhook、NamespaceAutoProvision、NamespaceExists、NamespaceLifecycle、NodeRestriction、OwnerReferencesPermissionEnforcement、PersistentVolumeClaimResize、PersistentVolumeLabel、PodNodeSelector、PodSecurity、PodTolerationRestriction、Priority、ResourceQuota、RuntimeClass、SecurityContextDeny、ServiceAccount、StorageObjectInUseProtection、TaintNodesByCondition、ValidatingAdmissionPolicy、ValidatingAdmissionWebhook
其中默认启用的就有 20 种:
NamespaceLifecycle、LimitRanger、ServiceAccount、TaintNodesByCondition、PodSecurity、Priority、DefaultTolerationSeconds、DefaultStorageClass、StorageObjectInUseProtection、PersistentVolumeClaimResize、RuntimeClass、CertificateApproval、CertificateSigning、ClusterTrustBundleAttest、CertificateSubjectRestriction、DefaultIngressClass、MutatingAdmissionWebhook、ValidatingAdmissionPolicy、ValidatingAdmissionWebhook、ResourceQuota
启用或禁用可以分别通过 --enable-admission-plugins
和 --disable-admission-plugins
参数控制。
废话不再多说,开始进入源码分析。
来到第 1 回的初始化默认启动参数时的 NewServerRunOptions
方法:
// cmd/kube-apiserver/app/options/options.go
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
// 准入控制器的配置
Admission: kubeoptions.NewAdmissionOptions(),
// 认证器的配置,见上一回
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),
// 鉴权器的配置,见上一回
Authorization: kubeoptions.NewBuiltInAuthorizationOptions(),
// ...
}
// ...
return &s
}
看其中准入控制器的配置 NewAdmissionOptions
方法:
// pkg/kubeapiserver/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {
// 通过 genericoptions.NewAdmissionOptions() 来创建 AdmissionOptions 实例
options := genericoptions.NewAdmissionOptions()
// ...
}
// k8s.io/apiserver/pkg/server/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {
options := &AdmissionOptions{
Plugins: admission.NewPlugins(),
Decorators: admission.Decorators{admission.DecoratorFunc(admissionmetrics.WithControllerMetrics)},
RecommendedPluginOrder: []string{lifecycle.PluginName, mutatingwebhook.PluginName, validatingadmissionpolicy.PluginName, validatingwebhook.PluginName},
DefaultOffPlugins: sets.NewString(),
}
// 注册所有准入控制器插件
server.RegisterAllAdmissionPlugins(options.Plugins)
return options
}
// k8s.io/apiserver/pkg/server/plugins.go
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
lifecycle.Register(plugins)
validatingwebhook.Register(plugins)
mutatingwebhook.Register(plugins)
validatingadmissionpolicy.Register(plugins)
}
在 k8s.io/apiserver/pkg/server/plugins.go
的 RegisterAllAdmissionPlugins
方法中注册了 4 个准入控制器:NamespaceLifecycle、ValidatingAdmissionWebhook、MutatingAdmissionWebhook、ValidatingAdmissionPolicy 。
其中有两个特殊的控制器:MutatingAdmissionWebhook
和 ValidatingAdmissionWebhook
,它们提供了准入控制器的高度扩展性,可以根据相关的资源配置,调用对应的 Webhook 服务,触发 HTTP 回调机制,来分别实现变更和验证操作。
继续看到 NewAdmissionOptions
方法:
// pkg/kubeapiserver/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {
// 通过 genericoptions.NewAdmissionOptions() 来创建 AdmissionOptions 实例
options := genericoptions.NewAdmissionOptions()
RegisterAllAdmissionPlugins(options.Plugins)
// ...
}
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
admit.Register(plugins) // DEPRECATED as no real meaning
alwayspullimages.Register(plugins)
antiaffinity.Register(plugins)
defaulttolerationseconds.Register(plugins)
defaultingressclass.Register(plugins)
denyserviceexternalips.Register(plugins)
deny.Register(plugins) // DEPRECATED as no real meaning
eventratelimit.Register(plugins)
extendedresourcetoleration.Register(plugins)
gc.Register(plugins)
imagepolicy.Register(plugins)
limitranger.Register(plugins)
autoprovision.Register(plugins)
exists.Register(plugins)
noderestriction.Register(plugins)
nodetaint.Register(plugins)
label.Register(plugins) // DEPRECATED, future PVs should not rely on labels for zone topology
podnodeselector.Register(plugins)
podtolerationrestriction.Register(plugins)
runtimeclass.Register(plugins)
resourcequota.Register(plugins)
podsecurity.Register(plugins)
podpriority.Register(plugins)
scdeny.Register(plugins)
serviceaccount.Register(plugins)
setdefault.Register(plugins)
resize.Register(plugins)
storageobjectinuseprotection.Register(plugins)
certapproval.Register(plugins)
certsigning.Register(plugins)
ctbattest.Register(plugins)
certsubjectrestriction.Register(plugins)
}
在这里的 RegisterAllAdmissionPlugins
方法会继续注册剩下的 32 个准入控制器。
这样,36 个准入控制器都完成了注册工作。
以 AlwaysAdmit
为例,该准入控制器允许所有的 Pod 进入集群,虽然该插件已被弃用,但其源码很简单,可以方便我们学习,来到 admit.Register(plugins)
:
// plugin/pkg/admission/admit/admission.go
// 插件名定义
const PluginName = "AlwaysAdmit"
func Register(plugins *admission.Plugins) {
// 插件注册,就是调用 plugins.Register 方法,内部实现很简单,就是一个 map + 锁 ,不展开
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
// config 参数实际就是资源对象
// 返回一个实现了 admission.Interface 接口的准入控制器
return NewAlwaysAdmit(), nil
})
}
type alwaysAdmit struct{}
var _ admission.MutationInterface = alwaysAdmit{}
var _ admission.ValidationInterface = alwaysAdmit{}
// 实现了 admission.MutationInterface 接口,用于执行变更操作
func (alwaysAdmit) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
return nil
}
// 实现了 admission.ValidationInterface 接口,用于执行验证操作
func (alwaysAdmit) Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
return nil
}
// 实现了 admission.Interface 接口,用于判断当前请求的操作该准入控制器是否可以进行处理
func (alwaysAdmit) Handles(operation admission.Operation) bool {
return true
}
func NewAlwaysAdmit() admission.Interface {
return new(alwaysAdmit)
}
可以看到要实现一个准入控制器,只需要实现以下接口:
// k8s.io/apiserver/pkg/admission/interfaces.go
// 准入控制器的接口
type Interface interface {
// 用于判断当前的 admission controller 是否能够处理给定的操作
// 操作可以是 CREATE(创建)、UPDATE(更新)、DELETE(删除)或 CONNECT(连接)之一
Handles(operation Operation) bool
}
// 变更准入控制器的接口
type MutationInterface interface {
// 继承准入控制器接口
Interface
// 执行变更操作
Admit(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}
// 验证准入控制器的接口
type ValidationInterface interface {
// 继承准入控制器接口
Interface
// 执行验证操作
Validate(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}
所有准入控制器插件注册完毕后,还要过滤出需要禁用的:
// pkg/kubeapiserver/options/admission.go
func NewAdmissionOptions() *AdmissionOptions {
// 通过 genericoptions.NewAdmissionOptions() 来创建 AdmissionOptions 实例
options := genericoptions.NewAdmissionOptions()
RegisterAllAdmissionPlugins(options.Plugins)
// 所有的准入控制器插件名列表
options.RecommendedPluginOrder = AllOrderedPlugins
// 默认禁用的准入控制器插件
options.DefaultOffPlugins = DefaultOffAdmissionPlugins()
return &AdmissionOptions{
GenericAdmission: options,
}
}
// pkg/kubeapiserver/options/plugins.go
func DefaultOffAdmissionPlugins() sets.String {
// 默认启用的 20 个准入控制器
defaultOnPlugins := sets.NewString(
lifecycle.PluginName, // NamespaceLifecycle
limitranger.PluginName, // LimitRanger
serviceaccount.PluginName, // ServiceAccount
setdefault.PluginName, // DefaultStorageClass
resize.PluginName, // PersistentVolumeClaimResize
defaulttolerationseconds.PluginName, // DefaultTolerationSeconds
mutatingwebhook.PluginName, // MutatingAdmissionWebhook
validatingwebhook.PluginName, // ValidatingAdmissionWebhook
resourcequota.PluginName, // ResourceQuota
storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
podpriority.PluginName, // Priority
nodetaint.PluginName, // TaintNodesByCondition
runtimeclass.PluginName, // RuntimeClass
certapproval.PluginName, // CertificateApproval
certsigning.PluginName, // CertificateSigning
ctbattest.PluginName, // ClusterTrustBundleAttest
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
defaultingressclass.PluginName, // DefaultIngressClass
podsecurity.PluginName, // PodSecurity
validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled
)
// 过滤启用的,剩下的就是禁用的
return sets.NewString(AllOrderedPlugins...).Difference(defaultOnPlugins)
}
配置创建完后,开始初始化,来到 buildGenericConfig
方法:
// cmd/kube-apiserver/app/server.go
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
pluginInitializers []admission.PluginInitializer,
admissionPostStartHook genericapiserver.PostStartHookFunc,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
// ...
// 准入控制器的初始化
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
// ...
return
}
跳到 s.Admission.ApplyTo
方法:
// pkg/kubeapiserver/options/admission.go
func (a *AdmissionOptions) ApplyTo(
c *server.Config,
informers informers.SharedInformerFactory,
kubeAPIServerClientConfig *rest.Config,
features featuregate.FeatureGate,
pluginInitializers ...admission.PluginInitializer,
) error {
// ...
return a.GenericAdmission.ApplyTo(c, informers, kubeAPIServerClientConfig, features, pluginInitializers...)
}
// k8s.io/apiserver/pkg/server/options/admission.go
func (a *AdmissionOptions) ApplyTo(
c *server.Config,
informers informers.SharedInformerFactory,
kubeAPIServerClientConfig *rest.Config,
features featuregate.FeatureGate,
pluginInitializers ...admission.PluginInitializer,
) error {
// ...
// 获取启用的插件名,默认 20 个
pluginNames := a.enabledPluginNames()
// 读取插件的配置文件(若有)
pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)
// 创建一个 clientset 客户端,用于插件和 api server 交互,可操作内置资源
clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
// 创建一个 dynamicClient 客户端,用于插件和 api server 交互,可操作自定义资源
dynamicClient, err := dynamic.NewForConfig(kubeAPIServerClientConfig)
genericInitializer := initializer.New(clientset, dynamicClient, informers, c.Authorization.Authorizer, features, c.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, pluginInitializers...)
// 初始化准入控制器链,将所有插件合并成一个插件
admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
// 封装指标,用于观测
c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
return nil
}
继续跳到 a.Plugins.NewFromPlugins
方法:
// k8s.io/apiserver/pkg/admission/plugins.go
func (ps *Plugins) NewFromPlugins(pluginNames []string, configProvider ConfigProvider, pluginInitializer PluginInitializer, decorator Decorator) (Interface, error) {
// 准入控制器列表
handlers := []Interface{}
mutationPlugins := []string{}
validationPlugins := []string{}
// 遍历所有插件
for _, pluginName := range pluginNames {
pluginConfig, err := configProvider.ConfigFor(pluginName)
if err != nil {
return nil, err
}
plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
if err != nil {
return nil, err
}
if plugin != nil {
// 添加到控制器列表中
if decorator != nil {
handlers = append(handlers, decorator.Decorate(plugin, pluginName))
} else {
handlers = append(handlers, plugin)
}
// 属于变更插件
if _, ok := plugin.(MutationInterface); ok {
mutationPlugins = append(mutationPlugins, pluginName)
}
// 属于验证插件
if _, ok := plugin.(ValidationInterface); ok {
validationPlugins = append(validationPlugins, pluginName)
}
}
}
if len(mutationPlugins) != 0 {
klog.Infof("Loaded %d mutating admission controller(s) successfully in the following order: %s.", len(mutationPlugins), strings.Join(mutationPlugins, ","))
}
if len(validationPlugins) != 0 {
klog.Infof("Loaded %d validating admission controller(s) successfully in the following order: %s.", len(validationPlugins), strings.Join(validationPlugins, ","))
}
// 使用装饰器模式创建一个新的准入控制器
return newReinvocationHandler(chainAdmissionHandler(handlers)), nil
}
chainAdmissionHandler
实际是一个 []Interface
类型,其又实现了准入控制器的接口:
// k8s.io/apiserver/pkg/admission/chain.go
type chainAdmissionHandler []Interface
func NewChainHandler(handlers ...Interface) chainAdmissionHandler {
return chainAdmissionHandler(handlers)
}
func (admissionHandler chainAdmissionHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
// 遍历准入控制器列表
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
// 如果该准入控制器无法处理该类型的请求,则跳过
continue
}
if mutator, ok := handler.(MutationInterface); ok {
// 该准入控制器实现了变更接口,执行变更操作,如果失败,则返回错误
err := mutator.Admit(ctx, a, o)
if err != nil {
return err
}
}
}
return nil
}
func (admissionHandler chainAdmissionHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
// 遍历准入控制器列表
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
// 如果该准入控制器无法处理该类型的请求,则跳过
continue
}
if validator, ok := handler.(ValidationInterface); ok {
// 该准入控制器实现了验证接口,执行验证操作,如果失败,则返回错误
err := validator.Validate(ctx, a, o)
if err != nil {
return err
}
}
}
return nil
}
func (admissionHandler chainAdmissionHandler) Handles(operation Operation) bool {
for _, handler := range admissionHandler {
if handler.Handles(operation) {
return true
}
}
return false
}
从 chainAdmissionHandler
的实现可以看出,和之前的认证、鉴权(只要有一个认证或鉴权器通过则返回成功)相反,准入控制器只要有一个不通过则返回失败。
经过 chainAdmissionHandler
的包装后,最后还会再使用 newReinvocationHandler
方法包装一层:
// k8s.io/apiserver/pkg/admission/reinvocation.go
func newReinvocationHandler(admissionChain Interface) Interface {
return &reinvoker{admissionChain}
}
type reinvoker struct {
admissionChain Interface
}
func (r *reinvoker) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
if mutator, ok := r.admissionChain.(MutationInterface); ok {
err := mutator.Admit(ctx, a, o)
if err != nil {
return err
}
s := a.GetReinvocationContext()
if s.ShouldReinvoke() {
s.SetIsReinvoke()
return mutator.Admit(ctx, a, o)
}
}
return nil
}
func (r *reinvoker) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
if validator, ok := r.admissionChain.(ValidationInterface); ok {
return validator.Validate(ctx, a, o)
}
return nil
}
func (r *reinvoker) Handles(operation Operation) bool {
return r.admissionChain.Handles(operation)
}
到目前为止,准入控制器都还只是处于配置、初始化阶段,真正去调用执行是在第 6 回讲到的 handler 的处理流程 中。
先来看第 3 回核心 API 的路由注册的 registerResourceHandlers
方法:
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
// 准入控制器
admit := a.group.Admit
for _, action := range actions {
switch action.Verb {
// ...
case "POST":
// ...
// 传入准入控制器 admit
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
// ...
case "DELETE": // Delete a resource.
// 传入准入控制器 admit
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
// ... 其他操作都是类似的,省略
}
// ...
}
// ...
}
在这里,只要是 创建、删除、修改 对象的请求都会传入准入控制器 admit 对象,以 POST 为例,来看 restfulCreateResource
方法( restfulCreateNamedResource
方法也一样):
// k8s.io/apiserver/pkg/endpoints/installer.go
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
// k8s.io/apiserver/pkg/endpoints/handlers/create.go
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
// k8s.io/apiserver/pkg/endpoints/handlers/create.go
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// ...
// 准入控制器
admit = admission.WithAudit(admit)
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
// 资源的请求处理函数,其中内部最后会调用 etcd 来存储对象
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
// 将准入控制器转化为验证准入控制器传入
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
// 先判断是否实现了变更准入控制器接口,然后调用执行变更操作
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// 进入请求处理函数
result, err := requestFunc()
// ...
return result, err
})
// ...
}
}
进入请求函数 requestFunc()
方法后,最后会一层层跳转来到 Create
方法:
// k8s.io/apiserver/pkg/registry/generic/registry/store.go
// createValidation 是传入的验证准入控制器
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
// 先调用验证准入控制器进行验证操作
if createValidation != nil {
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, err
}
}
// 后续才开始对资源进行创建
// ...
if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
// ...
}
// ...
}
因此,准入控制器会先执行变更操作,再执行验证操作 。
变更操作已经一目了然了,看看刚才的验证操作的 createValidation
函数的实现 rest.AdmissionToValidateObjectFunc
方法:
// k8s.io/apiserver/pkg/registry/rest/create.go
func AdmissionToValidateObjectFunc(admit admission.Interface, staticAttributes admission.Attributes, o admission.ObjectInterfaces) ValidateObjectFunc {
// 判断是否实现了验证准入控制器接口
validatingAdmission, ok := admit.(admission.ValidationInterface)
if !ok {
return func(ctx context.Context, obj runtime.Object) error { return nil }
}
return func(ctx context.Context, obj runtime.Object) error {
name := staticAttributes.GetName()
// in case the generated name is populated
if len(name) == 0 {
if metadata, err := meta.Accessor(obj); err == nil {
name = metadata.GetName()
}
}
finalAttributes := admission.NewAttributesRecord(
obj,
staticAttributes.GetOldObject(),
staticAttributes.GetKind(),
staticAttributes.GetNamespace(),
name,
staticAttributes.GetResource(),
staticAttributes.GetSubresource(),
staticAttributes.GetOperation(),
staticAttributes.GetOperationOptions(),
staticAttributes.IsDryRun(),
staticAttributes.GetUserInfo(),
)
// 是否支持该操作
if !validatingAdmission.Handles(finalAttributes.GetOperation()) {
return nil
}
// 执行验证操作
return validatingAdmission.Validate(ctx, finalAttributes, o)
}
}
实际也很简单。
本回完。
微信公众号
更多内容请关注微信公众号:gopher云原生