09. kube-apiserver 准入 Webhook

09. kube-apiserver 准入 Webhook

在上回介绍准入控制器时,我们提到了两个特殊的控制器:MutatingAdmissionWebhookValidatingAdmissionWebhook ,它们具有很高的扩展性,可以根据相关资源的配置调用相应的Webhook 服务,触发 HTTP 回调机制,以实现变更和验证操作。

Untitled

在对源码进行分析之前,我们先尝试体验一下 Admission Webhook 机制,以了解它的运行流程。

kube-apiserver 要求 Webhook 服务需要实现 https ,我们采用自建证书的方式,执行:

openssl genrsa -out ca.key 2048
openssl req -x509 -new -nodes -key ca.key -subj "/CN=YOUR_CA_NAME" -days 3650 -out ca.crt
openssl genrsa -out server.key 2048
openssl req -new -key server.key -subj "/CN=YOUR_SERVER_DNS" -out server.csr
echo "subjectAltName = IP:127.0.0.1" > extfile.cnf
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -extfile extfile.cnf

就可以得到根证书 ca.crt、服务器证书 server.crt 和服务器私钥 server.key 三个文件。

MutatingAdmissionWebhook 为例,创建 test-mutating-webhook.yaml 文件:

apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: test-mutating-webhook
webhooks:
  - name: test-mutating-webhook.example.com
    clientConfig:
      caBundle: <CA_CERTIFICATE>
      url: https://127.0.0.1:8080/mutate
    rules:
      - apiGroups: [""]
        apiVersions: ["v1"]
        operations: ["CREATE"]
        resources: ["pods"]
    failurePolicy: Fail
    sideEffects: None
    admissionReviewVersions: ["v1", "v1beta1"]

这个配置文件定义了一个 MutatingWebhookConfiguration 资源。该资源用于配置 MutatingAdmissionWebhook,以便拦截创建 Pod 资源的请求,并将这些请求转发到 https://127.0.0.1:8080/mutate 来进行资源的变更操作。

其中 <CA_CERTIFICATE> 是根证书的内容,需要执行 cat ca.crt | base64 -w 0 后将输出内容替换到其中。

在我们第1回搭建起来的 apiserver 及 kubectl 环境中执行创建命令:

$ kubectl apply -f test-mutating-webhook.yaml
mutatingwebhookconfiguration.admissionregistration.k8s.io/test-mutating-webhook created

接下来编写一个 Webhook 服务,并执行 go run main.go 运行:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"

	admissionv1 "k8s.io/api/admission/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
)

func main() {
	// 注册 /mutate 路由
	http.HandleFunc("/mutate", mutate)
	// 启动 webhook 服务
	panic(http.ListenAndServeTLS(":8080", "server.crt", "server.key", nil))
}

type patchOperation struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

func mutate(w http.ResponseWriter, r *http.Request) {
	fmt.Println("收到来自 kube-apiserver 的请求.")

	// 解析 body 为 Pod 对象
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	deserializer := serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
	ar := admissionv1.AdmissionReview{}
	if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	var pod corev1.Pod
	if err := json.Unmarshal(ar.Request.Object.Raw, &pod); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// 执行变更操作,添加一个注释
	pod.ObjectMeta.Annotations["mutate"] = "欢迎关注公众号:gopher云原生"

	// 构造 Patch 对象
	patch := []patchOperation{
		{
			Op:    "add",
			Path:  "/metadata/annotations",
			Value: pod.ObjectMeta.Annotations,
		},
	}
	patchBytes, err := json.Marshal(patch)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// 将篡改后的 Patch 对象内容写入 response
	admissionReview := admissionv1.AdmissionReview{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "admission.k8s.io/v1",
			Kind:       "AdmissionReview",
		},
		Response: &admissionv1.AdmissionResponse{
			UID:     ar.Request.UID,
			Allowed: true,
			Patch:   patchBytes,
			PatchType: func() *admissionv1.PatchType {
				pt := admissionv1.PatchTypeJSONPatch
				return &pt
			}(),
		},
	}
	resp, err := json.Marshal(admissionReview)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if _, err := w.Write(resp); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	fmt.Println("资源变更成功.")
}

