5.3.4 CRD 自定义资源 #
自定义资源定义(Custom Resource Definition,CRD)是 Kubernetes 的扩展机制,允许用户定义新的 API 资源类型。通过 CRD,我们可以扩展 Kubernetes API,创建特定于应用程序的资源类型,并使用 Kubernetes 的声明式 API 来管理这些资源。
CRD 基础概念 #
什么是 CRD #
CRD 是一种 Kubernetes 资源,用于定义新的自定义资源类型。它告诉 Kubernetes API Server 如何存储和验证自定义资源的数据结构。
CRD 的组成部分 #
- Group:API 组,用于组织相关的资源类型
- Version:API 版本,支持版本演进
- Kind:资源类型名称
- Schema:资源的数据结构定义
- Scope:资源作用域(Namespaced 或 Cluster)
创建基础 CRD #
简单的数据库 CRD #
# database-crd.yaml
# CustomResourceDefinition for the namespaced Database resource
# (storage.example.com/v1). The schema mirrors the Go types in
# api/v1/database_types.go: with a structural schema the API server prunes any
# field that is not declared here, so every field the operator reads or writes
# must be listed.
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.storage.example.com
spec:
  group: storage.example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                engine:
                  type: string
                  enum: ["mysql", "postgresql", "mongodb"]
                version:
                  type: string
                storage:
                  type: string
                  pattern: "^[0-9]+[KMGT]i$"
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 10
                config:
                  type: object
                  additionalProperties:
                    type: string
                backup:
                  type: object
                  properties:
                    enabled:
                      type: boolean
                    schedule:
                      type: string
                    retention:
                      type: string
                    storageClass:
                      type: string
                    storageSize:
                      type: string
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      additionalProperties:
                        type: string
                    limits:
                      type: object
                      additionalProperties:
                        type: string
                network:
                  type: object
                  properties:
                    serviceType:
                      type: string
                    port:
                      type: integer
                    ssl:
                      type: boolean
                security:
                  type: object
                  properties:
                    authMethod:
                      type: string
                    credentialsSecret:
                      type: string
                    tls:
                      type: object
                      properties:
                        enabled:
                          type: boolean
                        secretName:
                          type: string
              required:
                - engine
                - version
                - storage
            status:
              type: object
              properties:
                phase:
                  type: string
                  enum: ["Pending", "Running", "Failed", "Upgrading", "Backup"]
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      type:
                        type: string
                      status:
                        type: string
                      lastTransitionTime:
                        type: string
                        format: date-time
                      reason:
                        type: string
                      message:
                        type: string
                # Required by the scale subresource (statusReplicasPath below).
                replicas:
                  type: integer
                readyReplicas:
                  type: integer
                endpoints:
                  type: object
                  properties:
                    primary:
                      type: string
                    readonly:
                      type: array
                      items:
                        type: string
                    management:
                      type: string
                storageUsed:
                  type: string
                lastBackupTime:
                  type: string
                  format: date-time
                observedGeneration:
                  type: integer
                  format: int64
      additionalPrinterColumns:
        - name: Engine
          type: string
          jsonPath: .spec.engine
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Storage
          type: string
          jsonPath: .spec.storage
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
      subresources:
        status: {}
        scale:
          specReplicasPath: .spec.replicas
          statusReplicasPath: .status.replicas
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames:
      - db
应用 CRD #
# Create the CRD
kubectl apply -f database-crd.yaml
# Verify that the CRD was created
kubectl get crd databases.storage.example.com
# Show the CRD details
kubectl describe crd databases.storage.example.com
Go 类型定义 #
数据库资源类型 #
// api/v1/database_types.go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// DatabaseEngine identifies the database engine of a Database resource.
type DatabaseEngine string

// Supported DatabaseEngine values.
const (
	DatabaseEngineMySQL      DatabaseEngine = "mysql"
	DatabaseEnginePostgreSQL DatabaseEngine = "postgresql"
	DatabaseEngineMongoDB    DatabaseEngine = "mongodb"
)
// BackupSpec holds the backup configuration of a Database.
type BackupSpec struct {
	// Enabled turns backups on or off.
	Enabled bool `json:"enabled"`
	// Schedule is the backup schedule (Cron format).
	Schedule string `json:"schedule,omitempty"`
	// Retention is how long backups are kept.
	Retention string `json:"retention,omitempty"`
	// StorageClass is the storage location used for backups.
	StorageClass string `json:"storageClass,omitempty"`
	// StorageSize is the size limit for backups.
	StorageSize string `json:"storageSize,omitempty"`
}
// DatabaseSpec defines the desired state of a Database.
type DatabaseSpec struct {
	// Engine is the database engine.
	Engine DatabaseEngine `json:"engine"`
	// Version is the database version.
	Version string `json:"version"`
	// Storage is the storage size.
	Storage string `json:"storage"`
	// Replicas is the number of replicas.
	Replicas int32 `json:"replicas,omitempty"`
	// Config holds database configuration parameters.
	Config map[string]string `json:"config,omitempty"`
	// Backup is the backup configuration; nil when not specified.
	Backup *BackupSpec `json:"backup,omitempty"`
	// Resources holds the resource requests and limits.
	Resources ResourceRequirements `json:"resources,omitempty"`
	// Network is the network configuration.
	Network NetworkSpec `json:"network,omitempty"`
	// Security is the security configuration.
	Security SecuritySpec `json:"security,omitempty"`
}
// ResourceRequirements describes the resource requests and limits of a Database.
type ResourceRequirements struct {
	Requests ResourceList `json:"requests,omitempty"`
	Limits   ResourceList `json:"limits,omitempty"`
}

