5.3.4 CRD 自定义资源

5.3.4 CRD 自定义资源

自定义资源定义(Custom Resource Definition,CRD)是 Kubernetes 的扩展机制,允许用户定义新的 API 资源类型。通过 CRD,我们可以扩展 Kubernetes API,创建特定于应用程序的资源类型,并使用 Kubernetes 的声明式 API 来管理这些资源。

CRD 基础概念

什么是 CRD

CRD 是一种 Kubernetes 资源,用于定义新的自定义资源类型。它告诉 Kubernetes API Server 如何存储和验证自定义资源的数据结构。

CRD 的组成部分

  1. Group:API 组,用于组织相关的资源类型
  2. Version:API 版本,支持版本演进
  3. Kind:资源类型名称
  4. Schema:资源的数据结构定义
  5. Scope:资源作用域(Namespaced 或 Cluster)

创建基础 CRD

简单的数据库 CRD

# database-crd.yaml
# CustomResourceDefinition for the namespaced Database resource
# (group storage.example.com, version v1, short name "db").
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # Must be <plural>.<group>.
  name: databases.storage.example.com
spec:
  group: storage.example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                engine:
                  type: string
                  enum: ["mysql", "postgresql", "mongodb"]
                version:
                  type: string
                storage:
                  type: string
                  # Integer followed by a binary unit, e.g. 10Gi.
                  pattern: "^[0-9]+[KMGT]i$"
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 10
                backup:
                  type: object
                  properties:
                    enabled:
                      type: boolean
                    schedule:
                      type: string
                    retention:
                      type: string
              required:
                - engine
                - version
                - storage
            status:
              type: object
              properties:
                phase:
                  type: string
                  # Includes every phase a controller may report; a value
                  # outside this list is rejected on status updates.
                  enum: ["Pending", "Running", "Failed", "Upgrading", "Backup"]
                # replicas must be declared here: the scale subresource below
                # reads .status.replicas, and fields missing from a structural
                # schema are pruned by the API server.
                replicas:
                  type: integer
                readyReplicas:
                  type: integer
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      type:
                        type: string
                      status:
                        type: string
                      lastTransitionTime:
                        type: string
                        format: date-time
                      reason:
                        type: string
                      message:
                        type: string
                endpoints:
                  type: object
                  properties:
                    primary:
                      type: string
                    readonly:
                      type: array
                      items:
                        type: string
      # Extra columns shown by `kubectl get databases`.
      additionalPrinterColumns:
        - name: Engine
          type: string
          jsonPath: .spec.engine
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Storage
          type: string
          jsonPath: .spec.storage
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
      subresources:
        # Enables the /status endpoint.
        status: {}
        # Enables the /scale endpoint (kubectl scale, HPA).
        scale:
          specReplicasPath: .spec.replicas
          statusReplicasPath: .status.replicas
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames:
      - db

应用 CRD

# Create (or update) the CRD from the manifest
kubectl apply -f database-crd.yaml

# Verify that the CRD exists
kubectl get crd databases.storage.example.com

# Show the CRD in detail
kubectl describe crd databases.storage.example.com

Go 类型定义

数据库资源类型

// api/v1/database_types.go
package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DatabaseEngine names the database engine of a Database resource.
type DatabaseEngine string

// DatabaseEngine values used throughout this package.
const (
    DatabaseEngineMySQL      DatabaseEngine = "mysql"
    DatabaseEnginePostgreSQL DatabaseEngine = "postgresql"
    DatabaseEngineMongoDB    DatabaseEngine = "mongodb"
)

// BackupSpec holds the backup configuration of a Database.
type BackupSpec struct {
    // Enabled reports whether backups are turned on.
    Enabled bool `json:"enabled"`

    // Schedule is the backup schedule in cron format.
    Schedule string `json:"schedule,omitempty"`

    // Retention is how long backups are kept.
    Retention string `json:"retention,omitempty"`

    // StorageClass is the storage location for backups.
    StorageClass string `json:"storageClass,omitempty"`

    // StorageSize is the size limit for backups.
    StorageSize string `json:"storageSize,omitempty"`
}

// DatabaseSpec defines the desired state of a Database.
type DatabaseSpec struct {
    // Engine is the database engine.
    Engine DatabaseEngine `json:"engine"`

    // Version is the database version.
    Version string `json:"version"`

    // Storage is the storage size.
    Storage string `json:"storage"`

    // Replicas is the number of replicas.
    Replicas int32 `json:"replicas,omitempty"`

    // Config holds database configuration parameters.
    Config map[string]string `json:"config,omitempty"`

    // Backup is the backup configuration; nil when not specified.
    Backup *BackupSpec `json:"backup,omitempty"`

    // Resources holds the resource requests and limits.
    Resources ResourceRequirements `json:"resources,omitempty"`

    // Network is the network configuration.
    Network NetworkSpec `json:"network,omitempty"`

    // Security is the security configuration.
    Security SecuritySpec `json:"security,omitempty"`
}

// ResourceRequirements describes the resource requests and limits of a
// Database.
type ResourceRequirements struct {
    // Requests lists the requested resources.
    Requests ResourceList `json:"requests,omitempty"`
    // Limits lists the resource limits.
    Limits   ResourceList `json:"limits,omitempty"`
}

// ResourceList maps a resource name to a quantity, both stored as plain
// strings.
type ResourceList map[string]string

// NetworkSpec holds the network configuration of a Database.
type NetworkSpec struct {
    // ServiceType is the service type.
    ServiceType string `json:"serviceType,omitempty"`

    // Port is the port the database is configured with.
    Port int32 `json:"port,omitempty"`

    // SSL reports whether SSL is enabled.
    SSL bool `json:"ssl,omitempty"`
}

// SecuritySpec holds the security configuration of a Database.
type SecuritySpec struct {
    // AuthMethod is the authentication method.
    AuthMethod string `json:"authMethod,omitempty"`

    // CredentialsSecret names the Secret that holds the user credentials.
    CredentialsSecret string `json:"credentialsSecret,omitempty"`

    // TLS is the TLS configuration; nil when not specified.
    TLS *TLSSpec `json:"tls,omitempty"`
}

// TLSSpec holds the TLS configuration.
type TLSSpec struct {
    // Enabled reports whether TLS is turned on.
    Enabled    bool   `json:"enabled"`
    // SecretName is a Secret name; presumably the Secret holding the TLS
    // material — confirm against the code that consumes it.
    SecretName string `json:"secretName,omitempty"`
}

// DatabasePhase is the lifecycle phase reported in DatabaseStatus.Phase.
type DatabasePhase string

