5.3.3 Operator 开发基础 #
Operator 是 Kubernetes 的一种扩展模式,它使用自定义资源(Custom Resources)来管理应用程序及其组件。Operator 将运维知识编码到软件中,自动化复杂应用的部署、扩展、备份和升级等操作。
Operator 模式概述 #
什么是 Operator #
Operator 是一种软件扩展,它使用自定义资源来管理应用程序及其组件。Operator 遵循 Kubernetes 的控制器模式,持续监控集群状态并采取行动以达到期望状态。
Operator 的核心组件 #
- 自定义资源定义(CRD):定义新的 API 资源类型
- 自定义控制器:监听资源变化并执行相应操作
- 自定义资源(CR):CRD 的实例,描述期望状态
Operator 成熟度模型 #
Level 5: Auto Pilot
├── 自动调优和异常检测
├── 自动故障恢复
└── 预测性维护
Level 4: Deep Insights
├── 监控和告警
├── 日志聚合
└── 性能分析
Level 3: Full Lifecycle
├── 应用升级
├── 故障恢复
└── 备份和恢复
Level 2: Seamless Upgrades
├── 补丁和小版本升级
├── 配置更新
└── 滚动更新
Level 1: Basic Install
├── 自动化安装
├── 配置管理
└── 基本运行时管理
开发环境准备 #
安装必要工具 #
# 安装 Operator SDK
curl -LO https://github.com/operator-framework/operator-sdk/releases/latest/download/operator-sdk_linux_amd64
chmod +x operator-sdk_linux_amd64
sudo mv operator-sdk_linux_amd64 /usr/local/bin/operator-sdk
# 安装 Kubebuilder
curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)
chmod +x kubebuilder
sudo mv kubebuilder /usr/local/bin/
# 验证安装
operator-sdk version
kubebuilder version
项目初始化 #
# 创建新的 Operator 项目
mkdir webapp-operator
cd webapp-operator
# 初始化项目
operator-sdk init --domain=example.com --repo=github.com/example/webapp-operator
# 创建 API 和控制器
operator-sdk create api --group=apps --version=v1 --kind=WebApp --resource --controller
自定义资源定义 #
WebApp CRD 定义 #
// api/v1/webapp_types.go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// WebAppSpec 定义 WebApp 的期望状态
type WebAppSpec struct {
// 应用镜像
Image string `json:"image"`
// 副本数量
Replicas *int32 `json:"replicas,omitempty"`
// 端口配置
Port int32 `json:"port,omitempty"`
// 环境变量
Env []EnvVar `json:"env,omitempty"`
// 资源限制
Resources ResourceRequirements `json:"resources,omitempty"`
// 服务类型
ServiceType ServiceType `json:"serviceType,omitempty"`
// Ingress 配置
Ingress *IngressSpec `json:"ingress,omitempty"`
}
// EnvVar 环境变量定义
type EnvVar struct {
Name string `json:"name"`
Value string `json:"value"`
}
// ResourceRequirements 资源需求定义
type ResourceRequirements struct {
Requests ResourceList `json:"requests,omitempty"`
Limits ResourceList `json:"limits,omitempty"`
}
// ResourceList 资源列表
type ResourceList map[string]string
// ServiceType 服务类型
type ServiceType string
const (
ServiceTypeClusterIP ServiceType = "ClusterIP"
ServiceTypeNodePort ServiceType = "NodePort"
ServiceTypeLoadBalancer ServiceType = "LoadBalancer"
)
// IngressSpec Ingress 配置
type IngressSpec struct {
Host string `json:"host"`
Path string `json:"path,omitempty"`
TLS bool `json:"tls,omitempty"`
}
// WebAppStatus 定义 WebApp 的观察状态
type WebAppStatus struct {
// 当前副本数
Replicas int32 `json:"replicas"`
// 就绪副本数
ReadyReplicas int32 `json:"readyReplicas"`
// 部署状态
Phase WebAppPhase `json:"phase,omitempty"`
// 状态条件
Conditions []WebAppCondition `json:"conditions,omitempty"`
// 服务 URL
ServiceURL string `json:"serviceURL,omitempty"`
// Ingress URL
IngressURL string `json:"ingressURL,omitempty"`
}
// WebAppPhase 应用阶段
type WebAppPhase string
const (
WebAppPhasePending WebAppPhase = "Pending"
WebAppPhaseRunning WebAppPhase = "Running"
WebAppPhaseFailed WebAppPhase = "Failed"
WebAppPhaseSucceeded WebAppPhase = "Succeeded"
)
// WebAppCondition 状态条件
type WebAppCondition struct {
Type WebAppConditionType `json:"type"`
Status metav1.ConditionStatus `json:"status"`
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
}
// WebAppConditionType 条件类型
type WebAppConditionType string
const (
WebAppConditionAvailable WebAppConditionType = "Available"
WebAppConditionProgressing WebAppConditionType = "Progressing"
WebAppConditionDegraded WebAppConditionType = "Degraded"
)
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
//+kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image"
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// WebApp 是 WebApp API 的 Schema
type WebApp struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec WebAppSpec `json:"spec,omitempty"`
Status WebAppStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// WebAppList 包含 WebApp 列表
type WebAppList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []WebApp `json:"items"`
}
func init() {
SchemeBuilder.Register(&WebApp{}, &WebAppList{})
}
验证和默认值 #
// api/v1/webapp_webhook.go
package v1
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
var webapplog = logf.Log.WithName("webapp-resource")
func (r *WebApp) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(r).
Complete()
}
//+kubebuilder:webhook:path=/mutate-apps-v1-webapp,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps,resources=webapps,verbs=create;update,versions=v1,name=mwebapp.kb.io,admissionReviewVersions=v1
var _ webhook.Defaulter = &WebApp{}
// Default 实现 webhook.Defaulter,设置默认值
func (r *WebApp) Default() {
webapplog.Info("default", "name", r.Name)
// 设置默认副本数
if r.Spec.Replicas == nil {
replicas := int32(1)
r.Spec.Replicas = &replicas
}
// 设置默认端口
if r.Spec.Port == 0 {
r.Spec.Port = 8080
}
// 设置默认服务类型
if r.Spec.ServiceType == "" {
r.Spec.ServiceType = ServiceTypeClusterIP
}
// 设置默认 Ingress 路径
if r.Spec.Ingress != nil && r.Spec.Ingress.Path == "" {
r.Spec.Ingress.Path = "/"
}
}
//+kubebuilder:webhook:path=/validate-apps-v1-webapp,mutating=false,failurePolicy=fail,sideEffects=None,groups=apps,resources=webapps,verbs=create;update,versions=v1,name=vwebapp.kb.io,admissionReviewVersions=v1
var _ webhook.Validator = &WebApp{}
// ValidateCreate 实现 webhook.Validator,验证创建
func (r *WebApp) ValidateCreate() error {
webapplog.Info("validate create", "name", r.Name)
return r.validateWebApp()
}
// ValidateUpdate 实现 webhook.Validator,验证更新
func (r *WebApp) ValidateUpdate(old runtime.Object) error {
webapplog.Info("validate update", "name", r.Name)
return r.validateWebApp()
}
// ValidateDelete 实现 webhook.Validator,验证删除
func (r *WebApp) ValidateDelete() error {
webapplog.Info("validate delete", "name", r.Name)
return nil
}
// validateWebApp 验证 WebApp 规范
func (r *WebApp) validateWebApp() error {
// 验证镜像不为空
if r.Spec.Image == "" {
return fmt.Errorf("image cannot be empty")
}
// 验证副本数范围
if r.Spec.Replicas != nil && (*r.Spec.Replicas < 0 || *r.Spec.Replicas > 100) {
return fmt.Errorf("replicas must be between 0 and 100")
}
// 验证端口范围
if r.Spec.Port < 1 || r.Spec.Port > 65535 {
return fmt.Errorf("port must be between 1 and 65535")
}
// 验证环境变量名称
for _, env := range r.Spec.Env {
if env.Name == "" {
return fmt.Errorf("environment variable name cannot be empty")
}
}
// 验证 Ingress 配置
if r.Spec.Ingress != nil {
if r.Spec.Ingress.Host == "" {
return fmt.Errorf("ingress host cannot be empty")
}
}
return nil
}
控制器实现 #
主控制器逻辑 #
// controllers/webapp_controller.go
package controllers
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
appsv1alpha1 "github.com/example/webapp-operator/api/v1"
)
// WebAppReconciler 协调 WebApp 资源
type WebAppReconciler struct {
client.Client
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=apps.example.com,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps.example.com,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps.example.com,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
// Reconcile 是主要的协调循环
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// 获取 WebApp 实例
webapp := &appsv1alpha1.WebApp{}
err := r.Get(ctx, req.NamespacedName, webapp)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("WebApp resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get WebApp")
return ctrl.Result{}, err
}
// 处理删除
if webapp.DeletionTimestamp != nil {
return r.handleDeletion(ctx, webapp)
}
// 添加 finalizer
if !controllerutil.ContainsFinalizer(webapp, "webapp.finalizer") {
controllerutil.AddFinalizer(webapp, "webapp.finalizer")
return ctrl.Result{}, r.Update(ctx, webapp)
}
// 协调 Deployment
if err := r.reconcileDeployment(ctx, webapp); err != nil {
return ctrl.Result{}, err
}
// 协调 Service
if err := r.reconcileService(ctx, webapp); err != nil {
return ctrl.Result{}, err
}
// 协调 Ingress
if webapp.Spec.Ingress != nil {
if err := r.reconcileIngress(ctx, webapp); err != nil {
return ctrl.Result{}, err
}
}
// 更新状态
if err := r.updateStatus(ctx, webapp); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
// 协调 Deployment
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
logger := log.FromContext(ctx)
deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{
Name: webapp.Name,
Namespace: webapp.Namespace,
}, deployment)
if err != nil && errors.IsNotFound(err) {
// 创建新的 Deployment
deployment = r.deploymentForWebApp(webapp)
if err := controllerutil.SetControllerReference(webapp, deployment, r.Scheme); err != nil {
return err
}
logger.Info("Creating a new Deployment", "Deployment.Namespace", deployment.Namespace, "Deployment.Name", deployment.Name)
return r.Create(ctx, deployment)
} else if err != nil {
logger.Error(err, "Failed to get Deployment")
return err
}
// 更新现有 Deployment
if r.needsDeploymentUpdate(webapp, deployment) {
logger.Info("Updating Deployment", "Deployment.Namespace", deployment.Namespace, "Deployment.Name", deployment.Name)
deployment = r.deploymentForWebApp(webapp)
if err := controllerutil.SetControllerReference(webapp, deployment, r.Scheme); err != nil {
return err
}
return r.Update(ctx, deployment)
}
return nil
}
// 为 WebApp 创建 Deployment
func (r *WebAppReconciler) deploymentForWebApp(webapp *appsv1alpha1.WebApp) *appsv1.Deployment {
labels := map[string]string{
"app": webapp.Name,
"managed-by": "webapp-operator",
}
// 构建环境变量
var envVars []corev1.EnvVar
for _, env := range webapp.Spec.Env {
envVars = append(envVars, corev1.EnvVar{
Name: env.Name,
Value: env.Value,
})
}
// 构建资源需求
resources := corev1.ResourceRequirements{}
if webapp.Spec.Resources.Requests != nil {
resources.Requests = corev1.ResourceList{}
for k, v := range webapp.Spec.Resources.Requests {
resources.Requests[corev1.ResourceName(k)] = resource.MustParse(v)
}
}
if webapp.Spec.Resources.Limits != nil {
resources.Limits = corev1.ResourceList{}
for k, v := range webapp.Spec.Resources.Limits {
resources.Limits[corev1.ResourceName(k)] = resource.MustParse(v)
}
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
Labels: labels,
},
Spec: appsv1.DeploymentSpec{
Replicas: webapp.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: webapp.Name,
Image: webapp.Spec.Image,
Ports: []corev1.ContainerPort{
{
ContainerPort: webapp.Spec.Port,
Protocol: corev1.ProtocolTCP,
},
},
Env: envVars,
Resources: resources,
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Port: intstr.FromInt(int(webapp.Spec.Port)),
},
},
InitialDelaySeconds: 30,
PeriodSeconds: 10,
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/ready",
Port: intstr.FromInt(int(webapp.Spec.Port)),
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 5,
},
},
},
},
},
},
}
return deployment
}
// 协调 Service
func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
logger := log.FromContext(ctx)
service := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{
Name: webapp.Name,
Namespace: webapp.Namespace,
}, service)
if err != nil && errors.IsNotFound(err) {
// 创建新的 Service
service = r.serviceForWebApp(webapp)
if err := controllerutil.SetControllerReference(webapp, service, r.Scheme); err != nil {
return err
}
logger.Info("Creating a new Service", "Service.Namespace", service.Namespace, "Service.Name", service.Name)
return r.Create(ctx, service)
} else if err != nil {
logger.Error(err, "Failed to get Service")
return err
}
return nil
}
// 为 WebApp 创建 Service
func (r *WebAppReconciler) serviceForWebApp(webapp *appsv1alpha1.WebApp) *corev1.Service {
labels := map[string]string{
"app": webapp.Name,
"managed-by": "webapp-operator",
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: labels,
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceType(webapp.Spec.ServiceType),
},
}
return service
}
// 协调 Ingress
func (r *WebAppReconciler) reconcileIngress(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
logger := log.FromContext(ctx)
ingress := &networkingv1.Ingress{}
err := r.Get(ctx, types.NamespacedName{
Name: webapp.Name,
Namespace: webapp.Namespace,
}, ingress)
if err != nil && errors.IsNotFound(err) {
// 创建新的 Ingress
ingress = r.ingressForWebApp(webapp)
if err := controllerutil.SetControllerReference(webapp, ingress, r.Scheme); err != nil {
return err
}
logger.Info("Creating a new Ingress", "Ingress.Namespace", ingress.Namespace, "Ingress.Name", ingress.Name)
return r.Create(ctx, ingress)
} else if err != nil {
logger.Error(err, "Failed to get Ingress")
return err
}
return nil
}
// 为 WebApp 创建 Ingress
func (r *WebAppReconciler) ingressForWebApp(webapp *appsv1alpha1.WebApp) *networkingv1.Ingress {
labels := map[string]string{
"app": webapp.Name,
"managed-by": "webapp-operator",
}
pathType := networkingv1.PathTypePrefix
ingress := &networkingv1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
Labels: labels,
},
Spec: networkingv1.IngressSpec{
Rules: []networkingv1.IngressRule{
{
Host: webapp.Spec.Ingress.Host,
IngressRuleValue: networkingv1.IngressRuleValue{
HTTP: &networkingv1.HTTPIngressRuleValue{
Paths: []networkingv1.HTTPIngressPath{
{
Path: webapp.Spec.Ingress.Path,
PathType: &pathType,
Backend: networkingv1.IngressBackend{
Service: &networkingv1.IngressServiceBackend{
Name: webapp.Name,
Port: networkingv1.ServiceBackendPort{
Number: 80,
},
},
},
},
},
},
},
},
},
},
}
// 添加 TLS 配置
if webapp.Spec.Ingress.TLS {
ingress.Spec.TLS = []networkingv1.IngressTLS{
{
Hosts: []string{webapp.Spec.Ingress.Host},
SecretName: webapp.Name + "-tls",
},
}
}
return ingress
}
// 更新状态
func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *appsv1alpha1.WebApp) error {
// 获取 Deployment 状态
deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{
Name: webapp.Name,
Namespace: webapp.Namespace,
}, deployment)
if err != nil {
return err
}
// 更新状态
webapp.Status.Replicas = deployment.Status.Replicas
webapp.Status.ReadyReplicas = deployment.Status.ReadyReplicas
// 确定阶段
if deployment.Status.ReadyReplicas == *webapp.Spec.Replicas {
webapp.Status.Phase = appsv1alpha1.WebAppPhaseRunning
} else if deployment.Status.Replicas > 0 {
webapp.Status.Phase = appsv1alpha1.WebAppPhasePending
} else {
webapp.Status.Phase = appsv1alpha1.WebAppPhaseFailed
}
// 设置服务 URL
webapp.Status.ServiceURL = fmt.Sprintf("http://%s.%s.svc.cluster.local", webapp.Name, webapp.Namespace)
// 设置 Ingress URL
if webapp.Spec.Ingress != nil {
protocol := "http"
if webapp.Spec.Ingress.TLS {
protocol = "https"
}
webapp.Status.IngressURL = fmt.Sprintf("%s://%s%s", protocol, webapp.Spec.Ingress.Host, webapp.Spec.Ingress.Path)
}
return r.Status().Update(ctx, webapp)
}
// 处理删除
func (r *WebAppReconciler) handleDeletion(ctx context.Context, webapp *appsv1alpha1.WebApp) (ctrl.Result, error) {
logger := log.FromContext(ctx)
if controllerutil.ContainsFinalizer(webapp, "webapp.finalizer") {
// 执行清理逻辑
logger.Info("Performing cleanup for WebApp", "name", webapp.Name)
// 移除 finalizer
controllerutil.RemoveFinalizer(webapp, "webapp.finalizer")
return ctrl.Result{}, r.Update(ctx, webapp)
}
return ctrl.Result{}, nil
}
// 检查是否需要更新 Deployment
func (r *WebAppReconciler) needsDeploymentUpdate(webapp *appsv1alpha1.WebApp, deployment *appsv1.Deployment) bool {
// 检查副本数
if *deployment.Spec.Replicas != *webapp.Spec.Replicas {
return true
}
// 检查镜像
if len(deployment.Spec.Template.Spec.Containers) > 0 {
if deployment.Spec.Template.Spec.Containers[0].Image != webapp.Spec.Image {
return true
}
}
return false
}
// SetupWithManager 设置控制器管理器
func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&appsv1alpha1.WebApp{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&networkingv1.Ingress{}).
Complete(r)
}
测试和部署 #
单元测试 #
// controllers/webapp_controller_test.go
package controllers
import (
"context"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
appsv1alpha1 "github.com/example/webapp-operator/api/v1"
)
var _ = Describe("WebApp Controller", func() {
Context("When creating a WebApp", func() {
It("Should create a Deployment and Service", func() {
ctx := context.Background()
webapp := &appsv1alpha1.WebApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-webapp",
Namespace: "default",
},
Spec: appsv1alpha1.WebAppSpec{
Image: "nginx:1.20",
Replicas: &[]int32{2}[0],
Port: 80,
},
}
Expect(k8sClient.Create(ctx, webapp)).Should(Succeed())
// 验证 Deployment 创建
deployment := &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "test-webapp",
Namespace: "default",
}, deployment)
}, time.Second*10, time.Millisecond*250).Should(Succeed())
Expect(*deployment.Spec.Replicas).Should(Equal(int32(2)))
Expect(deployment.Spec.Template.Spec.Containers[0].Image).Should(Equal("nginx:1.20"))
// 验证 Service 创建
service := &corev1.Service{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "test-webapp",
Namespace: "default",
}, service)
}, time.Second*10, time.Millisecond*250).Should(Succeed())
Expect(service.Spec.Ports[0].Port).Should(Equal(int32(80)))
})
})
})
部署配置 #
# config/samples/apps_v1_webapp.yaml
apiVersion: apps.example.com/v1
kind: WebApp
metadata:
name: webapp-sample
spec:
image: nginx:1.20
replicas: 3
port: 80
env:
- name: ENV
value: production
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
serviceType: ClusterIP
ingress:
host: webapp.example.com
path: /
tls: true
Makefile 构建和部署 #
# Makefile
# 构建和推送镜像
docker-build:
docker build -t webapp-operator:latest .
docker-push:
docker push webapp-operator:latest
# 安装 CRD
install:
kubectl apply -f config/crd/bases
# 卸载 CRD
uninstall:
kubectl delete -f config/crd/bases
# 部署控制器
deploy:
kubectl apply -f config/rbac
kubectl apply -f config/manager
# 运行测试
test:
go test ./... -coverprofile cover.out
# 生成代码
generate:
controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
# 生成 manifests
manifests:
controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
通过掌握 Operator 开发基础,您可以创建功能强大的 Kubernetes 扩展,自动化复杂应用的管理和运维任务。这为构建云原生应用生态系统提供了强大的工具。