// ResourceList maps a resource name to a quantity string.
type ResourceList map[string]string
// NetworkSpec holds the network configuration of a Database.
type NetworkSpec struct {
	// ServiceType is the service type.
	ServiceType string `json:"serviceType,omitempty"`
	// Port is the port configuration.
	Port int32 `json:"port,omitempty"`
	// SSL reports whether SSL is enabled.
	SSL bool `json:"ssl,omitempty"`
}
// SecuritySpec holds the security configuration of a Database.
type SecuritySpec struct {
	// AuthMethod is the authentication method.
	AuthMethod string `json:"authMethod,omitempty"`
	// CredentialsSecret names the Secret holding the user credentials.
	CredentialsSecret string `json:"credentialsSecret,omitempty"`
	// TLS is the TLS configuration; nil when not specified.
	TLS *TLSSpec `json:"tls,omitempty"`
}
// TLSSpec holds the TLS configuration of a Database.
type TLSSpec struct {
	Enabled    bool   `json:"enabled"`
	SecretName string `json:"secretName,omitempty"`
}
// DatabasePhase is the lifecycle phase reported in a Database's status.
type DatabasePhase string

// DatabasePhase values.
const (
	DatabasePhasePending   DatabasePhase = "Pending"
	DatabasePhaseRunning   DatabasePhase = "Running"
	DatabasePhaseFailed    DatabasePhase = "Failed"
	DatabasePhaseUpgrading DatabasePhase = "Upgrading"
	DatabasePhaseBackup    DatabasePhase = "Backup"
)
// DatabaseConditionType is the type of a DatabaseCondition.
type DatabaseConditionType string