// DatabasePhase values.
const (
    DatabasePhasePending   DatabasePhase = "Pending"
    DatabasePhaseRunning   DatabasePhase = "Running"
    DatabasePhaseFailed    DatabasePhase = "Failed"
    DatabasePhaseUpgrading DatabasePhase = "Upgrading"
    DatabasePhaseBackup    DatabasePhase = "Backup"
)

// DatabaseConditionType is the type of a DatabaseCondition.
type DatabaseConditionType string

// DatabaseConditionType values.
const (
    DatabaseConditionReady       DatabaseConditionType = "Ready"
    DatabaseConditionProgressing DatabaseConditionType = "Progressing"
    DatabaseConditionDegraded    DatabaseConditionType = "Degraded"
    DatabaseConditionBackupReady DatabaseConditionType = "BackupReady"
)

// DatabaseCondition is one entry of DatabaseStatus.Conditions.
type DatabaseCondition struct {
    // Type is the condition type.
    Type               DatabaseConditionType  `json:"type"`
    // Status is the condition status.
    Status             metav1.ConditionStatus `json:"status"`
    // LastTransitionTime is presumably when Status last changed — confirm
    // against the code that sets conditions.
    LastTransitionTime metav1.Time            `json:"lastTransitionTime,omitempty"`
    // Reason is an optional reason string for the condition.
    Reason             string                 `json:"reason,omitempty"`
    // Message is an optional message for the condition.
    Message            string                 `json:"message,omitempty"`
}

// DatabaseEndpoints lists the endpoints of a Database.
type DatabaseEndpoints struct {
    // Primary is the primary database endpoint.
    Primary string `json:"primary,omitempty"`

    // Readonly lists the read-only replica endpoints.
    Readonly []string `json:"readonly,omitempty"`

    // Management is the management endpoint.
    Management string `json:"management,omitempty"`
}

// DatabaseStatus defines the observed state of a Database.
type DatabaseStatus struct {
    // Phase is the current phase.
    Phase DatabasePhase `json:"phase,omitempty"`

    // Conditions lists the status conditions.
    Conditions []DatabaseCondition `json:"conditions,omitempty"`

    // Replicas is the current number of replicas. It is always serialized
    // (no omitempty).
    Replicas int32 `json:"replicas"`

    // ReadyReplicas is the number of ready replicas. It is always serialized
    // (no omitempty).
    ReadyReplicas int32 `json:"readyReplicas"`

    // Endpoints lists the database endpoints.
    Endpoints DatabaseEndpoints `json:"endpoints,omitempty"`

    // StorageUsed reports the storage usage.
    StorageUsed string `json:"storageUsed,omitempty"`

    // LastBackupTime is the time of the last backup; nil when unset.
    LastBackupTime *metav1.Time `json:"lastBackupTime,omitempty"`

    // ObservedGeneration is the observed generation of the resource —
    // presumably metadata.generation; confirm against the status update code.
    ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
//+kubebuilder:printcolumn:name="Engine",type="string",JSONPath=".spec.engine"
//+kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.version"
//+kubebuilder:printcolumn:name="Storage",type="string",JSONPath=".spec.storage"
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// Database is the Schema for the databases API: type and object metadata
// plus the desired (Spec) and observed (Status) state.
type Database struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    // Spec is the desired state.
    Spec   DatabaseSpec   `json:"spec,omitempty"`
    // Status is the observed state.
    Status DatabaseStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// DatabaseList contains a list of Database objects.
type DatabaseList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    // Items holds the Database objects in the list.
    Items           []Database `json:"items"`
}

// init registers Database and DatabaseList with SchemeBuilder.
func init() {
    SchemeBuilder.Register(new(Database), new(DatabaseList))
}

高级 CRD 特性

版本转换

// api/v1/database_conversion.go
package v1

import (
    "fmt"

    "sigs.k8s.io/controller-runtime/pkg/conversion"

    v1beta1 "github.com/example/database-operator/api/v1beta1"
)

// ConvertTo converts this v1 Database into the hub version, v1beta1.Database.
//
// dstRaw must be a *v1beta1.Database; any other type yields an error rather
// than a panic. Only the fields assigned below are carried over: object
// metadata, the engine/version/storage/replicas spec fields, the config map
// (copied, not shared), the enabled/schedule/retention backup fields, and the
// phase/replicas/readyReplicas status fields.
func (src *Database) ConvertTo(dstRaw conversion.Hub) error {
    dst, ok := dstRaw.(*v1beta1.Database)
    if !ok {
        return fmt.Errorf("unexpected conversion hub type %T, want *v1beta1.Database", dstRaw)
    }

    // Metadata.
    dst.ObjectMeta = src.ObjectMeta

    // Spec.
    dst.Spec.Engine = string(src.Spec.Engine)
    dst.Spec.Version = src.Spec.Version
    dst.Spec.Storage = src.Spec.Storage
    dst.Spec.Replicas = src.Spec.Replicas

    // Copy the config entries so the two objects do not share one map.
    if src.Spec.Config != nil {
        dst.Spec.Config = make(map[string]string, len(src.Spec.Config))
        for k, v := range src.Spec.Config {
            dst.Spec.Config[k] = v
        }
    }

    // Backup configuration.
    if src.Spec.Backup != nil {
        dst.Spec.Backup = &v1beta1.BackupSpec{
            Enabled:   src.Spec.Backup.Enabled,
            Schedule:  src.Spec.Backup.Schedule,
            Retention: src.Spec.Backup.Retention,
        }
    }

    // Status.
    dst.Status.Phase = string(src.Status.Phase)
    dst.Status.Replicas = src.Status.Replicas
    dst.Status.ReadyReplicas = src.Status.ReadyReplicas

    return nil
}

// ConvertFrom populates this v1 Database from the hub version,
// v1beta1.Database.
//
// srcRaw must be a *v1beta1.Database; any other type yields an error rather
// than a panic. Only the fields assigned below are carried over: object
// metadata, the engine/version/storage/replicas spec fields, the config map
// (copied, not shared), the enabled/schedule/retention backup fields, and the
// phase/replicas/readyReplicas status fields.
func (dst *Database) ConvertFrom(srcRaw conversion.Hub) error {
    src, ok := srcRaw.(*v1beta1.Database)
    if !ok {
        return fmt.Errorf("unexpected conversion hub type %T, want *v1beta1.Database", srcRaw)
    }

    // Metadata.
    dst.ObjectMeta = src.ObjectMeta

    // Spec.
    dst.Spec.Engine = DatabaseEngine(src.Spec.Engine)
    dst.Spec.Version = src.Spec.Version
    dst.Spec.Storage = src.Spec.Storage
    dst.Spec.Replicas = src.Spec.Replicas

    // Copy the config entries so the two objects do not share one map.
    if src.Spec.Config != nil {
        dst.Spec.Config = make(map[string]string, len(src.Spec.Config))
        for k, v := range src.Spec.Config {
            dst.Spec.Config[k] = v
        }
    }

    // Backup configuration.
    if src.Spec.Backup != nil {
        dst.Spec.Backup = &BackupSpec{
            Enabled:   src.Spec.Backup.Enabled,
            Schedule:  src.Spec.Backup.Schedule,
            Retention: src.Spec.Backup.Retention,
        }
    }

    // Status.
    dst.Status.Phase = DatabasePhase(src.Status.Phase)
    dst.Status.Replicas = src.Status.Replicas
    dst.Status.ReadyReplicas = src.Status.ReadyReplicas

    return nil
}