之后只要集群中有 Pod 资源的创建,就会执行该变更 Webhook ,为 Pod 资源加入 mutate 注释。

开始测试,执行 kubectl apply -f pod.yaml 创建一个如下的 Pod 资源:

apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  containers:
  - name: nginx
    image: nginx:1.14.2
    ports:
    - containerPort: 80

其中 Webhook 服务的日志:

$ go run main.go 
收到来自 kube-apiserver 的请求.
资源变更成功.

查看所创建的 Pod 资源的注释内容:

$ kubectl get pod nginx -o yaml | grep mutate
    mutate: 欢迎关注公众号:gopher云原生

熟悉了准入 Webhook 的流程后,正式进入源码分析阶段。

来到上回讲过的 k8s.io/apiserver/pkg/server/plugins.goRegisterAllAdmissionPlugins 方法:

// k8s.io/apiserver/pkg/server/plugins.go

func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
	lifecycle.Register(plugins)
	// ValidatingAdmissionWebhook 注册
	validatingwebhook.Register(plugins)
	// MutatingAdmissionWebhook 注册
	mutatingwebhook.Register(plugins)
	validatingadmissionpolicy.Register(plugins)
}

还是以 MutatingAdmissionWebhook 为例,查看其注册方法:

// k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go

// 插件名
const (
	PluginName = "MutatingAdmissionWebhook"
)

// 插件的注册方法
func Register(plugins *admission.Plugins) {
	plugins.Register(PluginName, func(configFile io.Reader) (admission.Interface, error) {
		// 调用 NewMutatingWebhook 创建准入控制器插件
		plugin, err := NewMutatingWebhook(configFile)
		if err != nil {
			return nil, err
		}

		return plugin, nil
	})
}

type Plugin struct {
	*generic.Webhook
}

var _ admission.MutationInterface = &Plugin{}

// NewMutatingWebhook returns a generic admission webhook plugin.
func NewMutatingWebhook(configFile io.Reader) (*Plugin, error) {
	// 允许 Connect、Create、Delete、Update 四个操作
	handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)
	p := &Plugin{}
	var err error
	// 创建 Webhook 对象
	p.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewMutatingWebhookConfigurationManager, newMutatingDispatcher(p))
	if err != nil {
		return nil, err
	}

	return p, nil
}

// 变更操作
func (a *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
	// 调用 Webhook 对象的 Dispatch 方法来调用 Webhook 服务实现变更操作
	return a.Webhook.Dispatch(ctx, attr, o)
}

MutatingAdmissionWebhook 控制器的变更操作是去调用 Webhook 服务,先看到 Webhook 对象的创建方法 generic.NewWebhook

// k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go

func NewWebhook(handler *admission.Handler, configFile io.Reader, sourceFactory sourceFactory, dispatcherFactory dispatcherFactory) (*Webhook, error) {
	// ...

	return &Webhook{
		Handler:          handler,
		// sourceFactory 是 MutatingWebhookConfiguration 资源的管理函数
		sourceFactory:    sourceFactory,
		clientManager:    &cm,
		namespaceMatcher: &namespace.Matcher{},
		objectMatcher:    &object.Matcher{},
		// dispatcher 是 generic.Dispatcher 接口实现,用于实现调用 Webhook 服务
		dispatcher:       dispatcherFactory(&cm),
		filterCompiler:   cel.NewFilterCompiler(),
	}, nil
}

其中 MutatingWebhookConfiguration 资源的管理函数 sourceFactory 实际是 configuration.NewMutatingWebhookConfigurationManager

// k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go