// DatabaseConditionType values.
const (
	DatabaseConditionReady       DatabaseConditionType = "Ready"
	DatabaseConditionProgressing DatabaseConditionType = "Progressing"
	DatabaseConditionDegraded    DatabaseConditionType = "Degraded"
	DatabaseConditionBackupReady DatabaseConditionType = "BackupReady"
)
// DatabaseCondition is a single status condition of a Database.
type DatabaseCondition struct {
	Type               DatabaseConditionType  `json:"type"`
	Status             metav1.ConditionStatus `json:"status"`
	LastTransitionTime metav1.Time            `json:"lastTransitionTime,omitempty"`
	Reason             string                 `json:"reason,omitempty"`
	Message            string                 `json:"message,omitempty"`
}
// DatabaseEndpoints lists the endpoints of a Database.
type DatabaseEndpoints struct {
	// Primary is the primary database endpoint.
	Primary string `json:"primary,omitempty"`
	// Readonly lists the read-only replica endpoints.
	Readonly []string `json:"readonly,omitempty"`
	// Management is the management endpoint.
	Management string `json:"management,omitempty"`
}
// DatabaseStatus defines the observed state of a Database.
type DatabaseStatus struct {
	// Phase is the current phase.
	Phase DatabasePhase `json:"phase,omitempty"`
	// Conditions lists the status conditions.
	Conditions []DatabaseCondition `json:"conditions,omitempty"`
	// Replicas is the current number of replicas.
	Replicas int32 `json:"replicas"`
	// ReadyReplicas is the number of ready replicas.
	ReadyReplicas int32 `json:"readyReplicas"`
	// Endpoints lists the database endpoints.
	Endpoints DatabaseEndpoints `json:"endpoints,omitempty"`
	// StorageUsed is the storage usage.
	StorageUsed string `json:"storageUsed,omitempty"`
	// LastBackupTime is the time of the last backup; nil when unset.
	LastBackupTime *metav1.Time `json:"lastBackupTime,omitempty"`
	// ObservedGeneration is the generation that was observed.
	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
//+kubebuilder:printcolumn:name="Engine",type="string",JSONPath=".spec.engine"
//+kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.version"
//+kubebuilder:printcolumn:name="Storage",type="string",JSONPath=".spec.storage"
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// Database is the Schema for the databases API. It pairs the desired state
// (Spec) with the observed state (Status).
type Database struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              DatabaseSpec   `json:"spec,omitempty"`
	Status            DatabaseStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// DatabaseList contains a list of Database objects.
type DatabaseList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Database `json:"items"`
}
// init registers Database and DatabaseList with SchemeBuilder, which is
// declared elsewhere in this package.
func init() {
	SchemeBuilder.Register(&Database{}, &DatabaseList{})
}
高级 CRD 特性 #
版本转换 #
// api/v1/database_conversion.go
package v1
import (
"fmt"
"sigs.k8s.io/controller-runtime/pkg/conversion"
v1beta1 "github.com/example/database-operator/api/v1beta1"
)
// ConvertTo converts this v1 Database into the hub version (v1beta1).
//
// It copies the object metadata, the engine/version/storage/replicas fields,
// the Config map (as a fresh map), the backup Enabled/Schedule/Retention
// fields, and the status Phase/Replicas/ReadyReplicas. Other v1 fields
// (Resources, Network, Security, backup StorageClass/StorageSize, remaining
// status fields) are not carried over.
//
// It returns an error, rather than panicking, when dstRaw is not a
// *v1beta1.Database.
func (src *Database) ConvertTo(dstRaw conversion.Hub) error {
	dst, ok := dstRaw.(*v1beta1.Database)
	if !ok {
		return fmt.Errorf("unexpected conversion hub type %T, want *v1beta1.Database", dstRaw)
	}

	// Metadata.
	dst.ObjectMeta = src.ObjectMeta

	// Spec.
	dst.Spec.Engine = string(src.Spec.Engine)
	dst.Spec.Version = src.Spec.Version
	dst.Spec.Storage = src.Spec.Storage
	dst.Spec.Replicas = src.Spec.Replicas

	// Copy the configuration into a new map so the two objects do not share it.
	if src.Spec.Config != nil {
		dst.Spec.Config = make(map[string]string, len(src.Spec.Config))
		for k, v := range src.Spec.Config {
			dst.Spec.Config[k] = v
		}
	}

	// Backup configuration.
	if src.Spec.Backup != nil {
		dst.Spec.Backup = &v1beta1.BackupSpec{
			Enabled:   src.Spec.Backup.Enabled,
			Schedule:  src.Spec.Backup.Schedule,
			Retention: src.Spec.Backup.Retention,
		}
	}

	// Status.
	dst.Status.Phase = string(src.Status.Phase)
	dst.Status.Replicas = src.Status.Replicas
	dst.Status.ReadyReplicas = src.Status.ReadyReplicas
	return nil
}
// ConvertFrom populates this v1 Database from the hub version (v1beta1).
//
// It copies the object metadata, the engine/version/storage/replicas fields,
// the Config map (as a fresh map), the backup Enabled/Schedule/Retention
// fields, and the status Phase/Replicas/ReadyReplicas. No other fields of
// the receiver are written.
//
// It returns an error, rather than panicking, when srcRaw is not a
// *v1beta1.Database.
func (dst *Database) ConvertFrom(srcRaw conversion.Hub) error {
	src, ok := srcRaw.(*v1beta1.Database)
	if !ok {
		return fmt.Errorf("unexpected conversion hub type %T, want *v1beta1.Database", srcRaw)
	}

	// Metadata.
	dst.ObjectMeta = src.ObjectMeta

	// Spec.
	dst.Spec.Engine = DatabaseEngine(src.Spec.Engine)
	dst.Spec.Version = src.Spec.Version
	dst.Spec.Storage = src.Spec.Storage
	dst.Spec.Replicas = src.Spec.Replicas

	// Copy the configuration into a new map so the two objects do not share it.
	if src.Spec.Config != nil {
		dst.Spec.Config = make(map[string]string, len(src.Spec.Config))
		for k, v := range src.Spec.Config {
			dst.Spec.Config[k] = v
		}
	}

	// Backup configuration.
	if src.Spec.Backup != nil {
		dst.Spec.Backup = &BackupSpec{
			Enabled:   src.Spec.Backup.Enabled,
			Schedule:  src.Spec.Backup.Schedule,
			Retention: src.Spec.Backup.Retention,
		}
	}

	// Status.
	dst.Status.Phase = DatabasePhase(src.Status.Phase)
	dst.Status.Replicas = src.Status.Replicas
	dst.Status.ReadyReplicas = src.Status.ReadyReplicas
	return nil
}
Webhook 验证 #
// api/v1/database_webhook.go
package v1
import (
"fmt"
"regexp"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
// databaselog is the logger used by the Database webhook methods in this file.
var databaselog = logf.Log.WithName("database-resource")
// SetupWebhookWithManager builds the webhook for the Database type with the
// given manager and returns the builder's error, if any.
func (r *Database) SetupWebhookWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewWebhookManagedBy(mgr).For(r)
	return builder.Complete()
}
//+kubebuilder:webhook:path=/mutate-storage-v1-database,mutating=true,failurePolicy=fail,sideEffects=None,groups=storage.example.com,resources=databases,verbs=create;update,versions=v1,name=mdatabase.kb.io,admissionReviewVersions=v1
// Compile-time assertion that *Database implements webhook.Defaulter.
var _ webhook.Defaulter = &Database{}
// Default implements webhook.Defaulter and fills in unset spec fields:
// one replica, a ClusterIP service, the engine's conventional port, default
// memory/cpu requests, and a disabled backup configuration.
func (r *Database) Default() {
	databaselog.Info("default", "name", r.Name)

	spec := &r.Spec

	// Replica count.
	if spec.Replicas == 0 {
		spec.Replicas = 1
	}

	// Network: service type, then a port chosen by engine.
	network := &spec.Network
	if network.ServiceType == "" {
		network.ServiceType = "ClusterIP"
	}
	if network.Port == 0 {
		enginePorts := map[DatabaseEngine]int32{
			DatabaseEngineMySQL:      3306,
			DatabaseEnginePostgreSQL: 5432,
			DatabaseEngineMongoDB:    27017,
		}
		// Unknown engines keep port 0.
		if port, known := enginePorts[spec.Engine]; known {
			network.Port = port
		}
	}

	// Resource requests.
	if spec.Resources.Requests == nil {
		spec.Resources.Requests = ResourceList{
			"memory": "512Mi",
			"cpu":    "500m",
		}
	}

	// Backup configuration.
	if spec.Backup == nil {
		spec.Backup = &BackupSpec{
			Enabled:   false,
			Schedule:  "0 2 * * *", // every day at 02:00
			Retention: "7d",
		}
	}
}
// The delete verb is included so that the admission webhook is also invoked
// for deletions; without it the ValidateDelete method is never called.
//+kubebuilder:webhook:path=/validate-storage-v1-database,mutating=false,failurePolicy=fail,sideEffects=None,groups=storage.example.com,resources=databases,verbs=create;update;delete,versions=v1,name=vdatabase.kb.io,admissionReviewVersions=v1
// Compile-time assertion that *Database implements webhook.Validator.
var _ webhook.Validator = &Database{}
// ValidateCreate implements webhook.Validator. It logs the request and runs
// the full spec validation on the new object.
func (r *Database) ValidateCreate() error {
	databaselog.Info("validate create", "name", r.Name)
	if err := r.validateDatabase(); err != nil {
		return err
	}
	return nil
}
// ValidateUpdate implements webhook.Validator. It rejects a change of engine,
// rejects version downgrades, and then runs the full spec validation on the
// updated object.
//
// It returns an error, rather than panicking, when old is not a *Database.
func (r *Database) ValidateUpdate(old runtime.Object) error {
	databaselog.Info("validate update", "name", r.Name)

	oldDB, ok := old.(*Database)
	if !ok {
		return fmt.Errorf("expected old object to be a *Database, got %T", old)
	}

	// The engine is immutable.
	if r.Spec.Engine != oldDB.Spec.Engine {
		return fmt.Errorf("database engine cannot be changed")
	}

	// Version changes must be upgrades.
	if err := r.validateVersionUpgrade(oldDB); err != nil {
		return err
	}
	return r.validateDatabase()
}
// ValidateDelete implements webhook.Validator. Deletion is refused while the
// status phase is DatabasePhaseBackup.
func (r *Database) ValidateDelete() error {
	databaselog.Info("validate delete", "name", r.Name)

	backupRunning := r.Status.Phase == DatabasePhaseBackup
	if !backupRunning {
		return nil
	}
	return fmt.Errorf("cannot delete database while backup is in progress")
}
// validateDatabase runs every spec check in a fixed order (engine, version,
// storage, replicas, backup, network) and returns the first error.
func (r *Database) validateDatabase() error {
	checks := []func() error{
		r.validateEngine,
		r.validateVersion,
		r.validateStorage,
		r.validateReplicas,
		r.validateBackup,
		r.validateNetwork,
	}
	for _, check := range checks {
		if err := check(); err != nil {
			return err
		}
	}
	return nil
}
// validateEngine returns an error unless the engine is MySQL, PostgreSQL or
// MongoDB.
func (r *Database) validateEngine() error {
	engine := r.Spec.Engine
	supported := engine == DatabaseEngineMySQL ||
		engine == DatabaseEnginePostgreSQL ||
		engine == DatabaseEngineMongoDB
	if !supported {
		return fmt.Errorf("unsupported database engine: %s", engine)
	}
	return nil
}
// validateVersion requires a non-empty version of the form MAJOR.MINOR or
// MAJOR.MINOR.PATCH, digits only.
func (r *Database) validateVersion() error {
	version := r.Spec.Version
	if version == "" {
		return fmt.Errorf("version cannot be empty")
	}

	// Simple semantic-version shape check.
	wellFormed := regexp.MustCompile(`^\d+\.\d+(\.\d+)?$`).MatchString(version)
	if wellFormed {
		return nil
	}
	return fmt.Errorf("invalid version format: %s", version)
}
// validateStorage requires a non-empty storage size made of digits followed
// by Ki, Mi, Gi or Ti, and then applies the per-engine minimum.
func (r *Database) validateStorage() error {
	storage := r.Spec.Storage
	if storage == "" {
		return fmt.Errorf("storage cannot be empty")
	}

	// Format check.
	if ok := regexp.MustCompile(`^\d+[KMGT]i$`).MatchString(storage); !ok {
		return fmt.Errorf("invalid storage format: %s", storage)
	}

	// Minimum-size check.
	return r.validateMinimumStorage()
}
// validateMinimumStorage enforces the per-engine minimum storage size:
// 1Gi for MySQL, 512Mi for PostgreSQL and 2Gi for MongoDB. Engines other
// than these three are not checked.
//
// The size is read as digits followed by a Ki, Mi, Gi or Ti suffix, and the
// minimum applies whatever the suffix, so "100Ki" and "0Gi" are rejected.
// It returns an error when the size cannot be parsed or is below the
// engine's minimum.
func (r *Database) validateMinimumStorage() error {
	// Minimum per engine, expressed in KiB so that every suffix can be
	// compared with integers.
	var (
		engineName string
		minLabel   string
		minKiB     int64
	)
	switch r.Spec.Engine {
	case DatabaseEngineMySQL:
		engineName, minLabel, minKiB = "MySQL", "1Gi", 1<<20
	case DatabaseEnginePostgreSQL:
		engineName, minLabel, minKiB = "PostgreSQL", "512Mi", 512<<10
	case DatabaseEngineMongoDB:
		engineName, minLabel, minKiB = "MongoDB", "2Gi", 2<<20
	default:
		return nil
	}

	storage := r.Spec.Storage
	if len(storage) < 3 {
		return fmt.Errorf("invalid storage size: %s", storage)
	}
	digits, suffix := storage[:len(storage)-2], storage[len(storage)-2:]

	// Number of KiB in one unit of the given suffix.
	var kibPerUnit int64
	switch suffix {
	case "Ki":
		kibPerUnit = 1
	case "Mi":
		kibPerUnit = 1 << 10
	case "Gi":
		kibPerUnit = 1 << 20
	case "Ti":
		kibPerUnit = 1 << 30
	default:
		return fmt.Errorf("invalid storage size: %s", storage)
	}

	size, err := strconv.ParseInt(digits, 10, 64)
	if err != nil || size < 0 {
		return fmt.Errorf("invalid storage size: %s", storage)
	}

	// Compare against the minimum rounded up to whole units instead of
	// multiplying size by kibPerUnit, which could overflow for huge inputs.
	minUnits := (minKiB + kibPerUnit - 1) / kibPerUnit
	if size < minUnits {
		return fmt.Errorf("%s requires at least %s storage", engineName, minLabel)
	}
	return nil
}
// validateReplicas requires the replica count to be between 1 and 10
// inclusive.
func (r *Database) validateReplicas() error {
	switch replicas := r.Spec.Replicas; {
	case replicas < 1:
		return fmt.Errorf("replicas must be at least 1")
	case replicas > 10:
		return fmt.Errorf("replicas cannot exceed 10")
	default:
		return nil
	}
}
// validateBackup checks the backup settings only when backup is configured
// and enabled: the schedule and retention must be non-empty, and retention
// must be digits followed by one of d, w, h or m.
func (r *Database) validateBackup() error {
	backup := r.Spec.Backup
	if backup == nil || !backup.Enabled {
		return nil
	}

	// Schedule: presence check only.
	if backup.Schedule == "" {
		return fmt.Errorf("backup schedule cannot be empty when backup is enabled")
	}

	// Retention: presence, then format.
	if backup.Retention == "" {
		return fmt.Errorf("backup retention cannot be empty when backup is enabled")
	}
	if ok := regexp.MustCompile(`^\d+[dwhm]$`).MatchString(backup.Retention); !ok {
		return fmt.Errorf("invalid backup retention format: %s", backup.Retention)
	}
	return nil
}
// validateNetwork requires the service type to be ClusterIP, NodePort or
// LoadBalancer and the port to be in the range 1-65535.
func (r *Database) validateNetwork() error {
	network := r.Spec.Network

	// Service type.
	switch network.ServiceType {
	case "ClusterIP", "NodePort", "LoadBalancer":
		// accepted
	default:
		return fmt.Errorf("invalid service type: %s", network.ServiceType)
	}

	// Port range.
	if port := network.Port; port < 1 || port > 65535 {
		return fmt.Errorf("port must be between 1 and 65535")
	}
	return nil
}
// validateVersionUpgrade compares the version of r against old. An unchanged
// version string is accepted; otherwise both versions are parsed and the
// change is rejected unless isVersionUpgrade accepts it.
func (r *Database) validateVersionUpgrade(old *Database) error {
	current, previous := r.Spec.Version, old.Spec.Version
	if current == previous {
		return nil
	}

	// Parse both version strings into numeric components.
	newVersion, err := parseVersion(current)
	if err != nil {
		return fmt.Errorf("invalid new version: %v", err)
	}
	oldVersion, err := parseVersion(previous)
	if err != nil {
		return fmt.Errorf("invalid old version: %v", err)
	}

	// Only forward moves are allowed.
	if isVersionUpgrade(oldVersion, newVersion) {
		return nil
	}
	return fmt.Errorf("version downgrade is not allowed: %s -> %s", previous, current)
}
// parseVersion splits a dotted version string on "." and converts each
// component to an int. It returns the strconv error of the first component
// that is not a valid integer.
func parseVersion(version string) ([]int, error) {
	fields := strings.Split(version, ".")
	numbers := make([]int, 0, len(fields))
	for i := range fields {
		n, err := strconv.Atoi(fields[i])
		if err != nil {
			return nil, err
		}
		numbers = append(numbers, n)
	}
	return numbers, nil
}
// isVersionUpgrade reports whether the version next is equal to or newer
// than prev. Versions are compared component by component from the left; a
// missing trailing component counts as 0, so [8 0] and [8 0 0] are treated
// as the same version and neither direction is reported as a downgrade.
func isVersionUpgrade(prev, next []int) bool {
	n := len(prev)
	if len(next) > n {
		n = len(next)
	}

	// component returns v[i], or 0 when v has no such component.
	component := func(v []int, i int) int {
		if i < len(v) {
			return v[i]
		}
		return 0
	}

	for i := 0; i < n; i++ {
		p, q := component(prev, i), component(next, i)
		if p != q {
			return q > p
		}
	}
	// Every component is equal.
	return true
}
控制器实现 #
数据库控制器 #
// controllers/database_controller.go
package controllers
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
storagev1 "github.com/example/database-operator/api/v1"
)
// DatabaseReconciler reconciles Database resources. It embeds the client
// used for all API calls and holds the scheme used when setting owner
// references.
type DatabaseReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=storage.example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=storage.example.com,resources=databases/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=storage.example.com,resources=databases/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
// Reconcile is the main reconciliation loop for a Database.
//
// It loads the Database named by req (a missing object is not an error),
// delegates to handleDeletion when a deletion timestamp is set, adds the
// "database.finalizer" finalizer when absent (and returns after that update),
// and otherwise reconciles the ConfigMap, StatefulSet, Service and, when
// enabled, the backup, before updating the status. A successful pass is
// requeued after five minutes.
func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the Database instance.
	database := &storagev1.Database{}
	if err := r.Get(ctx, req.NamespacedName, database); err != nil {
		if errors.IsNotFound(err) {
			logger.Info("Database resource not found. Ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get Database")
		return ctrl.Result{}, err
	}

	// Deletion in progress.
	if database.DeletionTimestamp != nil {
		return r.handleDeletion(ctx, database)
	}

	// Make sure the finalizer is present before creating anything.
	if hasFinalizer := controllerutil.ContainsFinalizer(database, "database.finalizer"); !hasFinalizer {
		controllerutil.AddFinalizer(database, "database.finalizer")
		return ctrl.Result{}, r.Update(ctx, database)
	}

	// Reconcile the owned objects in order, then publish the status.
	steps := []func(context.Context, *storagev1.Database) error{
		r.reconcileConfigMap,
		r.reconcileStatefulSet,
		r.reconcileService,
	}
	if backup := database.Spec.Backup; backup != nil && backup.Enabled {
		steps = append(steps, r.reconcileBackup)
	}
	steps = append(steps, r.updateStatus)

	for _, step := range steps {
		if err := step(ctx, database); err != nil {
			return ctrl.Result{}, err
		}
	}
	return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
// reconcileConfigMap creates the "<name>-config" ConfigMap, owned by the
// Database, when it does not exist yet. An existing ConfigMap is left
// untouched.
func (r *DatabaseReconciler) reconcileConfigMap(ctx context.Context, database *storagev1.Database) error {
	key := types.NamespacedName{
		Name:      database.Name + "-config",
		Namespace: database.Namespace,
	}
	existing := &corev1.ConfigMap{}

	err := r.Get(ctx, key, existing)
	switch {
	case err == nil:
		return nil
	case !errors.IsNotFound(err):
		return err
	}

	// Not found: build it, set the owner reference and create it.
	desired := r.configMapForDatabase(database)
	if err := controllerutil.SetControllerReference(database, desired, r.Scheme); err != nil {
		return err
	}
	return r.Create(ctx, desired)
}
// configMapForDatabase builds the "<name>-config" ConfigMap for a Database.
// The data holds the engine-specific configuration file, followed by every
// entry of Spec.Config as its own key (a user key with the same name
// replaces the generated one).
func (r *DatabaseReconciler) configMapForDatabase(database *storagev1.Database) *corev1.ConfigMap {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      database.Name + "-config",
			Namespace: database.Namespace,
			Labels: map[string]string{
				"app":        database.Name,
				"engine":     string(database.Spec.Engine),
				"managed-by": "database-operator",
			},
		},
		Data: make(map[string]string),
	}

	// Engine-specific configuration file.
	switch database.Spec.Engine {
	case storagev1.DatabaseEngineMySQL:
		cm.Data["my.cnf"] = r.generateMySQLConfig(database)
	case storagev1.DatabaseEnginePostgreSQL:
		cm.Data["postgresql.conf"] = r.generatePostgreSQLConfig(database)
	case storagev1.DatabaseEngineMongoDB:
		cm.Data["mongod.conf"] = r.generateMongoDBConfig(database)
	}

	// User-supplied entries.
	for key, value := range database.Spec.Config {
		cm.Data[key] = value
	}
	return cm
}
// generateMySQLConfig renders the my.cnf contents, substituting the
// Database's network port.
func (r *DatabaseReconciler) generateMySQLConfig(database *storagev1.Database) string {
	const template = `[mysqld]
port = %d
bind-address = 0.0.0.0
default-storage-engine = InnoDB
innodb_buffer_pool_size = 256M
max_connections = 100
`
	port := database.Spec.Network.Port
	return fmt.Sprintf(template, port)
}
// generatePostgreSQLConfig renders the postgresql.conf contents,
// substituting the Database's network port.
func (r *DatabaseReconciler) generatePostgreSQLConfig(database *storagev1.Database) string {
	const template = `port = %d
listen_addresses = '*'
shared_buffers = 256MB
max_connections = 100
`
	port := database.Spec.Network.Port
	return fmt.Sprintf(template, port)
}
// generateMongoDBConfig renders the mongod.conf contents, substituting the
// Database's network port.
//
// mongod.conf is YAML, so the nesting is significant: port and bindIp belong
// under net, dbPath and journal under storage, and enabled under journal.
func (r *DatabaseReconciler) generateMongoDBConfig(database *storagev1.Database) string {
	const config = `net:
  port: %d
  bindIp: 0.0.0.0
storage:
  dbPath: /data/db
  journal:
    enabled: true
`
	return fmt.Sprintf(config, database.Spec.Network.Port)
}
// reconcileStatefulSet makes sure the StatefulSet named after the Database
// exists and matches the desired replica count and pod template.
//
// When the StatefulSet is missing it is created with the Database as its
// controller owner. When needsStatefulSetUpdate reports drift, the object
// fetched from the API server is modified in place and updated, so that the
// request carries the current resourceVersion and does not touch the
// volume claim templates. Sending a freshly built object instead would be
// rejected by the API server.
func (r *DatabaseReconciler) reconcileStatefulSet(ctx context.Context, database *storagev1.Database) error {
	existing := &appsv1.StatefulSet{}
	key := types.NamespacedName{
		Name:      database.Name,
		Namespace: database.Namespace,
	}

	err := r.Get(ctx, key, existing)
	if errors.IsNotFound(err) {
		desired := r.statefulSetForDatabase(database)
		if err := controllerutil.SetControllerReference(database, desired, r.Scheme); err != nil {
			return err
		}
		return r.Create(ctx, desired)
	}
	if err != nil {
		return err
	}

	// Nothing to do when replicas and image already match.
	if !r.needsStatefulSetUpdate(database, existing) {
		return nil
	}

	// Carry over only the fields this controller manages onto the live object.
	desired := r.statefulSetForDatabase(database)
	existing.Spec.Replicas = desired.Spec.Replicas
	existing.Spec.Template = desired.Spec.Template
	if err := controllerutil.SetControllerReference(database, existing, r.Scheme); err != nil {
		return err
	}
	return r.Update(ctx, existing)
}
// statefulSetForDatabase builds the StatefulSet for a Database: a single
// container running the engine image, with the "data" volume claim mounted
// at the engine's data path and the "<name>-config" ConfigMap mounted at the
// engine's config path.
//
// NOTE(review): resource.MustParse is applied to Spec.Storage and to the
// values of Spec.Resources; it panics on a string that is not a valid
// quantity. The webhook in this document checks the storage format but not
// the resource values — confirm they are validated before reaching here.
func (r *DatabaseReconciler) statefulSetForDatabase(database *storagev1.Database) *appsv1.StatefulSet {
	labels := map[string]string{
		"app":        database.Name,
		"engine":     string(database.Spec.Engine),
		"managed-by": "database-operator",
	}
	// Container image.
	image := r.getImageForEngine(database.Spec.Engine, database.Spec.Version)
	// Environment variables.
	envVars := r.getEnvVarsForEngine(database)
	// Resource requirements, converted from strings to quantities.
	resources := corev1.ResourceRequirements{}
	if database.Spec.Resources.Requests != nil {
		resources.Requests = corev1.ResourceList{}
		for k, v := range database.Spec.Resources.Requests {
			resources.Requests[corev1.ResourceName(k)] = resource.MustParse(v)
		}
	}
	if database.Spec.Resources.Limits != nil {
		resources.Limits = corev1.ResourceList{}
		for k, v := range database.Spec.Resources.Limits {
			resources.Limits[corev1.ResourceName(k)] = resource.MustParse(v)
		}
	}
	// Volume mounts.
	volumeMounts := []corev1.VolumeMount{
		{
			Name:      "data",
			MountPath: r.getDataPathForEngine(database.Spec.Engine),
		},
		{
			Name:      "config",
			MountPath: r.getConfigPathForEngine(database.Spec.Engine),
		},
	}
	// Volumes: only "config" is declared here; "data" comes from the claim
	// template below.
	volumes := []corev1.Volume{
		{
			Name: "config",
			VolumeSource: corev1.VolumeSource{
				ConfigMap: &corev1.ConfigMapVolumeSource{
					LocalObjectReference: corev1.LocalObjectReference{
						Name: database.Name + "-config",
					},
				},
			},
		},
	}
	// PVC template.
	pvcTemplate := corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name: "data",
		},
		Spec: corev1.PersistentVolumeClaimSpec{
			AccessModes: []corev1.PersistentVolumeAccessMode{
				corev1.ReadWriteOnce,
			},
			Resources: corev1.ResourceRequirements{
				Requests: corev1.ResourceList{
					corev1.ResourceStorage: resource.MustParse(database.Spec.Storage),
				},
			},
		},
	}
	statefulSet := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      database.Name,
			Namespace: database.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas:    &database.Spec.Replicas,
			ServiceName: database.Name,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:         database.Name,
							Image:        image,
							Ports:        r.getContainerPortsForEngine(database),
							Env:          envVars,
							Resources:    resources,
							VolumeMounts: volumeMounts,
							LivenessProbe: &corev1.Probe{
								ProbeHandler:        r.getLivenessProbeForEngine(database),
								InitialDelaySeconds: 30,
								PeriodSeconds:       10,
							},
							ReadinessProbe: &corev1.Probe{
								ProbeHandler:        r.getReadinessProbeForEngine(database),
								InitialDelaySeconds: 5,
								PeriodSeconds:       5,
							},
						},
					},
					Volumes: volumes,
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{pvcTemplate},
		},
	}
	return statefulSet
}
// getImageForEngine returns "<repository>:<version>" for the engine, or an
// empty string for an unknown engine.
func (r *DatabaseReconciler) getImageForEngine(engine storagev1.DatabaseEngine, version string) string {
	var repository string
	switch engine {
	case storagev1.DatabaseEngineMySQL:
		repository = "mysql"
	case storagev1.DatabaseEnginePostgreSQL:
		repository = "postgres"
	case storagev1.DatabaseEngineMongoDB:
		repository = "mongo"
	default:
		return ""
	}
	return repository + ":" + version
}
// getEnvVarsForEngine returns the container environment variables for the
// Database's engine, or nil for an unknown engine.
//
// NOTE(review): the passwords below are fixed literals and
// Spec.Security.CredentialsSecret is not consulted here — confirm whether
// credentials should come from that Secret instead.
func (r *DatabaseReconciler) getEnvVarsForEngine(database *storagev1.Database) []corev1.EnvVar {
	switch database.Spec.Engine {
	case storagev1.DatabaseEngineMySQL:
		return []corev1.EnvVar{
			{Name: "MYSQL_ROOT_PASSWORD", Value: "rootpassword"},
			{Name: "MYSQL_DATABASE", Value: database.Name},
		}
	case storagev1.DatabaseEnginePostgreSQL:
		return []corev1.EnvVar{
			{Name: "POSTGRES_PASSWORD", Value: "password"},
			{Name: "POSTGRES_DB", Value: database.Name},
		}
	case storagev1.DatabaseEngineMongoDB:
		return []corev1.EnvVar{
			{Name: "MONGO_INITDB_ROOT_USERNAME", Value: "admin"},
			{Name: "MONGO_INITDB_ROOT_PASSWORD", Value: "password"},
		}
	}
	return nil
}
// getDataPathForEngine returns the data directory for the engine, falling
// back to "/data" for an unknown engine.
func (r *DatabaseReconciler) getDataPathForEngine(engine storagev1.DatabaseEngine) string {
	dataPaths := map[storagev1.DatabaseEngine]string{
		storagev1.DatabaseEngineMySQL:      "/var/lib/mysql",
		storagev1.DatabaseEnginePostgreSQL: "/var/lib/postgresql/data",
		storagev1.DatabaseEngineMongoDB:    "/data/db",
	}
	if path, known := dataPaths[engine]; known {
		return path
	}
	return "/data"
}
// getConfigPathForEngine returns the configuration directory for the engine,
// falling back to "/etc/config" for an unknown engine.
func (r *DatabaseReconciler) getConfigPathForEngine(engine storagev1.DatabaseEngine) string {
	configPaths := map[storagev1.DatabaseEngine]string{
		storagev1.DatabaseEngineMySQL:      "/etc/mysql/conf.d",
		storagev1.DatabaseEnginePostgreSQL: "/etc/postgresql",
		storagev1.DatabaseEngineMongoDB:    "/etc/mongo",
	}
	if path, known := configPaths[engine]; known {
		return path
	}
	return "/etc/config"
}
// getContainerPortsForEngine returns a single TCP container port taken from
// the Database's network configuration.
func (r *DatabaseReconciler) getContainerPortsForEngine(database *storagev1.Database) []corev1.ContainerPort {
	dbPort := corev1.ContainerPort{
		ContainerPort: database.Spec.Network.Port,
		Protocol:      corev1.ProtocolTCP,
	}
	return []corev1.ContainerPort{dbPort}
}
// getLivenessProbeForEngine returns the liveness probe handler for the
// Database's engine: an exec command for the three known engines, and a TCP
// check on the configured port otherwise.
func (r *DatabaseReconciler) getLivenessProbeForEngine(database *storagev1.Database) corev1.ProbeHandler {
	var command []string
	switch database.Spec.Engine {
	case storagev1.DatabaseEngineMySQL:
		command = []string{"mysqladmin", "ping", "-h", "localhost"}
	case storagev1.DatabaseEnginePostgreSQL:
		command = []string{"pg_isready", "-U", "postgres"}
	case storagev1.DatabaseEngineMongoDB:
		command = []string{"mongo", "--eval", "db.adminCommand('ismaster')"}
	default:
		// Unknown engine: fall back to a TCP connection check.
		return corev1.ProbeHandler{
			TCPSocket: &corev1.TCPSocketAction{
				Port: intstr.FromInt(int(database.Spec.Network.Port)),
			},
		}
	}
	return corev1.ProbeHandler{
		Exec: &corev1.ExecAction{Command: command},
	}
}
// getReadinessProbeForEngine returns the readiness probe handler, which is
// the same handler as the liveness probe.
func (r *DatabaseReconciler) getReadinessProbeForEngine(database *storagev1.Database) corev1.ProbeHandler {
	handler := r.getLivenessProbeForEngine(database)
	return handler
}
// needsStatefulSetUpdate reports whether the StatefulSet differs from the
// Database in replica count or in the image of its first container.
//
// A StatefulSet whose Spec.Replicas is nil is reported as needing an update
// instead of being dereferenced.
func (r *DatabaseReconciler) needsStatefulSetUpdate(database *storagev1.Database, statefulSet *appsv1.StatefulSet) bool {
	// Replica count.
	current := statefulSet.Spec.Replicas
	if current == nil || *current != database.Spec.Replicas {
		return true
	}

	// Image of the first container; a pod template without containers is
	// not compared.
	containers := statefulSet.Spec.Template.Spec.Containers
	if len(containers) == 0 {
		return false
	}
	expectedImage := r.getImageForEngine(database.Spec.Engine, database.Spec.Version)
	return containers[0].Image != expectedImage
}
// reconcileService creates the Service named after the Database, owned by
// it, when the Service does not exist yet. An existing Service is left
// untouched.
func (r *DatabaseReconciler) reconcileService(ctx context.Context, database *storagev1.Database) error {
	key := types.NamespacedName{
		Name:      database.Name,
		Namespace: database.Namespace,
	}
	existing := &corev1.Service{}

	err := r.Get(ctx, key, existing)
	switch {
	case err == nil:
		return nil
	case !errors.IsNotFound(err):
		return err
	}

	// Not found: build it, set the owner reference and create it.
	desired := r.serviceForDatabase(database)
	if err := controllerutil.SetControllerReference(database, desired, r.Scheme); err != nil {
		return err
	}
	return r.Create(ctx, desired)
}
// serviceForDatabase builds the Service for a Database. It selects the
// Database's pods by label and exposes the configured port over TCP.
//
// The Service is headless (clusterIP "None") only when the service type is
// ClusterIP or unset. Kubernetes rejects clusterIP "None" on NodePort and
// LoadBalancer Services, so for those types the cluster IP is left for the
// API server to allocate.
func (r *DatabaseReconciler) serviceForDatabase(database *storagev1.Database) *corev1.Service {
	labels := map[string]string{
		"app":        database.Name,
		"engine":     string(database.Spec.Engine),
		"managed-by": "database-operator",
	}

	port := database.Spec.Network.Port
	serviceType := corev1.ServiceType(database.Spec.Network.ServiceType)

	spec := corev1.ServiceSpec{
		Selector: labels,
		Ports: []corev1.ServicePort{
			{
				Port:       port,
				TargetPort: intstr.FromInt(int(port)),
				Protocol:   corev1.ProtocolTCP,
			},
		},
		Type: serviceType,
	}
	if serviceType == "" || serviceType == corev1.ServiceTypeClusterIP {
		// Headless service for the StatefulSet.
		spec.ClusterIP = corev1.ClusterIPNone
	}

	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      database.Name,
			Namespace: database.Namespace,
			Labels:    labels,
		},
		Spec: spec,
	}
}
// reconcileBackup is a placeholder for backup reconciliation; it currently
// does nothing and always returns nil.
func (r *DatabaseReconciler) reconcileBackup(ctx context.Context, database *storagev1.Database) error {
	// A CronJob that performs periodic backups could be created here.
	// This is only a placeholder to keep the example simple.
	return nil
}
// updateStatus copies the replica counts from the Database's StatefulSet
// into the Database status, records the observed generation, derives the
// phase, sets the primary endpoint and writes the status subresource.
//
// Phase: Running when the ready replicas equal the desired replicas,
// Pending when the StatefulSet has at least one replica, Failed otherwise.
func (r *DatabaseReconciler) updateStatus(ctx context.Context, database *storagev1.Database) error {
	// Read the StatefulSet's current state.
	sts := &appsv1.StatefulSet{}
	key := types.NamespacedName{
		Name:      database.Name,
		Namespace: database.Namespace,
	}
	if err := r.Get(ctx, key, sts); err != nil {
		return err
	}

	status := &database.Status
	status.Replicas = sts.Status.Replicas
	status.ReadyReplicas = sts.Status.ReadyReplicas
	status.ObservedGeneration = database.Generation

	// Derive the phase.
	switch {
	case sts.Status.ReadyReplicas == database.Spec.Replicas:
		status.Phase = storagev1.DatabasePhaseRunning
	case sts.Status.Replicas > 0:
		status.Phase = storagev1.DatabasePhasePending
	default:
		status.Phase = storagev1.DatabasePhaseFailed
	}

	// Primary endpoint.
	status.Endpoints.Primary = fmt.Sprintf("%s.%s.svc.cluster.local:%d",
		database.Name, database.Namespace, database.Spec.Network.Port)

	return r.Status().Update(ctx, database)
}
// handleDeletion removes the "database.finalizer" finalizer from a Database
// that is being deleted and persists the change. When the finalizer is
// already gone it does nothing.
func (r *DatabaseReconciler) handleDeletion(ctx context.Context, database *storagev1.Database) (ctrl.Result, error) {
	if !controllerutil.ContainsFinalizer(database, "database.finalizer") {
		return ctrl.Result{}, nil
	}

	// Cleanup logic goes here, e.g. backing up data or removing external
	// resources.
	controllerutil.RemoveFinalizer(database, "database.finalizer")
	err := r.Update(ctx, database)
	return ctrl.Result{}, err
}
// SetupWithManager registers the reconciler with the manager: it is the
// controller for Database objects and owns StatefulSets, Services and
// ConfigMaps.
func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewControllerManagedBy(mgr).For(&storagev1.Database{})
	builder = builder.
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		Owns(&corev1.ConfigMap{})
	return builder.Complete(r)
}
使用示例 #
创建数据库实例 #
# mysql-database.yaml
# Example Database instance: a single-replica MySQL 8.0 database with 10Gi of
# storage, daily backups, and TLS enabled.
apiVersion: storage.example.com/v1
kind: Database
metadata:
  name: mysql-db
  namespace: default
spec:
  engine: mysql
  version: "8.0"
  storage: "10Gi"
  replicas: 1
  config:
    max_connections: "200"
    innodb_buffer_pool_size: "512M"
  resources:
    requests:
      memory: "1Gi"
      cpu: "500m"
    limits:
      memory: "2Gi"
      cpu: "1"
  network:
    serviceType: "ClusterIP"
    port: 3306
    ssl: true
  backup:
    enabled: true
    schedule: "0 2 * * *"
    retention: "7d"
    storageClass: "fast-ssd"
    storageSize: "5Gi"
  security:
    authMethod: "mysql_native_password"
    credentialsSecret: "mysql-credentials"
    tls:
      enabled: true
      secretName: "mysql-tls"
# Apply the database manifest
kubectl apply -f mysql-database.yaml
# Check the database status
kubectl get databases
kubectl describe database mysql-db
# List the related resources
kubectl get statefulsets,services,configmaps -l app=mysql-db
通过掌握 CRD 自定义资源的开发,您可以扩展 Kubernetes 的功能,创建符合特定业务需求的资源类型,并通过控制器实现自动化管理。这是构建云原生应用生态系统的重要技能。