Webhook 验证

// api/v1/database_webhook.go
package v1

import (
    "fmt"
    "regexp"
    "strconv"
    "strings"

    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/webhook"
)

// databaselog is the named logger used by the Database webhook methods in
// this file.
var databaselog = logf.Log.WithName("database-resource")

// SetupWebhookWithManager builds a webhook for the Database type through the
// controller-runtime webhook builder for mgr and returns the error from
// Complete.
func (r *Database) SetupWebhookWithManager(mgr ctrl.Manager) error {
    builder := ctrl.NewWebhookManagedBy(mgr).For(r)
    return builder.Complete()
}

//+kubebuilder:webhook:path=/mutate-storage-v1-database,mutating=true,failurePolicy=fail,sideEffects=None,groups=storage.example.com,resources=databases,verbs=create;update,versions=v1,name=mdatabase.kb.io,admissionReviewVersions=v1

// Compile-time check that *Database satisfies webhook.Defaulter.
var _ webhook.Defaulter = (*Database)(nil)

// Default implements webhook.Defaulter by filling in unset spec fields:
// one replica, a ClusterIP service, the engine's conventional port, default
// resource requests, and a disabled backup configuration.
func (r *Database) Default() {
    databaselog.Info("default", "name", r.Name)

    spec := &r.Spec

    // Replica count.
    if spec.Replicas == 0 {
        spec.Replicas = 1
    }

    // Service type.
    network := &spec.Network
    if network.ServiceType == "" {
        network.ServiceType = "ClusterIP"
    }

    // Port: only engines listed here get a default; any other engine value
    // leaves the port at zero.
    if network.Port == 0 {
        defaultPorts := map[DatabaseEngine]int32{
            DatabaseEngineMySQL:      3306,
            DatabaseEnginePostgreSQL: 5432,
            DatabaseEngineMongoDB:    27017,
        }
        if port, known := defaultPorts[spec.Engine]; known {
            network.Port = port
        }
    }

    // Resource requests.
    if spec.Resources.Requests == nil {
        spec.Resources.Requests = ResourceList{
            "memory": "512Mi",
            "cpu":    "500m",
        }
    }

    // Backup configuration: present but disabled, with a daily 02:00
    // schedule and a seven-day retention.
    if spec.Backup == nil {
        spec.Backup = &BackupSpec{
            Enabled:   false,
            Schedule:  "0 2 * * *",
            Retention: "7d",
        }
    }
}

//+kubebuilder:webhook:path=/validate-storage-v1-database,mutating=false,failurePolicy=fail,sideEffects=None,groups=storage.example.com,resources=databases,verbs=create;update,versions=v1,name=vdatabase.kb.io,admissionReviewVersions=v1

// Compile-time check that *Database satisfies webhook.Validator.
var _ webhook.Validator = (*Database)(nil)

// ValidateCreate implements webhook.Validator; it logs the request and runs
// the full spec validation on the new object.
func (r *Database) ValidateCreate() error {
    databaselog.Info("validate create", "name", r.Name)

    err := r.validateDatabase()
    return err
}

// ValidateUpdate implements webhook.Validator for updates.
//
// old must be a *Database; any other type yields an error rather than a
// panic. The update is rejected when the engine changes or when the version
// change fails validateVersionUpgrade; otherwise the new object goes through
// the same spec validation as on create.
func (r *Database) ValidateUpdate(old runtime.Object) error {
    databaselog.Info("validate update", "name", r.Name)

    oldDB, ok := old.(*Database)
    if !ok {
        return fmt.Errorf("expected old object to be a *Database, got %T", old)
    }

    // The engine is immutable.
    if r.Spec.Engine != oldDB.Spec.Engine {
        return fmt.Errorf("database engine cannot be changed")
    }

    // Version changes must pass the upgrade check.
    if err := r.validateVersionUpgrade(oldDB); err != nil {
        return err
    }

    return r.validateDatabase()
}

// ValidateDelete implements webhook.Validator for deletes; it refuses the
// deletion while the status phase is DatabasePhaseBackup.
func (r *Database) ValidateDelete() error {
    databaselog.Info("validate delete", "name", r.Name)

    backupRunning := r.Status.Phase == DatabasePhaseBackup
    if !backupRunning {
        return nil
    }
    return fmt.Errorf("cannot delete database while backup is in progress")
}

// validateDatabase runs every spec check in a fixed order (engine, version,
// storage, replicas, backup, network) and returns the first error.
func (r *Database) validateDatabase() error {
    checks := []func() error{
        r.validateEngine,
        r.validateVersion,
        r.validateStorage,
        r.validateReplicas,
        r.validateBackup,
        r.validateNetwork,
    }

    for _, check := range checks {
        if err := check(); err != nil {
            return err
        }
    }
    return nil
}

// validateEngine accepts only the MySQL, PostgreSQL and MongoDB engine
// constants; any other value is reported as unsupported.
func (r *Database) validateEngine() error {
    engine := r.Spec.Engine
    if engine == DatabaseEngineMySQL ||
        engine == DatabaseEnginePostgreSQL ||
        engine == DatabaseEngineMongoDB {
        return nil
    }
    return fmt.Errorf("unsupported database engine: %s", engine)
}

// versionPattern matches MAJOR.MINOR with an optional .PATCH component.
// It is compiled once at package scope instead of on every validation call.
var versionPattern = regexp.MustCompile(`^\d+\.\d+(\.\d+)?$`)

// validateVersion checks that Spec.Version is non-empty and has the form
// MAJOR.MINOR or MAJOR.MINOR.PATCH, digits only.
func (r *Database) validateVersion() error {
    version := r.Spec.Version
    if version == "" {
        return fmt.Errorf("version cannot be empty")
    }

    if !versionPattern.MatchString(version) {
        return fmt.Errorf("invalid version format: %s", version)
    }

    return nil
}

