5.3.3 Operator Development Basics

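An Operator is a Kubernetes extension pattern that uses custom resources to manage applications and their components. It encodes operational knowledge into software and automates tasks such as deploying, scaling, backing up, and upgrading complex applications.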

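Operator Pattern Overview

What Is an Operator

An Operator is a software extension that follows the Kubernetes controller pattern: it continuously watches the cluster state and acts to move the observed state toward the desired state declared in custom resources.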
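The control loop described above can be sketched without any Kubernetes dependency. The example below is a minimal illustration only: the State type and the reconcile function are hypothetical names, and cluster state is reduced to a single replica count.

```go
package main

import "fmt"

// State is a hypothetical, simplified stand-in for cluster state:
// just a replica count.
type State struct {
	Replicas int
}

// reconcile compares desired and observed state and returns the state
// after one corrective step, plus a description of the action taken.
func reconcile(desired, observed State) (State, string) {
	switch {
	case observed.Replicas < desired.Replicas:
		observed.Replicas++
		return observed, "scale up"
	case observed.Replicas > desired.Replicas:
		observed.Replicas--
		return observed, "scale down"
	default:
		return observed, "no-op"
	}
}

func main() {
	desired := State{Replicas: 3}
	observed := State{Replicas: 1}
	// Keep reconciling until observed state matches desired state.
	for {
		next, action := reconcile(desired, observed)
		fmt.Printf("observed=%d action=%s\n", observed.Replicas, action)
		if action == "no-op" {
			break
		}
		observed = next
	}
}
```

A real controller does not loop in-process like this; it is invoked again whenever a watched resource changes, but each invocation performs the same compare-and-correct step.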

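Core Components of an Operator

  1. Custom Resource Definition (CRD): defines a new API resource type
  2. Custom controller: watches for resource changes and performs the corresponding actions
  3. Custom Resource (CR): an instance of a CRD that describes the desired state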

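Operator Maturity Model

Level 5: Auto Pilot
├── Automatic tuning and anomaly detection
├── Automatic failure recovery
└── Predictive maintenance

Level 4: Deep Insights
├── Monitoring and alerting
├── Log aggregation
└── Performance analysis

Level 3: Full Lifecycle
├── Application upgrades
├── Failure recovery
└── Backup and restore

Level 2: Seamless Upgrades
├── Patch and minor version upgrades
├── Configuration updates
└── Rolling updates

Level 1: Basic Install
├── Automated installation
├── Configuration management
└── Basic runtime management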

Preparing the Development Environment

Installing the Required Tools

# Install Operator SDK
curl -LO https://github.com/operator-framework/operator-sdk/releases/latest/download/operator-sdk_linux_amd64
chmod +x operator-sdk_linux_amd64
sudo mv operator-sdk_linux_amd64 /usr/local/bin/operator-sdk

# Install Kubebuilder (requires Go, which supplies GOOS and GOARCH)
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder
sudo mv kubebuilder /usr/local/bin/

# Verify the installation
operator-sdk version
kubebuilder version

Project Initialization

# Create a new Operator project
mkdir webapp-operator
cd webapp-operator

# Initialize the project
operator-sdk init --domain=example.com --repo=github.com/example/webapp-operator

# Create the API and the controller (the resulting API group is apps.example.com)
operator-sdk create api --group=apps --version=v1 --kind=WebApp --resource --controller

Custom Resource Definition

WebApp CRD Definition

// api/v1/webapp_types.go
package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// WebAppSpec defines the desired state of a WebApp
type WebAppSpec struct {
    // Container image of the application
    Image string `json:"image"`

    // Number of replicas
    Replicas *int32 `json:"replicas,omitempty"`

    // Container port
    Port int32 `json:"port,omitempty"`

    // Environment variables
    Env []EnvVar `json:"env,omitempty"`

    // Resource requests and limits
    Resources ResourceRequirements `json:"resources,omitempty"`

    // Service type
    ServiceType ServiceType `json:"serviceType,omitempty"`

    // Ingress configuration
    Ingress *IngressSpec `json:"ingress,omitempty"`
}

// EnvVar defines one environment variable
type EnvVar struct {
    Name  string `json:"name"`
    Value string `json:"value"`
}

// ResourceRequirements defines resource requests and limits
type ResourceRequirements struct {
    Requests ResourceList `json:"requests,omitempty"`
    Limits   ResourceList `json:"limits,omitempty"`
}

// ResourceList maps a resource name (for example "cpu") to a quantity string
type ResourceList map[string]string

// ServiceType is the type of Service to create
type ServiceType string

const (
    ServiceTypeClusterIP    ServiceType = "ClusterIP"
    ServiceTypeNodePort     ServiceType = "NodePort"
    ServiceTypeLoadBalancer ServiceType = "LoadBalancer"
)

// IngressSpec holds the Ingress configuration
type IngressSpec struct {
    Host string `json:"host"`
    Path string `json:"path,omitempty"`
    TLS  bool   `json:"tls,omitempty"`
}

// WebAppStatus defines the observed state of a WebApp
type WebAppStatus struct {
    // Current number of replicas
    Replicas int32 `json:"replicas"`

    // Number of ready replicas
    ReadyReplicas int32 `json:"readyReplicas"`

    // Deployment phase
    Phase WebAppPhase `json:"phase,omitempty"`

    // Status conditions
    Conditions []WebAppCondition `json:"conditions,omitempty"`

    // Service URL
    ServiceURL string `json:"serviceURL,omitempty"`

    // Ingress URL
    IngressURL string `json:"ingressURL,omitempty"`
}

// WebAppPhase is the lifecycle phase of the application
type WebAppPhase string

const (
    WebAppPhasePending   WebAppPhase = "Pending"
    WebAppPhaseRunning   WebAppPhase = "Running"
    WebAppPhaseFailed    WebAppPhase = "Failed"
    WebAppPhaseSucceeded WebAppPhase = "Succeeded"
)

// WebAppCondition describes one status condition
type WebAppCondition struct {
    Type               WebAppConditionType    `json:"type"`
    Status             metav1.ConditionStatus `json:"status"`
    LastTransitionTime metav1.Time            `json:"lastTransitionTime,omitempty"`
    Reason             string                 `json:"reason,omitempty"`
    Message            string                 `json:"message,omitempty"`
}

// WebAppConditionType is the type of a condition
type WebAppConditionType string

const (
    WebAppConditionAvailable   WebAppConditionType = "Available"
    WebAppConditionProgressing WebAppConditionType = "Progressing"
    WebAppConditionDegraded    WebAppConditionType = "Degraded"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
//+kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image"
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// WebApp is the Schema for the WebApp API
type WebApp struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   WebAppSpec   `json:"spec,omitempty"`
    Status WebAppStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// WebAppList contains a list of WebApp resources
type WebAppList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []WebApp `json:"items"`
}

func init() {
    SchemeBuilder.Register(&WebApp{}, &WebAppList{})
}
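The JSON tags in WebAppSpec determine what a serialized resource looks like. The sketch below uses only the standard library and a trimmed, hypothetical copy of the struct (named spec here) to show why Replicas is a pointer: with omitempty, a nil pointer is omitted, while an explicit zero is preserved, so "not set" and "scale to zero" stay distinguishable.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// spec is a trimmed, hypothetical copy of WebAppSpec with only the fields
// needed to show the effect of the JSON tags.
type spec struct {
	Image    string `json:"image"`
	Replicas *int32 `json:"replicas,omitempty"`
	Port     int32  `json:"port,omitempty"`
}

// encode marshals a spec to its JSON string form.
func encode(s spec) string {
	b, err := json.Marshal(s)
	if err != nil {
		panic(err) // not expected for this plain struct
	}
	return string(b)
}

func main() {
	zero := int32(0)
	// Replicas unset: the field is omitted entirely.
	fmt.Println(encode(spec{Image: "nginx:1.20"}))
	// Replicas explicitly zero: the field is kept.
	fmt.Println(encode(spec{Image: "nginx:1.20", Replicas: &zero}))
}
```

The non-pointer Port field cannot make this distinction: a value of 0 is dropped by omitempty, which is why the defaulting logic can treat 0 as "unset".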

Validation and Defaults

// api/v1/webapp_webhook.go
package v1

import (
    "fmt"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/webhook"
)

var webapplog = logf.Log.WithName("webapp-resource")

func (r *WebApp) SetupWebhookWithManager(mgr ctrl.Manager) error {
    return ctrl.NewWebhookManagedBy(mgr).
        For(r).
        Complete()
}

//+kubebuilder:webhook:path=/mutate-apps-example-com-v1-webapp,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=webapps,verbs=create;update,versions=v1,name=mwebapp.kb.io,admissionReviewVersions=v1

var _ webhook.Defaulter = &WebApp{}

// Default implements webhook.Defaulter and fills in default values
func (r *WebApp) Default() {
    webapplog.Info("default", "name", r.Name)

    // Default replica count
    if r.Spec.Replicas == nil {
        replicas := int32(1)
        r.Spec.Replicas = &replicas
    }

    // Default port
    if r.Spec.Port == 0 {
        r.Spec.Port = 8080
    }

    // Default service type
    if r.Spec.ServiceType == "" {
        r.Spec.ServiceType = ServiceTypeClusterIP
    }

    // Default Ingress path
    if r.Spec.Ingress != nil && r.Spec.Ingress.Path == "" {
        r.Spec.Ingress.Path = "/"
    }
}

//+kubebuilder:webhook:path=/validate-apps-example-com-v1-webapp,mutating=false,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=webapps,verbs=create;update,versions=v1,name=vwebapp.kb.io,admissionReviewVersions=v1

// Note: the method signatures below match controller-runtime releases before
// v0.15; later releases also return admission.Warnings from each method.
var _ webhook.Validator = &WebApp{}

// ValidateCreate implements webhook.Validator and validates creation
func (r *WebApp) ValidateCreate() error {
    webapplog.Info("validate create", "name", r.Name)
    return r.validateWebApp()
}

// ValidateUpdate implements webhook.Validator and validates updates
func (r *WebApp) ValidateUpdate(old runtime.Object) error {
    webapplog.Info("validate update", "name", r.Name)
    return r.validateWebApp()
}

// ValidateDelete implements webhook.Validator and validates deletion
func (r *WebApp) ValidateDelete() error {
    webapplog.Info("validate delete", "name", r.Name)
    return nil
}

// validateWebApp validates the WebApp spec
func (r *WebApp) validateWebApp() error {
    // The image must not be empty
    if r.Spec.Image == "" {
        return fmt.Errorf("image cannot be empty")
    }

    // The replica count must be within range
    if r.Spec.Replicas != nil && (*r.Spec.Replicas < 0 || *r.Spec.Replicas > 100) {
        return fmt.Errorf("replicas must be between 0 and 100")
    }

    // The port must be within range (the defaulting webhook has already replaced 0 with 8080)
    if r.Spec.Port < 1 || r.Spec.Port > 65535 {
        return fmt.Errorf("port must be between 1 and 65535")
    }

    // Environment variable names must not be empty
    for _, env := range r.Spec.Env {
        if env.Name == "" {
            return fmt.Errorf("environment variable name cannot be empty")
        }
    }

    // Validate the Ingress configuration
    if r.Spec.Ingress != nil {
        if r.Spec.Ingress.Host == "" {
            return fmt.Errorf("ingress host cannot be empty")
        }
    }

    return nil
}
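The validation above stops at the first violation. One possible variation, sketched here with the standard library only, collects every violation and reports them together using errors.Join (available since Go 1.20). The webAppSpec type is a hypothetical, trimmed stand-in for WebAppSpec, and this is an alternative design, not what the webhook above does.

```go
package main

import (
	"errors"
	"fmt"
)

// webAppSpec is a hypothetical, trimmed stand-in for WebAppSpec.
type webAppSpec struct {
	Image    string
	Replicas *int32
	Port     int32
}

// validate collects every violation instead of stopping at the first one.
// errors.Join returns nil when no errors were collected.
func validate(s webAppSpec) error {
	var errs []error
	if s.Image == "" {
		errs = append(errs, errors.New("image cannot be empty"))
	}
	if s.Replicas != nil && (*s.Replicas < 0 || *s.Replicas > 100) {
		errs = append(errs, errors.New("replicas must be between 0 and 100"))
	}
	if s.Port < 1 || s.Port > 65535 {
		errs = append(errs, errors.New("port must be between 1 and 65535"))
	}
	return errors.Join(errs...)
}

func main() {
	// Both the image and the port are invalid, so two messages are reported.
	fmt.Println(validate(webAppSpec{Port: 0}))
	// A valid spec yields a nil error.
	fmt.Println(validate(webAppSpec{Image: "nginx:1.20", Port: 8080}))
}
```

Reporting all problems at once saves users from fixing a manifest one rejected apply at a time, at the cost of slightly longer error messages.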

Controller Implementation

Main Controller Logic

// controllers/webapp_controller.go
package controllers

import (
    "context"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    networkingv1 "k8s.io/api/networking/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"

    appsv1alpha1 "github.com/example/webapp-operator/api/v1"
)

// WebAppReconciler reconciles WebApp resources
type WebAppReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=apps.example.com,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps.example.com,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps.example.com,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete

// Reconcile is the main reconciliation loop
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // Fetch the WebApp instance
    webapp := &appsv1alpha1.WebApp{}
    err := r.Get(ctx, req.NamespacedName, webapp)
    if err != nil {
        if errors.IsNotFound(err) {
            logger.Info("WebApp resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        logger.Error(err, "Failed to get WebApp")
        return ctrl.Result{}, err
    }

    // Handle deletion
    if webapp.DeletionTimestamp != nil {
        return r.handleDeletion(ctx, webapp)
    }

    // Add the finalizer; the update triggers a new reconcile that continues below
    if !controllerutil.ContainsFinalizer(webapp, "webapp.finalizer") {
        controllerutil.AddFinalizer(webapp, "webapp.finalizer")
        return ctrl.Result{}, r.Update(ctx, webapp)
    }

    // Reconcile the Deployment
    if err := r.reconcileDeployment(ctx, webapp); err != nil {
        return ctrl.Result{}, err
    }

    // Reconcile the Service
    if err := r.reconcileService(ctx, webapp); err != nil {
        return ctrl.Result{}, err
    }

    // Reconcile the Ingress, if one is requested
    if webapp.Spec.Ingress != nil {
        if err := r.reconcileIngress(ctx, webapp); err != nil {
            return ctrl.Result{}, err
        }
    }

    // Update the status
    if err := r.updateStatus(ctx, webapp); err != nil {
        return ctrl.Result{}, err
    }

    // Requeue periodically to pick up drift that produced no watch event
    return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}

// reconcileDeployment creates the Deployment if it is missing and updates it on drift
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
    logger := log.FromContext(ctx)

    deployment := &appsv1.Deployment{}
    err := r.Get(ctx, types.NamespacedName{
        Name:      webapp.Name,
        Namespace: webapp.Namespace,
    }, deployment)

    if err != nil && errors.IsNotFound(err) {
        // Create a new Deployment owned by the WebApp
        deployment = r.deploymentForWebApp(webapp)
        if err := controllerutil.SetControllerReference(webapp, deployment, r.Scheme); err != nil {
            return err
        }

        logger.Info("Creating a new Deployment", "Deployment.Namespace", deployment.Namespace, "Deployment.Name", deployment.Name)
        return r.Create(ctx, deployment)
    } else if err != nil {
        logger.Error(err, "Failed to get Deployment")
        return err
    }

    // Update the existing Deployment in place so that its resourceVersion
    // and other server-populated metadata are preserved
    if r.needsDeploymentUpdate(webapp, deployment) {
        logger.Info("Updating Deployment", "Deployment.Namespace", deployment.Namespace, "Deployment.Name", deployment.Name)
        deployment.Spec = r.deploymentForWebApp(webapp).Spec
        if err := controllerutil.SetControllerReference(webapp, deployment, r.Scheme); err != nil {
            return err
        }
        return r.Update(ctx, deployment)
    }

    return nil
}

// deploymentForWebApp builds the desired Deployment for a WebApp
func (r *WebAppReconciler) deploymentForWebApp(webapp *appsv1alpha1.WebApp) *appsv1.Deployment {
    labels := map[string]string{
        "app":        webapp.Name,
        "managed-by": "webapp-operator",
    }

    // Build the environment variables
    var envVars []corev1.EnvVar
    for _, env := range webapp.Spec.Env {
        envVars = append(envVars, corev1.EnvVar{
            Name:  env.Name,
            Value: env.Value,
        })
    }

    // Build the resource requirements.
    // Note: resource.MustParse panics on an invalid quantity string, so
    // quantities should be validated before they reach this point.
    resources := corev1.ResourceRequirements{}
    if webapp.Spec.Resources.Requests != nil {
        resources.Requests = corev1.ResourceList{}
        for k, v := range webapp.Spec.Resources.Requests {
            resources.Requests[corev1.ResourceName(k)] = resource.MustParse(v)
        }
    }
    if webapp.Spec.Resources.Limits != nil {
        resources.Limits = corev1.ResourceList{}
        for k, v := range webapp.Spec.Resources.Limits {
            resources.Limits[corev1.ResourceName(k)] = resource.MustParse(v)
        }
    }

    deployment := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name,
            Namespace: webapp.Namespace,
            Labels:    labels,
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: webapp.Spec.Replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  webapp.Name,
                            Image: webapp.Spec.Image,
                            Ports: []corev1.ContainerPort{
                                {
                                    ContainerPort: webapp.Spec.Port,
                                    Protocol:      corev1.ProtocolTCP,
                                },
                            },
                            Env:       envVars,
                            Resources: resources,
                            LivenessProbe: &corev1.Probe{
                                ProbeHandler: corev1.ProbeHandler{
                                    HTTPGet: &corev1.HTTPGetAction{
                                        Path: "/health",
                                        Port: intstr.FromInt(int(webapp.Spec.Port)),
                                    },
                                },
                                InitialDelaySeconds: 30,
                                PeriodSeconds:       10,
                            },
                            ReadinessProbe: &corev1.Probe{
                                ProbeHandler: corev1.ProbeHandler{
                                    HTTPGet: &corev1.HTTPGetAction{
                                        Path: "/ready",
                                        Port: intstr.FromInt(int(webapp.Spec.Port)),
                                    },
                                },
                                InitialDelaySeconds: 5,
                                PeriodSeconds:       5,
                            },
                        },
                    },
                },
            },
        },
    }

    return deployment
}

// reconcileService creates the Service if it does not exist yet
func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
    logger := log.FromContext(ctx)

    service := &corev1.Service{}
    err := r.Get(ctx, types.NamespacedName{
        Name:      webapp.Name,
        Namespace: webapp.Namespace,
    }, service)

    if err != nil && errors.IsNotFound(err) {
        // Create a new Service
        service = r.serviceForWebApp(webapp)
        if err := controllerutil.SetControllerReference(webapp, service, r.Scheme); err != nil {
            return err
        }

        logger.Info("Creating a new Service", "Service.Namespace", service.Namespace, "Service.Name", service.Name)
        return r.Create(ctx, service)
    } else if err != nil {
        logger.Error(err, "Failed to get Service")
        return err
    }

    return nil
}

// serviceForWebApp builds the desired Service for a WebApp
func (r *WebAppReconciler) serviceForWebApp(webapp *appsv1alpha1.WebApp) *corev1.Service {
    labels := map[string]string{
        "app":        webapp.Name,
        "managed-by": "webapp-operator",
    }

    service := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name,
            Namespace: webapp.Namespace,
            Labels:    labels,
        },
        Spec: corev1.ServiceSpec{
            Selector: labels,
            Ports: []corev1.ServicePort{
                {
                    Port:       80,
                    TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
                    Protocol:   corev1.ProtocolTCP,
                },
            },
            Type: corev1.ServiceType(webapp.Spec.ServiceType),
        },
    }

    return service
}

// reconcileIngress creates the Ingress if it does not exist yet
func (r *WebAppReconciler) reconcileIngress(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
    logger := log.FromContext(ctx)

    ingress := &networkingv1.Ingress{}
    err := r.Get(ctx, types.NamespacedName{
        Name:      webapp.Name,
        Namespace: webapp.Namespace,
    }, ingress)

    if err != nil && errors.IsNotFound(err) {
        // Create a new Ingress
        ingress = r.ingressForWebApp(webapp)
        if err := controllerutil.SetControllerReference(webapp, ingress, r.Scheme); err != nil {
            return err
        }

        logger.Info("Creating a new Ingress", "Ingress.Namespace", ingress.Namespace, "Ingress.Name", ingress.Name)
        return r.Create(ctx, ingress)
    } else if err != nil {
        logger.Error(err, "Failed to get Ingress")
        return err
    }

    return nil
}

// ingressForWebApp builds the desired Ingress for a WebApp
func (r *WebAppReconciler) ingressForWebApp(webapp *appsv1alpha1.WebApp) *networkingv1.Ingress {
    labels := map[string]string{
        "app":        webapp.Name,
        "managed-by": "webapp-operator",
    }

    pathType := networkingv1.PathTypePrefix
    ingress := &networkingv1.Ingress{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name,
            Namespace: webapp.Namespace,
            Labels:    labels,
        },
        Spec: networkingv1.IngressSpec{
            Rules: []networkingv1.IngressRule{
                {
                    Host: webapp.Spec.Ingress.Host,
                    IngressRuleValue: networkingv1.IngressRuleValue{
                        HTTP: &networkingv1.HTTPIngressRuleValue{
                            Paths: []networkingv1.HTTPIngressPath{
                                {
                                    Path:     webapp.Spec.Ingress.Path,
                                    PathType: &pathType,
                                    Backend: networkingv1.IngressBackend{
                                        Service: &networkingv1.IngressServiceBackend{
                                            Name: webapp.Name,
                                            Port: networkingv1.ServiceBackendPort{
                                                Number: 80,
                                            },
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }

    // Add the TLS configuration
    if webapp.Spec.Ingress.TLS {
        ingress.Spec.TLS = []networkingv1.IngressTLS{
            {
                Hosts:      []string{webapp.Spec.Ingress.Host},
                SecretName: webapp.Name + "-tls",
            },
        }
    }

    return ingress
}

// updateStatus copies the Deployment state into the WebApp status
func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
    // Fetch the Deployment to read its status
    deployment := &appsv1.Deployment{}
    err := r.Get(ctx, types.NamespacedName{
        Name:      webapp.Name,
        Namespace: webapp.Namespace,
    }, deployment)
    if err != nil {
        return err
    }

    // Update the replica counts
    webapp.Status.Replicas = deployment.Status.Replicas
    webapp.Status.ReadyReplicas = deployment.Status.ReadyReplicas

    // Determine the phase; a nil spec.replicas is treated as 1, matching the
    // webhook default, so the pointer is never dereferenced when nil
    desired := int32(1)
    if webapp.Spec.Replicas != nil {
        desired = *webapp.Spec.Replicas
    }
    if deployment.Status.ReadyReplicas == desired {
        webapp.Status.Phase = appsv1alpha1.WebAppPhaseRunning
    } else if deployment.Status.Replicas > 0 {
        webapp.Status.Phase = appsv1alpha1.WebAppPhasePending
    } else {
        webapp.Status.Phase = appsv1alpha1.WebAppPhaseFailed
    }

    // Set the in-cluster Service URL
    webapp.Status.ServiceURL = fmt.Sprintf("http://%s.%s.svc.cluster.local", webapp.Name, webapp.Namespace)

    // Set the Ingress URL
    if webapp.Spec.Ingress != nil {
        protocol := "http"
        if webapp.Spec.Ingress.TLS {
            protocol = "https"
        }
        webapp.Status.IngressURL = fmt.Sprintf("%s://%s%s", protocol, webapp.Spec.Ingress.Host, webapp.Spec.Ingress.Path)
    }

    return r.Status().Update(ctx, webapp)
}

// handleDeletion runs cleanup and removes the finalizer so that deletion can complete
func (r *WebAppReconciler) handleDeletion(ctx context.Context, webapp *appsv1alpha1.WebApp) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    if controllerutil.ContainsFinalizer(webapp, "webapp.finalizer") {
        // Run the cleanup logic
        logger.Info("Performing cleanup for WebApp", "name", webapp.Name)

        // Remove the finalizer
        controllerutil.RemoveFinalizer(webapp, "webapp.finalizer")
        return ctrl.Result{}, r.Update(ctx, webapp)
    }

    return ctrl.Result{}, nil
}

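// needsDeploymentUpdate reports whether the Deployment has drifted from the WebApp spec
func (r *WebAppReconciler) needsDeploymentUpdate(webapp *appsv1alpha1.WebApp, deployment *appsv1.Deployment) bool {
    // Compare replica counts, treating a nil pointer as the default of 1
    // so that neither side is dereferenced when nil
    replicasOf := func(p *int32) int32 {
        if p == nil {
            return 1
        }
        return *p
    }
    if replicasOf(deployment.Spec.Replicas) != replicasOf(webapp.Spec.Replicas) {
        return true
    }

    // Compare the image of the first container
    if len(deployment.Spec.Template.Spec.Containers) > 0 {
        if deployment.Spec.Template.Spec.Containers[0].Image != webapp.Spec.Image {
            return true
        }
    }

    return false
}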

// SetupWithManager registers the controller with the manager
func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appsv1alpha1.WebApp{}).
        Owns(&appsv1.Deployment{}).
        Owns(&corev1.Service{}).
        Owns(&networkingv1.Ingress{}).
        Complete(r)
}
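The phase logic in updateStatus is easier to reason about, and to unit-test, when it is pulled out into a pure function. The following sketch uses only the standard library; desiredReplicas and phaseFor are hypothetical helper names, the phases are plain strings, and the fallback of 1 for a nil replica count is an assumption that mirrors the webhook default.

```go
package main

import "fmt"

// desiredReplicas returns the value behind p, or 1 when p is nil.
// The fallback of 1 is an assumption mirroring the webhook default.
func desiredReplicas(p *int32) int32 {
	if p == nil {
		return 1
	}
	return *p
}

// phaseFor mirrors the branching in updateStatus using plain values:
// desired is spec.replicas, current and ready come from the Deployment status.
func phaseFor(desired *int32, current, ready int32) string {
	switch {
	case ready == desiredReplicas(desired):
		return "Running"
	case current > 0:
		return "Pending"
	default:
		return "Failed"
	}
}

func main() {
	three := int32(3)
	fmt.Println(phaseFor(&three, 3, 3)) // all replicas ready
	fmt.Println(phaseFor(&three, 3, 1)) // replicas exist but are not all ready
	fmt.Println(phaseFor(nil, 0, 0))    // nothing running, one replica expected
}
```

Because the function takes no client and no context, it can be covered by an ordinary table-driven test without a running API server.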

Testing and Deployment

Unit Tests

// controllers/webapp_controller_test.go
package controllers

import (
    "context"
    "time"

    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"

    appsv1alpha1 "github.com/example/webapp-operator/api/v1"
)

var _ = Describe("WebApp Controller", func() {
    Context("When creating a WebApp", func() {
        It("Should create a Deployment and Service", func() {
            ctx := context.Background()

            webapp := &appsv1alpha1.WebApp{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      "test-webapp",
                    Namespace: "default",
                },
                Spec: appsv1alpha1.WebAppSpec{
                    Image:    "nginx:1.20",
                    Replicas: &[]int32{2}[0],
                    Port:     80,
                },
            }

            Expect(k8sClient.Create(ctx, webapp)).Should(Succeed())

            // Verify that the Deployment was created
            deployment := &appsv1.Deployment{}
            Eventually(func() error {
                return k8sClient.Get(ctx, types.NamespacedName{
                    Name:      "test-webapp",
                    Namespace: "default",
                }, deployment)
            }, time.Second*10, time.Millisecond*250).Should(Succeed())

            Expect(*deployment.Spec.Replicas).Should(Equal(int32(2)))
            Expect(deployment.Spec.Template.Spec.Containers[0].Image).Should(Equal("nginx:1.20"))

            // Verify that the Service was created
            service := &corev1.Service{}
            Eventually(func() error {
                return k8sClient.Get(ctx, types.NamespacedName{
                    Name:      "test-webapp",
                    Namespace: "default",
                }, service)
            }, time.Second*10, time.Millisecond*250).Should(Succeed())

            Expect(service.Spec.Ports[0].Port).Should(Equal(int32(80)))
        })
    })
})
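The test above relies on Eventually because the controller creates the Deployment and Service asynchronously, so the assertions have to poll. The polling idea can be sketched with the standard library alone; the eventually function below is a simplified, hypothetical stand-in and not Gomega's implementation, and runDemo simulates a lookup that only succeeds on its third attempt.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// eventually calls check every interval until it returns nil or the timeout
// elapses. It is a simplified stand-in, not Gomega's implementation.
func eventually(check func() error, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		err := check()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition not met within %s: %w", timeout, err)
		}
		time.Sleep(interval)
	}
}

// runDemo polls a simulated lookup that fails twice and then succeeds,
// the way a Deployment appears only after the controller has reconciled.
// It returns the number of attempts made and the final error.
func runDemo() (int, error) {
	attempts := 0
	get := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("not found")
		}
		return nil
	}
	err := eventually(get, time.Second, 10*time.Millisecond)
	return attempts, err
}

func main() {
	attempts, err := runDemo()
	fmt.Println(attempts, err)
}
```

The timeout and interval play the same role as the time.Second*10 and time.Millisecond*250 arguments in the test: they bound how long the assertion waits and how often it retries.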

Deployment Configuration

# config/samples/apps_v1_webapp.yaml
apiVersion: apps.example.com/v1
kind: WebApp
metadata:
  name: webapp-sample
spec:
  image: nginx:1.20
  replicas: 3
  port: 80
  env:
    - name: ENV
      value: production
  resources:
    requests:
      memory: "64Mi"
      cpu: "250m"
    limits:
      memory: "128Mi"
      cpu: "500m"
  serviceType: ClusterIP
  ingress:
    host: webapp.example.com
    path: /
    tls: true

Building and Deploying with the Makefile

# Makefile
.PHONY: docker-build docker-push install uninstall deploy test generate manifests

# Build and push the image
docker-build:
	docker build -t webapp-operator:latest .

docker-push:
	docker push webapp-operator:latest

# Install the CRDs
install:
	kubectl apply -f config/crd/bases

# Uninstall the CRDs
uninstall:
	kubectl delete -f config/crd/bases

# Deploy the controller. The scaffolded config/ directories are kustomize
# bases, so they are applied through the config/default overlay rather than
# file by file.
deploy:
	kubectl apply -k config/default

# Run the tests
test:
	go test ./... -coverprofile cover.out

# Generate code
generate:
	controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."

# Generate manifests
manifests:
	controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
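With these Operator development basics, you can build Kubernetes extensions that automate the management and operation of complex applications, which gives you a solid foundation for building a cloud-native application ecosystem.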