09. kube-apiserver 准入 Webhook
在上回介绍准入控制器时,我们提到了两个特殊的控制器:MutatingAdmissionWebhook
和ValidatingAdmissionWebhook
,它们具有很高的扩展性,可以根据相关资源的配置调用相应的Webhook 服务,触发 HTTP 回调机制,以实现变更和验证操作。
在对源码进行分析之前,我们先尝试体验一下 Admission Webhook 机制,以了解它的运行流程。
kube-apiserver 要求 Webhook 服务需要实现 https ,我们采用自建证书的方式,执行:
openssl genrsa -out ca.key 2048
openssl req -x509 -new -nodes -key ca.key -subj "/CN=YOUR_CA_NAME" -days 3650 -out ca.crt
openssl genrsa -out server.key 2048
openssl req -new -key server.key -subj "/CN=YOUR_SERVER_DNS" -out server.csr
echo "subjectAltName = IP:127.0.0.1" > extfile.cnf
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -extfile extfile.cnf
就可以得到根证书 ca.crt
、服务器证书 server.crt
和服务器私钥 server.key
三个文件。
以 MutatingAdmissionWebhook
为例,创建 test-mutating-webhook.yaml
文件:
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: test-mutating-webhook
webhooks:
- name: test-mutating-webhook.example.com
clientConfig:
caBundle: <CA_CERTIFICATE>
url: https://127.0.0.1:8080/mutate
rules:
- apiGroups: [""]
apiVersions: ["v1"]
operations: ["CREATE"]
resources: ["pods"]
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1", "v1beta1"]
这个配置文件定义了一个 MutatingWebhookConfiguration
资源。该资源用于配置 MutatingAdmissionWebhook
,以便拦截创建 Pod
资源的请求,并将这些请求转发到 https://127.0.0.1:8080/mutate
来进行资源的变更操作。
其中 <CA_CERTIFICATE>
是根证书的内容,需要执行 cat ca.crt | base64 -w 0
后将输出内容替换到其中。
在我们第1回搭建起来的 apiserver 及 kubectl 环境中执行创建命令:
$ kubectl apply -f test-mutating-webhook.yaml
mutatingwebhookconfiguration.admissionregistration.k8s.io/test-mutating-webhook created
接下来编写一个 Webhook 服务,并执行 go run main.go
运行:
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
)
func main() {
// 注册 /mutate 路由
http.HandleFunc("/mutate", mutate)
// 启动 webhook 服务
panic(http.ListenAndServeTLS(":8080", "server.crt", "server.key", nil))
}
type patchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}
func mutate(w http.ResponseWriter, r *http.Request) {
fmt.Println("收到来自 kube-apiserver 的请求.")
// 解析 body 为 Pod 对象
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
deserializer := serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
ar := admissionv1.AdmissionReview{}
if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var pod corev1.Pod
if err := json.Unmarshal(ar.Request.Object.Raw, &pod); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 执行变更操作,添加一个注释
pod.ObjectMeta.Annotations["mutate"] = "欢迎关注公众号:gopher云原生"
// 构造 Patch 对象
patch := []patchOperation{
{
Op: "add",
Path: "/metadata/annotations",
Value: pod.ObjectMeta.Annotations,
},
}
patchBytes, err := json.Marshal(patch)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 将篡改后的 Patch 对象内容写入 response
admissionReview := admissionv1.AdmissionReview{
TypeMeta: metav1.TypeMeta{
APIVersion: "admission.k8s.io/v1",
Kind: "AdmissionReview",
},
Response: &admissionv1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: true,
Patch: patchBytes,
PatchType: func() *admissionv1.PatchType {
pt := admissionv1.PatchTypeJSONPatch
return &pt
}(),
},
}
resp, err := json.Marshal(admissionReview)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := w.Write(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Println("资源变更成功.")
}
之后只要集群中有 Pod 资源的创建,就会执行该变更 Webhook ,为 Pod 资源加入 mutate 注释。
开始测试,执行 kubectl apply -f pod.yaml
创建一个如下的 Pod 资源:
apiVersion: v1
kind: Pod
metadata:
name: nginx
spec:
containers:
- name: nginx
image: nginx:1.14.2
ports:
- containerPort: 80
其中 Webhook 服务的日志:
$ go run main.go
收到来自 kube-apiserver 的请求.
资源变更成功.
查看所创建的 Pod 资源的注释内容:
$ kubectl get pod nginx -o yaml | grep mutate
mutate: 欢迎关注公众号:gopher云原生
熟悉了准入 Webhook 的流程后,正式进入源码分析阶段。
来到上回讲过的 k8s.io/apiserver/pkg/server/plugins.go
的 RegisterAllAdmissionPlugins
方法:
// k8s.io/apiserver/pkg/server/plugins.go
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
lifecycle.Register(plugins)
// ValidatingAdmissionWebhook 注册
validatingwebhook.Register(plugins)
// MutatingAdmissionWebhook 注册
mutatingwebhook.Register(plugins)
validatingadmissionpolicy.Register(plugins)
}
还是以 MutatingAdmissionWebhook
为例,查看其注册方法:
// k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go
// 插件名
const (
PluginName = "MutatingAdmissionWebhook"
)
// 插件的注册方法
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(configFile io.Reader) (admission.Interface, error) {
// 调用 NewMutatingWebhook 创建准入控制器插件
plugin, err := NewMutatingWebhook(configFile)
if err != nil {
return nil, err
}
return plugin, nil
})
}
type Plugin struct {
*generic.Webhook
}
var _ admission.MutationInterface = &Plugin{}
// NewMutatingWebhook returns a generic admission webhook plugin.
func NewMutatingWebhook(configFile io.Reader) (*Plugin, error) {
// 允许 Connect、Create、Delete、Update 四个操作
handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)
p := &Plugin{}
var err error
// 创建 Webhook 对象
p.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewMutatingWebhookConfigurationManager, newMutatingDispatcher(p))
if err != nil {
return nil, err
}
return p, nil
}
// 变更操作
func (a *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
// 调用 Webhook 对象的 Dispatch 方法来调用 Webhook 服务实现变更操作
return a.Webhook.Dispatch(ctx, attr, o)
}
MutatingAdmissionWebhook
控制器的变更操作是去调用 Webhook 服务,先看到 Webhook 对象的创建方法 generic.NewWebhook
:
// k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go
func NewWebhook(handler *admission.Handler, configFile io.Reader, sourceFactory sourceFactory, dispatcherFactory dispatcherFactory) (*Webhook, error) {
// ...
return &Webhook{
Handler: handler,
// sourceFactory 是 MutatingWebhookConfiguration 资源的管理函数
sourceFactory: sourceFactory,
clientManager: &cm,
namespaceMatcher: &namespace.Matcher{},
objectMatcher: &object.Matcher{},
// dispatcher 是 generic.Dispatcher 接口实现,用于实现调用 Webhook 服务
dispatcher: dispatcherFactory(&cm),
filterCompiler: cel.NewFilterCompiler(),
}, nil
}
其中 MutatingWebhookConfiguration
资源的管理函数 sourceFactory 实际是 configuration.NewMutatingWebhookConfigurationManager
:
// k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go
// 使用 client-go 库中的 f 客户端获取 MutatingWebhookConfigurations 资源
func NewMutatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source {
informer := f.Admissionregistration().V1().MutatingWebhookConfigurations()
manager := &mutatingWebhookConfigurationManager{
lister: informer.Lister(),
}
manager.lazy.Evaluate = manager.getConfiguration
handle, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) { manager.lazy.Notify() },
UpdateFunc: func(_, _ interface{}) { manager.lazy.Notify() },
DeleteFunc: func(_ interface{}) { manager.lazy.Notify() },
})
manager.hasSynced = handle.HasSynced
return manager
}
在后续的插件进行初始化时,会调用 Webhook 对象的 SetExternalKubeInformerFactory
方法,方法内调用该管理函数并将得到的管理对象 manager
保存到 hookSource
对象中:
// k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go
func (a *Webhook) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
namespaceInformer := f.Core().V1().Namespaces()
a.namespaceMatcher.NamespaceLister = namespaceInformer.Lister()
// 调用 sourceFactory 方法,传入 f 客户端,将结果返回给 hookSource
a.hookSource = a.sourceFactory(f)
a.SetReadyFunc(func() bool {
return namespaceInformer.Informer().HasSynced() && a.hookSource.HasSynced()
})
}
然后就可以在 Webhook 对象的 Dispatch 方法中,调用该 hookSource
对象,获取到当前所有的 MutatingWebhookConfiguration 资源 hooks
:
// k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go
// Webhook 对象的 Dispatch 方法
func (a *Webhook) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
// ...
// 获取当前所有的 MutatingWebhookConfiguration 资源
hooks := a.hookSource.Webhooks()
// 调用 Webhook 服务
return a.dispatcher.Dispatch(ctx, attr, o, hooks)
}
最后调用 dispatcher 对象来调用 Webhook 服务,跳转到其实现,即创建 Webhook 对象时传入的 newMutatingDispatcher(p)
方法:
// k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go
type mutatingDispatcher struct {
cm *webhookutil.ClientManager
plugin *Plugin
}
func newMutatingDispatcher(p *Plugin) func(cm *webhookutil.ClientManager) generic.Dispatcher {
return func(cm *webhookutil.ClientManager) generic.Dispatcher {
return &mutatingDispatcher{cm, p}
}
}
// 最终调用 Webhook 服务的 Dispatch 方法
func (a *mutatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {
// ...
// 遍历 hooks ,即所有的 MutatingWebhookConfiguration 资源
for i, hook := range hooks {
// ...
// 调用该 MutatingWebhookConfiguration 指定的 Webhook 服务,进行资源变更操作
changed, err := a.callAttrMutatingHook(ctx, hook, invocation, versionedAttr, annotator, o, round, i)
// ...
if err == nil {
// 调用 Webhook 服务成功,继续下一个 MutatingWebhookConfiguration 资源
continue
}
// 发生错误,返回失败
if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {
return rejectionErr.Status
}
return err
}
// ...
return nil
}
总结整个流程就是:MutatingAdmissionWebhook
控制器会遍历所有的 MutatingWebhookConfiguration
资源,根据其中的配置去调用对应的 Webhook 服务,只要其中一个发生错误,则退出。
关键作用的调用 Webhook 服务的 callAttrMutatingHook
方法,实际就是发起一个 POST HTTP 请求:
// k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go
func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admissionregistrationv1.MutatingWebhook, invocation *generic.WebhookInvocation, attr *admission.VersionedAttributes, annotator *webhookAnnotator, o admission.ObjectInterfaces, round, idx int) (bool, error) {
// ...
// HTTP 客户端
client, err := invocation.Webhook.GetRESTClient(a.cm)
// 创建 Post 请求,传递需要进行变更操作的资源
r := client.Post().Body(request)
// 开始发起请求
do := func() { err = r.Do(ctx).Into(response) }
do()
if err != nil {
// 变更失败,退出
return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("failed to call webhook: %w", err), Status: status}
}
// 变更成功的后续操作
// ...
return changed, nil
}
点到即止,本回先到这里。
微信公众号
更多内容请关注微信公众号:gopher云原生