// storagePattern matches an integer followed by a binary unit (Ki, Mi, Gi or
// Ti). It is compiled once at package scope instead of on every validation
// call.
var storagePattern = regexp.MustCompile(`^\d+[KMGT]i$`)

// validateStorage checks that Spec.Storage is non-empty, well-formed, and
// passes the engine-specific minimum-size check.
func (r *Database) validateStorage() error {
    storage := r.Spec.Storage
    if storage == "" {
        return fmt.Errorf("storage cannot be empty")
    }

    if !storagePattern.MatchString(storage) {
        return fmt.Errorf("invalid storage format: %s", storage)
    }

    return r.validateMinimumStorage()
}

// validateMinimumStorage checks that Spec.Storage meets the minimum capacity
// required by Spec.Engine: 1Gi for MySQL, 512Mi for PostgreSQL and 2Gi for
// MongoDB. Other engine values have no minimum.
//
// Every unit accepted by the storage format (Ki, Mi, Gi, Ti) is compared
// against the minimum. A value without one of these suffixes, or with a
// non-integer size, is reported as an invalid storage size.
func (r *Database) validateMinimumStorage() error {
    storage := r.Spec.Storage

    // Size of each accepted binary unit, expressed in KiB.
    units := []struct {
        suffix string
        kib    int64
    }{
        {"Ki", 1},
        {"Mi", 1 << 10},
        {"Gi", 1 << 20},
        {"Ti", 1 << 30},
    }

    var (
        size    int64
        unitKiB int64
    )
    for _, u := range units {
        if !strings.HasSuffix(storage, u.suffix) {
            continue
        }
        parsed, err := strconv.ParseInt(strings.TrimSuffix(storage, u.suffix), 10, 64)
        if err != nil {
            return fmt.Errorf("invalid storage size: %s", storage)
        }
        size, unitKiB = parsed, u.kib
        break
    }
    if unitKiB == 0 {
        return fmt.Errorf("invalid storage size: %s", storage)
    }

    var (
        engineName string
        minimum    string
        minKiB     int64
    )
    switch r.Spec.Engine {
    case DatabaseEngineMySQL:
        engineName, minimum, minKiB = "MySQL", "1Gi", 1<<20
    case DatabaseEnginePostgreSQL:
        engineName, minimum, minKiB = "PostgreSQL", "512Mi", 512<<10
    case DatabaseEngineMongoDB:
        engineName, minimum, minKiB = "MongoDB", "2Gi", 2<<20
    default:
        return nil
    }

    // Compare in the value's own unit: round the minimum up to that unit
    // rather than multiplying the user-supplied size, which could overflow.
    required := (minKiB + unitKiB - 1) / unitKiB
    if size < required {
        return fmt.Errorf("%s requires at least %s storage", engineName, minimum)
    }

    return nil
}

// validateReplicas requires Spec.Replicas to lie in the range 1..10.
func (r *Database) validateReplicas() error {
    switch count := r.Spec.Replicas; {
    case count < 1:
        return fmt.Errorf("replicas must be at least 1")
    case count > 10:
        return fmt.Errorf("replicas cannot exceed 10")
    default:
        return nil
    }
}

// retentionPattern matches an integer followed by one of the unit letters
// d, w, h or m. It is compiled once at package scope instead of on every
// validation call.
var retentionPattern = regexp.MustCompile(`^\d+[dwhm]$`)

// validateBackup checks the backup configuration. Nothing is checked when
// Spec.Backup is nil or backups are disabled. When enabled, the schedule and
// the retention must both be non-empty and the retention must match
// retentionPattern. The schedule is only checked for presence, not parsed as
// a cron expression.
func (r *Database) validateBackup() error {
    backup := r.Spec.Backup
    if backup == nil || !backup.Enabled {
        return nil
    }

    if backup.Schedule == "" {
        return fmt.Errorf("backup schedule cannot be empty when backup is enabled")
    }

    if backup.Retention == "" {
        return fmt.Errorf("backup retention cannot be empty when backup is enabled")
    }

    if !retentionPattern.MatchString(backup.Retention) {
        return fmt.Errorf("invalid backup retention format: %s", backup.Retention)
    }

    return nil
}

// validateNetwork checks that the service type is ClusterIP, NodePort or
// LoadBalancer and that the port lies in 1..65535.
func (r *Database) validateNetwork() error {
    network := r.Spec.Network

    switch network.ServiceType {
    case "ClusterIP", "NodePort", "LoadBalancer":
        // Accepted service type.
    default:
        return fmt.Errorf("invalid service type: %s", network.ServiceType)
    }

    if port := network.Port; port < 1 || port > 65535 {
        return fmt.Errorf("port must be between 1 and 65535")
    }

    return nil
}

// validateVersionUpgrade checks a version change between old and r.
//
// An unchanged version string is always accepted. Otherwise both versions
// are parsed with parseVersion and the change is rejected when
// isVersionUpgrade reports that it is not an upgrade. Parse errors are
// wrapped with %w so callers can inspect the cause with errors.Is/As.
func (r *Database) validateVersionUpgrade(old *Database) error {
    if r.Spec.Version == old.Spec.Version {
        return nil
    }

    newVersion, err := parseVersion(r.Spec.Version)
    if err != nil {
        return fmt.Errorf("invalid new version: %w", err)
    }

    oldVersion, err := parseVersion(old.Spec.Version)
    if err != nil {
        return fmt.Errorf("invalid old version: %w", err)
    }

    if !isVersionUpgrade(oldVersion, newVersion) {
        return fmt.Errorf("version downgrade is not allowed: %s -> %s", old.Spec.Version, r.Spec.Version)
    }

    return nil
}

// parseVersion splits a dotted version string into its integer components.
// It returns the first conversion error, with a nil slice, when any component
// is not a valid integer.
func parseVersion(version string) ([]int, error) {
    fields := strings.Split(version, ".")
    numbers := make([]int, len(fields))

    for i, field := range fields {
        n, err := strconv.Atoi(field)
        if err != nil {
            return nil, err
        }
        numbers[i] = n
    }

    return numbers, nil
}

// isVersionUpgrade reports whether newVer is the same as or later than
// oldVer.
//
// Versions are compared component by component from the left. A missing
// trailing component counts as zero, so 1.2 and 1.2.0 are equal and
// 1.2.3 -> 1.2 is a downgrade. Equal versions are reported as an allowed
// change.
func isVersionUpgrade(oldVer, newVer []int) bool {
    length := len(oldVer)
    if len(newVer) > length {
        length = len(newVer)
    }

    for i := 0; i < length; i++ {
        var oldPart, newPart int
        if i < len(oldVer) {
            oldPart = oldVer[i]
        }
        if i < len(newVer) {
            newPart = newVer[i]
        }
        if newPart != oldPart {
            return newPart > oldPart
        }
    }

    return true
}