// 使用 client-go 库中的 f 客户端获取 MutatingWebhookConfigurations 资源
func NewMutatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source {
	informer := f.Admissionregistration().V1().MutatingWebhookConfigurations()
	manager := &mutatingWebhookConfigurationManager{
		lister: informer.Lister(),
	}
	manager.lazy.Evaluate = manager.getConfiguration

	handle, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(_ interface{}) { manager.lazy.Notify() },
		UpdateFunc: func(_, _ interface{}) { manager.lazy.Notify() },
		DeleteFunc: func(_ interface{}) { manager.lazy.Notify() },
	})
	manager.hasSynced = handle.HasSynced

	return manager
}

在后续的插件进行初始化时,会调用 Webhook 对象的 SetExternalKubeInformerFactory 方法,方法内调用该管理函数并将得到的管理对象 manager 保存到 hookSource 对象中:

// k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go

func (a *Webhook) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
	namespaceInformer := f.Core().V1().Namespaces()
	a.namespaceMatcher.NamespaceLister = namespaceInformer.Lister()
	// 调用 sourceFactory 方法,传入 f 客户端,将结果返回给 hookSource
	a.hookSource = a.sourceFactory(f)
	a.SetReadyFunc(func() bool {
		return namespaceInformer.Informer().HasSynced() && a.hookSource.HasSynced()
	})
}

然后就可以在 Webhook 对象的 Dispatch 方法中,调用该 hookSource 对象,获取到当前所有的 MutatingWebhookConfiguration 资源 hooks

// k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go

// Webhook 对象的 Dispatch 方法
func (a *Webhook) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
	// ...
	// 获取当前所有的 MutatingWebhookConfiguration 资源
	hooks := a.hookSource.Webhooks()
	// 调用 Webhook 服务
	return a.dispatcher.Dispatch(ctx, attr, o, hooks)
}

最后调用 dispatcher 对象来调用 Webhook 服务,跳转到其实现,即创建 Webhook 对象时传入的 newMutatingDispatcher(p) 方法:

// k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go

type mutatingDispatcher struct {
	cm     *webhookutil.ClientManager
	plugin *Plugin
}

func newMutatingDispatcher(p *Plugin) func(cm *webhookutil.ClientManager) generic.Dispatcher {
	return func(cm *webhookutil.ClientManager) generic.Dispatcher {
		return &mutatingDispatcher{cm, p}
	}
}

// 最终调用 Webhook 服务的 Dispatch 方法
func (a *mutatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {
	// ...

	// 遍历 hooks ,即所有的 MutatingWebhookConfiguration 资源
	for i, hook := range hooks {
		// ...
		
		// 调用该 MutatingWebhookConfiguration 指定的 Webhook 服务,进行资源变更操作
		changed, err := a.callAttrMutatingHook(ctx, hook, invocation, versionedAttr, annotator, o, round, i)
		
		// ...
		if err == nil {
			// 调用 Webhook 服务成功,继续下一个 MutatingWebhookConfiguration 资源
			continue
		}
		// 发生错误,返回失败
		if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {
			return rejectionErr.Status
		}
		return err
	}
	// ...
	return nil
}

总结整个流程就是:MutatingAdmissionWebhook 控制器会遍历所有的 MutatingWebhookConfiguration 资源,根据其中的配置去调用对应的 Webhook 服务,只要其中一个发生错误,则退出。

关键作用的调用 Webhook 服务的 callAttrMutatingHook 方法,实际就是发起一个 POST HTTP 请求:

// k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go

func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admissionregistrationv1.MutatingWebhook, invocation *generic.WebhookInvocation, attr *admission.VersionedAttributes, annotator *webhookAnnotator, o admission.ObjectInterfaces, round, idx int) (bool, error) {
	// ...

	// HTTP 客户端
	client, err := invocation.Webhook.GetRESTClient(a.cm)
	
	// 创建 Post 请求,传递需要进行变更操作的资源
	r := client.Post().Body(request)

	// 开始发起请求
	do := func() { err = r.Do(ctx).Into(response) }
	do()
	if err != nil {
		// 变更失败,退出
		return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("failed to call webhook: %w", err), Status: status}
	}
	// 变更成功的后续操作

	// ...
	return changed, nil
}

点到即止,本回先到这里。

微信公众号

更多内容请关注微信公众号:gopher云原生