控制器实现

数据库控制器

// controllers/database_controller.go
package controllers

import (
    "context"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"

    storagev1 "github.com/example/database-operator/api/v1"
)

// DatabaseReconciler reconciles Database resources.
type DatabaseReconciler struct {
    // Client is embedded, so its methods (Get, Create, Update, ...) are
    // called directly on the reconciler.
    client.Client
    // Scheme is passed to controllerutil.SetControllerReference when setting
    // owner references on created objects.
    Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=storage.example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=storage.example.com,resources=databases/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=storage.example.com,resources=databases/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete

// Reconcile is the main reconciliation loop for a single Database.
//
// It loads the object named by req (a missing object is not an error),
// delegates to handleDeletion when a deletion timestamp is set, and adds the
// "database.finalizer" finalizer if absent, returning right after that
// update. Otherwise it reconciles the ConfigMap, StatefulSet and Service,
// the backup when enabled, and finally the status, stopping at the first
// error. A successful pass is requeued after five minutes.
func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    const finalizer = "database.finalizer"

    logger := log.FromContext(ctx)

    // Load the Database instance.
    database := new(storagev1.Database)
    if err := r.Get(ctx, req.NamespacedName, database); err != nil {
        if !errors.IsNotFound(err) {
            logger.Error(err, "Failed to get Database")
            return ctrl.Result{}, err
        }
        logger.Info("Database resource not found. Ignoring since object must be deleted")
        return ctrl.Result{}, nil
    }

    // Deletion in progress.
    if database.DeletionTimestamp != nil {
        return r.handleDeletion(ctx, database)
    }

    // Ensure the finalizer is present.
    if !controllerutil.ContainsFinalizer(database, finalizer) {
        controllerutil.AddFinalizer(database, finalizer)
        return ctrl.Result{}, r.Update(ctx, database)
    }

    // Reconciliation steps, run in order. The backup condition is evaluated
    // when its step runs, after the preceding steps have completed.
    steps := []func() error{
        func() error { return r.reconcileConfigMap(ctx, database) },
        func() error { return r.reconcileStatefulSet(ctx, database) },
        func() error { return r.reconcileService(ctx, database) },
        func() error {
            if database.Spec.Backup == nil || !database.Spec.Backup.Enabled {
                return nil
            }
            return r.reconcileBackup(ctx, database)
        },
        func() error { return r.updateStatus(ctx, database) },
    }
    for _, step := range steps {
        if err := step(); err != nil {
            return ctrl.Result{}, err
        }
    }

    return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}

// reconcileConfigMap creates the "<name>-config" ConfigMap, owned by the
// Database, when it does not exist yet. An existing ConfigMap is left
// untouched.
func (r *DatabaseReconciler) reconcileConfigMap(ctx context.Context, database *storagev1.Database) error {
    key := types.NamespacedName{
        Namespace: database.Namespace,
        Name:      database.Name + "-config",
    }

    existing := &corev1.ConfigMap{}
    err := r.Get(ctx, key, existing)
    if err == nil {
        return nil
    }
    if !errors.IsNotFound(err) {
        return err
    }

    desired := r.configMapForDatabase(database)
    if err := controllerutil.SetControllerReference(database, desired, r.Scheme); err != nil {
        return err
    }
    return r.Create(ctx, desired)
}

// configMapForDatabase builds the "<name>-config" ConfigMap for database.
// It holds the engine-specific configuration file plus every entry of
// Spec.Config; a Spec.Config key equal to the generated file name replaces
// the generated content.
func (r *DatabaseReconciler) configMapForDatabase(database *storagev1.Database) *corev1.ConfigMap {
    configMap := &corev1.ConfigMap{
        ObjectMeta: metav1.ObjectMeta{
            Name:      database.Name + "-config",
            Namespace: database.Namespace,
            Labels: map[string]string{
                "app":        database.Name,
                "engine":     string(database.Spec.Engine),
                "managed-by": "database-operator",
            },
        },
        Data: map[string]string{},
    }

    // Engine-specific configuration file.
    switch database.Spec.Engine {
    case storagev1.DatabaseEngineMySQL:
        configMap.Data["my.cnf"] = r.generateMySQLConfig(database)
    case storagev1.DatabaseEnginePostgreSQL:
        configMap.Data["postgresql.conf"] = r.generatePostgreSQLConfig(database)
    case storagev1.DatabaseEngineMongoDB:
        configMap.Data["mongod.conf"] = r.generateMongoDBConfig(database)
    }

    // User-supplied entries are applied last.
    for key, value := range database.Spec.Config {
        configMap.Data[key] = value
    }

    return configMap
}

// generateMySQLConfig renders the MySQL configuration file, substituting the
// port from Spec.Network.Port.
func (r *DatabaseReconciler) generateMySQLConfig(database *storagev1.Database) string {
    const template = "[mysqld]\n" +
        "port = %d\n" +
        "bind-address = 0.0.0.0\n" +
        "default-storage-engine = InnoDB\n" +
        "innodb_buffer_pool_size = 256M\n" +
        "max_connections = 100\n"

    return fmt.Sprintf(template, database.Spec.Network.Port)
}

// generatePostgreSQLConfig renders the PostgreSQL configuration file,
// substituting the port from Spec.Network.Port.
func (r *DatabaseReconciler) generatePostgreSQLConfig(database *storagev1.Database) string {
    const template = "port = %d\n" +
        "listen_addresses = '*'\n" +
        "shared_buffers = 256MB\n" +
        "max_connections = 100\n"

    return fmt.Sprintf(template, database.Spec.Network.Port)
}

// generateMongoDBConfig renders the MongoDB configuration file (YAML),
// substituting the port from Spec.Network.Port.
func (r *DatabaseReconciler) generateMongoDBConfig(database *storagev1.Database) string {
    const template = "net:\n" +
        "  port: %d\n" +
        "  bindIp: 0.0.0.0\n" +
        "storage:\n" +
        "  dbPath: /data/db\n" +
        "  journal:\n" +
        "    enabled: true\n"

    return fmt.Sprintf(template, database.Spec.Network.Port)
}

// reconcileStatefulSet creates the StatefulSet named after the Database when
// it is missing, and updates it when needsStatefulSetUpdate reports drift.
//
// Updates are applied to the object fetched from the API server rather than
// to a freshly built one. That keeps its resourceVersion, so a concurrent
// change is detected as a conflict instead of being overwritten, and it
// leaves the fields the API server treats as immutable (selector, service
// name, volume claim templates) untouched. Only the labels, the replica
// count and the pod template are taken from the desired state.
func (r *DatabaseReconciler) reconcileStatefulSet(ctx context.Context, database *storagev1.Database) error {
    statefulSet := &appsv1.StatefulSet{}
    err := r.Get(ctx, types.NamespacedName{
        Name:      database.Name,
        Namespace: database.Namespace,
    }, statefulSet)

    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }

        // Not found: create it with the Database as controlling owner.
        statefulSet = r.statefulSetForDatabase(database)
        if err := controllerutil.SetControllerReference(database, statefulSet, r.Scheme); err != nil {
            return err
        }
        return r.Create(ctx, statefulSet)
    }

    if !r.needsStatefulSetUpdate(database, statefulSet) {
        return nil
    }

    // Copy the mutable parts of the desired state onto the live object.
    desired := r.statefulSetForDatabase(database)
    statefulSet.Labels = desired.Labels
    statefulSet.Spec.Replicas = desired.Spec.Replicas
    statefulSet.Spec.Template = desired.Spec.Template

    if err := controllerutil.SetControllerReference(database, statefulSet, r.Scheme); err != nil {
        return err
    }
    return r.Update(ctx, statefulSet)
}

// statefulSetForDatabase builds the desired StatefulSet for database.
//
// The StatefulSet is named after the Database and runs one container with
// the engine image, engine-specific environment and probes, the configured
// resource requests/limits, a "data" volume backed by a ReadWriteOnce volume
// claim template sized from Spec.Storage, and a "config" volume backed by
// the "<name>-config" ConfigMap.
//
// NOTE(review): resource.MustParse panics on a malformed quantity.
// Spec.Resources values are not checked by the validation code visible in
// this file, so a bad value would panic here — confirm they are validated
// before this point.
func (r *DatabaseReconciler) statefulSetForDatabase(database *storagev1.Database) *appsv1.StatefulSet {
    // The same label set is used for the object, the selector and the pods.
    labels := map[string]string{
        "app":        database.Name,
        "engine":     string(database.Spec.Engine),
        "managed-by": "database-operator",
    }

    // Container image.
    image := r.getImageForEngine(database.Spec.Engine, database.Spec.Version)

    // Environment variables.
    envVars := r.getEnvVarsForEngine(database)

    // Resource requests and limits, converted from strings to quantities.
    resources := corev1.ResourceRequirements{}
    if database.Spec.Resources.Requests != nil {
        resources.Requests = corev1.ResourceList{}
        for k, v := range database.Spec.Resources.Requests {
            resources.Requests[corev1.ResourceName(k)] = resource.MustParse(v)
        }
    }
    if database.Spec.Resources.Limits != nil {
        resources.Limits = corev1.ResourceList{}
        for k, v := range database.Spec.Resources.Limits {
            resources.Limits[corev1.ResourceName(k)] = resource.MustParse(v)
        }
    }

    // Volume mounts: data directory and configuration directory.
    volumeMounts := []corev1.VolumeMount{
        {
            Name:      "data",
            MountPath: r.getDataPathForEngine(database.Spec.Engine),
        },
        {
            Name:      "config",
            MountPath: r.getConfigPathForEngine(database.Spec.Engine),
        },
    }

    // Volumes: only "config" is declared here; "data" comes from the volume
    // claim template below.
    volumes := []corev1.Volume{
        {
            Name: "config",
            VolumeSource: corev1.VolumeSource{
                ConfigMap: &corev1.ConfigMapVolumeSource{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: database.Name + "-config",
                    },
                },
            },
        },
    }

    // Volume claim template for the data volume.
    pvcTemplate := corev1.PersistentVolumeClaim{
        ObjectMeta: metav1.ObjectMeta{
            Name: "data",
        },
        Spec: corev1.PersistentVolumeClaimSpec{
            AccessModes: []corev1.PersistentVolumeAccessMode{
                corev1.ReadWriteOnce,
            },
            Resources: corev1.ResourceRequirements{
                Requests: corev1.ResourceList{
                    corev1.ResourceStorage: resource.MustParse(database.Spec.Storage),
                },
            },
        },
    }

    // Copy the replica count so the StatefulSet does not hold a pointer into
    // the Database object; otherwise a write through either object would
    // silently change the other.
    replicas := database.Spec.Replicas

    statefulSet := &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      database.Name,
            Namespace: database.Namespace,
            Labels:    labels,
        },
        Spec: appsv1.StatefulSetSpec{
            Replicas:    &replicas,
            ServiceName: database.Name,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:         database.Name,
                            Image:        image,
                            Ports:        r.getContainerPortsForEngine(database),
                            Env:          envVars,
                            Resources:    resources,
                            VolumeMounts: volumeMounts,
                            LivenessProbe: &corev1.Probe{
                                ProbeHandler:        r.getLivenessProbeForEngine(database),
                                InitialDelaySeconds: 30,
                                PeriodSeconds:       10,
                            },
                            ReadinessProbe: &corev1.Probe{
                                ProbeHandler:        r.getReadinessProbeForEngine(database),
                                InitialDelaySeconds: 5,
                                PeriodSeconds:       5,
                            },
                        },
                    },
                    Volumes: volumes,
                },
            },
            VolumeClaimTemplates: []corev1.PersistentVolumeClaim{pvcTemplate},
        },
    }

    return statefulSet
}

// getImageForEngine returns the container image reference
// "<repository>:<version>" for engine, or an empty string for an engine that
// is not MySQL, PostgreSQL or MongoDB.
func (r *DatabaseReconciler) getImageForEngine(engine storagev1.DatabaseEngine, version string) string {
    var repository string
    switch engine {
    case storagev1.DatabaseEngineMySQL:
        repository = "mysql"
    case storagev1.DatabaseEnginePostgreSQL:
        repository = "postgres"
    case storagev1.DatabaseEngineMongoDB:
        repository = "mongo"
    default:
        return ""
    }
    return fmt.Sprintf("%s:%s", repository, version)
}

// getEnvVarsForEngine returns the environment variables for the database
// container of the given engine; an unknown engine yields nil.
//
// Credentials are read from the Secret named by
// Spec.Security.CredentialsSecret when that field is set. This function
// expects the Secret to carry a "password" key and, for MongoDB, a
// "username" key; that key convention is introduced here and must match how
// the Secret is created. When no Secret is configured the previous built-in
// values are used.
//
// NOTE(review): the built-in fallback credentials are fixed, publicly known
// strings; they are kept only so existing resources keep working and should
// not be relied on outside of demos.
func (r *DatabaseReconciler) getEnvVarsForEngine(database *storagev1.Database) []corev1.EnvVar {
    secretName := database.Spec.Security.CredentialsSecret

    // credential returns an env var sourced from the credentials Secret when
    // one is configured, and a literal fallback value otherwise.
    credential := func(name, key, fallback string) corev1.EnvVar {
        if secretName == "" {
            return corev1.EnvVar{Name: name, Value: fallback}
        }
        return corev1.EnvVar{
            Name: name,
            ValueFrom: &corev1.EnvVarSource{
                SecretKeyRef: &corev1.SecretKeySelector{
                    LocalObjectReference: corev1.LocalObjectReference{Name: secretName},
                    Key:                  key,
                },
            },
        }
    }

    switch database.Spec.Engine {
    case storagev1.DatabaseEngineMySQL:
        return []corev1.EnvVar{
            credential("MYSQL_ROOT_PASSWORD", "password", "rootpassword"),
            {Name: "MYSQL_DATABASE", Value: database.Name},
        }
    case storagev1.DatabaseEnginePostgreSQL:
        return []corev1.EnvVar{
            credential("POSTGRES_PASSWORD", "password", "password"),
            {Name: "POSTGRES_DB", Value: database.Name},
        }
    case storagev1.DatabaseEngineMongoDB:
        return []corev1.EnvVar{
            credential("MONGO_INITDB_ROOT_USERNAME", "username", "admin"),
            credential("MONGO_INITDB_ROOT_PASSWORD", "password", "password"),
        }
    default:
        return nil
    }
}

// getDataPathForEngine returns the data directory path associated with the
// given engine, falling back to "/data" for an engine that matches none of
// the known constants.
func (r *DatabaseReconciler) getDataPathForEngine(engine storagev1.DatabaseEngine) string {
    // Start from the generic fallback and override it per engine.
    dataPath := "/data"

    switch engine {
    case storagev1.DatabaseEngineMySQL:
        dataPath = "/var/lib/mysql"
    case storagev1.DatabaseEnginePostgreSQL:
        dataPath = "/var/lib/postgresql/data"
    case storagev1.DatabaseEngineMongoDB:
        dataPath = "/data/db"
    }

    return dataPath
}

// getConfigPathForEngine returns the configuration directory path associated
// with the given engine, falling back to "/etc/config" for an engine that is
// not in the lookup table.
func (r *DatabaseReconciler) getConfigPathForEngine(engine storagev1.DatabaseEngine) string {
    configPaths := map[storagev1.DatabaseEngine]string{
        storagev1.DatabaseEngineMySQL:      "/etc/mysql/conf.d",
        storagev1.DatabaseEnginePostgreSQL: "/etc/postgresql",
        storagev1.DatabaseEngineMongoDB:    "/etc/mongo",
    }

    if configPath, known := configPaths[engine]; known {
        return configPath
    }
    return "/etc/config"
}

// getContainerPortsForEngine returns the single TCP container port taken from
// database.Spec.Network.Port. The engine itself is not consulted.
func (r *DatabaseReconciler) getContainerPortsForEngine(database *storagev1.Database) []corev1.ContainerPort {
    dbPort := corev1.ContainerPort{
        ContainerPort: database.Spec.Network.Port,
        Protocol:      corev1.ProtocolTCP,
    }

    return []corev1.ContainerPort{dbPort}
}

// getLivenessProbeForEngine returns the probe handler used to check that the
// database container is alive. Known engines are probed by running a
// command inside the container; any other engine falls back to a TCP check
// against database.Spec.Network.Port.
//
// NOTE(review): the MongoDB command relies on the legacy `mongo` shell, which
// newer MongoDB images may no longer ship (replaced by `mongosh`) — confirm
// against the versions this operator is expected to support.
func (r *DatabaseReconciler) getLivenessProbeForEngine(database *storagev1.Database) corev1.ProbeHandler {
    var probeCommand []string

    switch database.Spec.Engine {
    case storagev1.DatabaseEngineMySQL:
        probeCommand = []string{"mysqladmin", "ping", "-h", "localhost"}
    case storagev1.DatabaseEnginePostgreSQL:
        probeCommand = []string{"pg_isready", "-U", "postgres"}
    case storagev1.DatabaseEngineMongoDB:
        probeCommand = []string{"mongo", "--eval", "db.adminCommand('ismaster')"}
    default:
        // No engine-specific command available: just check that the port accepts connections.
        tcpCheck := &corev1.TCPSocketAction{
            Port: intstr.FromInt(int(database.Spec.Network.Port)),
        }
        return corev1.ProbeHandler{TCPSocket: tcpCheck}
    }

    return corev1.ProbeHandler{
        Exec: &corev1.ExecAction{Command: probeCommand},
    }
}

// getReadinessProbeForEngine returns the probe handler used for readiness
// checks. It currently reuses the liveness handler unchanged.
func (r *DatabaseReconciler) getReadinessProbeForEngine(database *storagev1.Database) corev1.ProbeHandler {
    readinessHandler := r.getLivenessProbeForEngine(database)
    return readinessHandler
}

// needsStatefulSetUpdate reports whether the existing StatefulSet has drifted
// from the desired state described by the Database resource.
//
// Two fields are compared:
//   - the replica count (statefulSet.Spec.Replicas vs database.Spec.Replicas)
//   - the image of the first container vs the image derived from the
//     Database's engine and version
//
// It returns true as soon as one of them differs, false otherwise.
func (r *DatabaseReconciler) needsStatefulSetUpdate(database *storagev1.Database, statefulSet *appsv1.StatefulSet) bool {
    // Spec.Replicas is an optional pointer. Dereferencing it blindly panics
    // when it is unset, so treat nil as 1, the value the Kubernetes API
    // defaults to.
    currentReplicas := int32(1)
    if statefulSet.Spec.Replicas != nil {
        currentReplicas = *statefulSet.Spec.Replicas
    }
    if currentReplicas != database.Spec.Replicas {
        return true
    }

    // Compare the image of the first container; a template without
    // containers is not considered a difference here.
    expectedImage := r.getImageForEngine(database.Spec.Engine, database.Spec.Version)
    containers := statefulSet.Spec.Template.Spec.Containers
    if len(containers) > 0 && containers[0].Image != expectedImage {
        return true
    }

    return false
}

// reconcileService makes sure a Service named after the Database exists in
// the Database's namespace. When the lookup reports "not found", a new Service
// is built, given the Database as its controller owner, and created. An
// existing Service is left untouched. Any other lookup error is returned
// as-is.
func (r *DatabaseReconciler) reconcileService(ctx context.Context, database *storagev1.Database) error {
    key := types.NamespacedName{
        Name:      database.Name,
        Namespace: database.Namespace,
    }

    existing := &corev1.Service{}
    getErr := r.Get(ctx, key, existing)
    if getErr == nil {
        // Service already present: nothing to do.
        return nil
    }
    if !errors.IsNotFound(getErr) {
        return getErr
    }

    // Not found: create it and tie its lifecycle to the Database.
    desired := r.serviceForDatabase(database)
    if ownerErr := controllerutil.SetControllerReference(database, desired, r.Scheme); ownerErr != nil {
        return ownerErr
    }
    return r.Create(ctx, desired)
}

// serviceForDatabase builds the Service that exposes the database pods.
//
// The Service is named after the Database, lives in the same namespace, and
// selects pods by the "app", "engine" and "managed-by" labels. It exposes a
// single TCP port taken from database.Spec.Network.Port, and its type comes
// from database.Spec.Network.ServiceType.
//
// The Service is made headless (ClusterIP "None") only when the type is
// ClusterIP or left unset. The API server rejects ClusterIP "None" for
// NodePort, LoadBalancer and ExternalName services, so setting it
// unconditionally would make creation fail for those types.
func (r *DatabaseReconciler) serviceForDatabase(database *storagev1.Database) *corev1.Service {
    labels := map[string]string{
        "app":        database.Name,
        "engine":     string(database.Spec.Engine),
        "managed-by": "database-operator",
    }

    serviceType := corev1.ServiceType(database.Spec.Network.ServiceType)

    spec := corev1.ServiceSpec{
        Selector: labels,
        Ports: []corev1.ServicePort{
            {
                Port:       database.Spec.Network.Port,
                TargetPort: intstr.FromInt(int(database.Spec.Network.Port)),
                Protocol:   corev1.ProtocolTCP,
            },
        },
        Type: serviceType,
    }

    // Headless service for the StatefulSet; only valid for ClusterIP services
    // (an empty type is defaulted to ClusterIP by the API server).
    if serviceType == "" || serviceType == corev1.ServiceTypeClusterIP {
        spec.ClusterIP = "None"
    }

    service := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      database.Name,
            Namespace: database.Namespace,
            Labels:    labels,
        },
        Spec: spec,
    }

    return service
}

// reconcileBackup is the hook for reconciling backups of the Database.
//
// It is currently a placeholder that does nothing and always returns nil; a
// real implementation could, for example, create a CronJob that runs periodic
// backups.
func (r *DatabaseReconciler) reconcileBackup(ctx context.Context, database *storagev1.Database) error {
    // Intentionally empty: kept minimal for the sake of the example.
    return nil
}

// updateStatus refreshes database.Status from the StatefulSet that shares the
// Database's name and namespace, then writes it through the status
// subresource client.
//
// It copies the replica counters, records the Database's current generation as
// observed, derives the phase, and sets the primary endpoint to the Service's
// cluster-local DNS name and port. An error from fetching the StatefulSet or
// from the status update is returned unchanged.
func (r *DatabaseReconciler) updateStatus(ctx context.Context, database *storagev1.Database) error {
    key := types.NamespacedName{
        Name:      database.Name,
        Namespace: database.Namespace,
    }

    // Look up the StatefulSet whose status we mirror.
    sts := &appsv1.StatefulSet{}
    if err := r.Get(ctx, key, sts); err != nil {
        return err
    }

    status := &database.Status
    status.Replicas = sts.Status.Replicas
    status.ReadyReplicas = sts.Status.ReadyReplicas
    status.ObservedGeneration = database.Generation

    // Phase: Running when every desired replica is ready, Pending while at
    // least one replica exists, Failed otherwise.
    switch {
    case sts.Status.ReadyReplicas == database.Spec.Replicas:
        status.Phase = storagev1.DatabasePhaseRunning
    case sts.Status.Replicas > 0:
        status.Phase = storagev1.DatabasePhasePending
    default:
        status.Phase = storagev1.DatabasePhaseFailed
    }

    // Primary endpoint: <name>.<namespace>.svc.cluster.local:<port>.
    status.Endpoints.Primary = fmt.Sprintf("%s.%s.svc.cluster.local:%d",
        database.Name, database.Namespace, database.Spec.Network.Port)

    return r.Status().Update(ctx, database)
}

// handleDeletion runs the deletion path for a Database. When the
// "database.finalizer" finalizer is still present, it is removed and the
// object is updated; the update's error (if any) is returned. When the
// finalizer is absent there is nothing to do.
func (r *DatabaseReconciler) handleDeletion(ctx context.Context, database *storagev1.Database) (ctrl.Result, error) {
    const finalizerName = "database.finalizer"

    if !controllerutil.ContainsFinalizer(database, finalizerName) {
        return ctrl.Result{}, nil
    }

    // Cleanup logic belongs here, e.g. backing up data or releasing
    // external resources, before the finalizer is dropped.

    controllerutil.RemoveFinalizer(database, finalizerName)
    updateErr := r.Update(ctx, database)
    return ctrl.Result{}, updateErr
}

// SetupWithManager registers the reconciler with the given manager. The
// controller is built for Database resources and declared as owning
// StatefulSets, Services and ConfigMaps.
func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
    controllerBuilder := ctrl.NewControllerManagedBy(mgr).For(&storagev1.Database{})

    // Owned resource types.
    controllerBuilder = controllerBuilder.Owns(&appsv1.StatefulSet{})
    controllerBuilder = controllerBuilder.Owns(&corev1.Service{})
    controllerBuilder = controllerBuilder.Owns(&corev1.ConfigMap{})

    return controllerBuilder.Complete(r)
}

使用示例 #

创建数据库实例 #

# mysql-database.yaml
apiVersion: storage.example.com/v1
kind: Database
metadata:
  name: mysql-db
  namespace: default
spec:
  engine: mysql
  version: "8.0"
  storage: "10Gi"
  replicas: 1
  config:
    max_connections: "200"
    innodb_buffer_pool_size: "512M"
  resources:
    requests:
      memory: "1Gi"
      cpu: "500m"
    limits:
      memory: "2Gi"
      cpu: "1"
  network:
    serviceType: "ClusterIP"
    port: 3306
    ssl: true
  backup:
    enabled: true
    schedule: "0 2 * * *"
    retention: "7d"
    storageClass: "fast-ssd"
    storageSize: "5Gi"
  security:
    authMethod: "mysql_native_password"
    credentialsSecret: "mysql-credentials"
    tls:
      enabled: true
      secretName: "mysql-tls"
# 应用数据库配置
kubectl apply -f mysql-database.yaml

# 查看数据库状态
kubectl get databases
kubectl describe database mysql-db

# 查看相关资源
kubectl get statefulsets,services,configmaps -l app=mysql-db

通过掌握 CRD 自定义资源的开发,您可以扩展 Kubernetes 的功能,创建符合特定业务需求的资源类型,并通过控制器实现自动化管理。这是构建云原生应用生态系统的重要技能。