5.3.2 Kubernetes 客户端开发

5.3.2 Kubernetes 客户端开发 #

client-go 是 Kubernetes 官方提供的 Go 客户端库,它为 Go 开发者提供了与 Kubernetes API Server 交互的完整功能。通过 client-go,我们可以创建、读取、更新和删除 Kubernetes 资源,监听资源变化,以及开发自定义控制器。

client-go 库概述 #

核心组件 #

client-go 库包含以下核心组件:

  1. Clientset:类型化客户端,提供对已知资源类型的访问
  2. Dynamic Client:动态客户端,可以操作任意资源类型
  3. Discovery Client:发现客户端,用于获取 API 资源信息
  4. RESTClient:底层 REST 客户端
  5. Informers:资源监听和缓存机制
  6. Workqueue:工作队列,用于事件处理

安装和初始化 #

# 安装 client-go
go mod init k8s-client-example
go get k8s.io/client-go@latest
go get k8s.io/apimachinery@latest
// client/client.go
package main

import (
    "context"
    "flag"
    "fmt"
    "path/filepath"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

// main builds a client configuration from the -kubeconfig flag (defaulting to
// $HOME/.kube/config when a home directory is known), falls back to the
// in-cluster configuration when that fails, and prints how many Pods exist in
// the "default" namespace.
func main() {
    // The flag is registered exactly once; only its default value and help
    // text depend on whether a home directory could be determined.
    defaultPath := ""
    usage := "absolute path to the kubeconfig file"
    if home := homedir.HomeDir(); home != "" {
        defaultPath = filepath.Join(home, ".kube", "config")
        usage = "(optional) absolute path to the kubeconfig file"
    }
    kubeconfig := flag.String("kubeconfig", defaultPath, usage)
    flag.Parse()

    // Build the configuration, preferring the kubeconfig file.
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        // Keep the kubeconfig error so it is still reported if the
        // in-cluster fallback fails as well.
        var inClusterErr error
        config, inClusterErr = rest.InClusterConfig()
        if inClusterErr != nil {
            panic(fmt.Sprintf("load kubeconfig: %v; in-cluster config: %v", err, inClusterErr))
        }
    }

    // Create the typed client.
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // Only the "default" namespace is listed, so the message says so rather
    // than claiming a cluster-wide count.
    pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err.Error())
    }

    fmt.Printf("There are %d pods in the default namespace\n", len(pods.Items))
}

基本资源操作 #

Pod 操作示例 #

// pod-operations.go
package main

import (
    "context"
    "fmt"
    "log"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// PodManager bundles a typed Kubernetes clientset with the namespace that
// all of its Pod operations target.
type PodManager struct {
    clientset *kubernetes.Clientset
    namespace string
}

// NewPodManager creates a clientset from config and returns a PodManager
// bound to namespace. A clientset construction error is returned unchanged.
func NewPodManager(config *rest.Config, namespace string) (*PodManager, error) {
    cs, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    manager := new(PodManager)
    manager.clientset = cs
    manager.namespace = namespace
    return manager, nil
}

// CreatePod creates a single-container Pod called name that runs image in the
// manager's namespace. The Pod is labelled app=<name>, exposes container port
// 80, requests 100m CPU / 128Mi memory and is limited to 500m CPU / 256Mi.
//
// It returns the Pod as stored by the API server. On failure the API error is
// wrapped with %w so callers can still inspect it through the error chain.
func (pm *PodManager) CreatePod(name, image string) (*corev1.Pod, error) {
    container := corev1.Container{
        Name:  name,
        Image: image,
        Ports: []corev1.ContainerPort{
            {ContainerPort: 80},
        },
        Resources: corev1.ResourceRequirements{
            Requests: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("100m"),
                corev1.ResourceMemory: resource.MustParse("128Mi"),
            },
            Limits: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("500m"),
                corev1.ResourceMemory: resource.MustParse("256Mi"),
            },
        },
    }

    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: pm.namespace,
            Labels: map[string]string{
                "app": name,
            },
        },
        Spec: corev1.PodSpec{
            Containers:    []corev1.Container{container},
            RestartPolicy: corev1.RestartPolicyAlways,
        },
    }

    result, err := pm.clientset.CoreV1().Pods(pm.namespace).Create(
        context.TODO(), pod, metav1.CreateOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to create pod: %w", err)
    }

    log.Printf("Created pod %s in namespace %s", result.Name, result.Namespace)
    return result, nil
}

// GetPod fetches the Pod called name from the manager's namespace.
//
// The API error is wrapped with %w, so callers can still classify it (for
// example as "not found") through the error chain.
func (pm *PodManager) GetPod(name string) (*corev1.Pod, error) {
    pod, err := pm.clientset.CoreV1().Pods(pm.namespace).Get(
        context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get pod: %w", err)
    }
    return pod, nil
}

// ListPods lists the Pods in the manager's namespace that match
// labelSelector. An empty selector places no restriction on the result.
//
// The API error is wrapped with %w so the cause stays inspectable.
func (pm *PodManager) ListPods(labelSelector string) (*corev1.PodList, error) {
    // The zero value of LabelSelector is the empty string, so the argument
    // can be passed through without a special case.
    listOptions := metav1.ListOptions{LabelSelector: labelSelector}

    pods, err := pm.clientset.CoreV1().Pods(pm.namespace).List(
        context.TODO(), listOptions)
    if err != nil {
        return nil, fmt.Errorf("failed to list pods: %w", err)
    }
    return pods, nil
}

// UpdatePod sends pod to the API server as an update in the manager's
// namespace and returns the stored result.
//
// The API error is wrapped with %w so callers can detect specific failures,
// such as a resource-version conflict, through the error chain.
func (pm *PodManager) UpdatePod(pod *corev1.Pod) (*corev1.Pod, error) {
    result, err := pm.clientset.CoreV1().Pods(pm.namespace).Update(
        context.TODO(), pod, metav1.UpdateOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to update pod: %w", err)
    }
    return result, nil
}

// DeletePod deletes the Pod called name from the manager's namespace using
// default delete options and logs the deletion.
//
// The API error is wrapped with %w so callers can, for instance, treat a
// "not found" failure as success.
func (pm *PodManager) DeletePod(name string) error {
    err := pm.clientset.CoreV1().Pods(pm.namespace).Delete(
        context.TODO(), name, metav1.DeleteOptions{})
    if err != nil {
        return fmt.Errorf("failed to delete pod: %w", err)
    }

    log.Printf("Deleted pod %s from namespace %s", name, pm.namespace)
    return nil
}

// GetPodLogs streams the logs of containerName in the Pod podName and returns
// them as a single string. The whole log is buffered in memory.
//
// Errors from opening or reading the stream are wrapped with %w.
func (pm *PodManager) GetPodLogs(podName, containerName string) (string, error) {
    req := pm.clientset.CoreV1().Pods(pm.namespace).GetLogs(podName, &corev1.PodLogOptions{
        Container: containerName,
    })

    stream, err := req.Stream(context.TODO())
    if err != nil {
        return "", fmt.Errorf("failed to get pod logs: %w", err)
    }
    defer stream.Close()

    var buf bytes.Buffer
    if _, err := io.Copy(&buf, stream); err != nil {
        return "", fmt.Errorf("failed to read logs: %w", err)
    }

    return buf.String(), nil
}

Deployment 操作示例 #

// deployment-operations.go
package main

import (
    "context"
    "fmt"
    "log"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// DeploymentManager bundles a typed Kubernetes clientset with the namespace
// that all of its Deployment operations target.
type DeploymentManager struct {
    clientset *kubernetes.Clientset
    namespace string
}

// NewDeploymentManager creates a clientset from config and returns a
// DeploymentManager bound to namespace. A clientset construction error is
// returned unchanged.
func NewDeploymentManager(config *rest.Config, namespace string) (*DeploymentManager, error) {
    cs, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    manager := new(DeploymentManager)
    manager.clientset = cs
    manager.namespace = namespace
    return manager, nil
}

// 创建 Deployment
func (dm *DeploymentManager) CreateDeployment(name, image string, replicas int32) (*appsv1.Deployment, error) {
    deployment := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: dm.namespace,
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app": name,
                },
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app": name,
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  name,
                            Image: image,
                            Ports: []corev1.ContainerPort{
                                {
                                    ContainerPort: 80,
                                },
                            },
                            LivenessProbe: &corev1.Probe{
                                ProbeHandler: corev1.ProbeHandler{
                                    HTTPGet: &corev1.HTTPGetAction{
                                        Path: "/health",
                                        Port: intstr.FromInt(80),
                                    },
                                },
                                InitialDelaySeconds: 30,
                                PeriodSeconds:       10,
                            },
                            ReadinessProbe: &corev1.Probe{
                                ProbeHandler: corev1.ProbeHandler{
                                    HTTPGet: &corev1.HTTPGetAction{
                                        Path: "/ready",
                                        Port: intstr.FromInt(80),
                                    },
                                },
                                InitialDelaySeconds: 5,
                                PeriodSeconds:       5,
                            },
                        },
                    },
                },
            },
            Strategy: appsv1.DeploymentStrategy{
                Type: appsv1.RollingUpdateDeploymentStrategyType,
                RollingUpdate: &appsv1.RollingUpdateDeployment{
                    MaxSurge:       &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
                    MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
                },
            },
        },
    }

    result, err := dm.clientset.AppsV1().Deployments(dm.namespace).Create(
        context.TODO(), deployment, metav1.CreateOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to create deployment: %v", err)
    }

    log.Printf("Created deployment %s in namespace %s", result.Name, result.Namespace)
    return result, nil
}

// ScaleDeployment sets the replica count of the Deployment called name to
// replicas by fetching the object, changing Spec.Replicas and writing it
// back.
//
// Both API errors are wrapped with %w, so a caller can recognise, for
// example, a resource-version conflict on the write and retry.
func (dm *DeploymentManager) ScaleDeployment(name string, replicas int32) error {
    deployments := dm.clientset.AppsV1().Deployments(dm.namespace)

    deployment, err := deployments.Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("failed to get deployment: %w", err)
    }

    deployment.Spec.Replicas = &replicas

    if _, err := deployments.Update(context.TODO(), deployment, metav1.UpdateOptions{}); err != nil {
        return fmt.Errorf("failed to scale deployment: %w", err)
    }

    log.Printf("Scaled deployment %s to %d replicas", name, replicas)
    return nil
}

// UpdateImage changes the image of the container called containerName in the
// Deployment called name to newImage and writes the Deployment back.
//
// It returns an error, without contacting the API server again, when the
// Deployment has no container with that name; previously this case sent an
// unchanged update and logged success. API errors are wrapped with %w.
func (dm *DeploymentManager) UpdateImage(name, containerName, newImage string) error {
    deployments := dm.clientset.AppsV1().Deployments(dm.namespace)

    deployment, err := deployments.Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("failed to get deployment: %w", err)
    }

    // containers shares its backing array with the Deployment, so assigning
    // through it updates the object that is sent below.
    containers := deployment.Spec.Template.Spec.Containers
    found := false
    for i := range containers {
        if containers[i].Name == containerName {
            containers[i].Image = newImage
            found = true
            break
        }
    }
    if !found {
        return fmt.Errorf("container %q not found in deployment %s", containerName, name)
    }

    if _, err := deployments.Update(context.TODO(), deployment, metav1.UpdateOptions{}); err != nil {
        return fmt.Errorf("failed to update deployment image: %w", err)
    }

    log.Printf("Updated deployment %s container %s to image %s", name, containerName, newImage)
    return nil
}

动态客户端 #

动态客户端可以操作任意类型的 Kubernetes 资源:

// dynamic-client.go
package main

import (
    "context"
    "fmt"
    "log"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"
)

// DynamicResourceManager pairs a dynamic client with the namespace that its
// resource operations target.
type DynamicResourceManager struct {
    client    dynamic.Interface
    namespace string
}

// NewDynamicResourceManager creates a dynamic client from config and returns
// a DynamicResourceManager bound to namespace. A client construction error
// is returned unchanged.
func NewDynamicResourceManager(config *rest.Config, namespace string) (*DynamicResourceManager, error) {
    dyn, err := dynamic.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    manager := new(DynamicResourceManager)
    manager.client = dyn
    manager.namespace = namespace
    return manager, nil
}

// CreateResource creates obj as a resource of type gvr in the manager's
// namespace and returns the stored object.
//
// The API error is wrapped with %w so callers can still classify it (for
// example as "already exists") through the error chain.
func (drm *DynamicResourceManager) CreateResource(gvr schema.GroupVersionResource, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
    result, err := drm.client.Resource(gvr).Namespace(drm.namespace).Create(
        context.TODO(), obj, metav1.CreateOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to create resource: %w", err)
    }
    return result, nil
}

// GetResource fetches the resource of type gvr called name from the
// manager's namespace.
//
// The API error is wrapped with %w so callers can still classify it (for
// example as "not found") through the error chain.
func (drm *DynamicResourceManager) GetResource(gvr schema.GroupVersionResource, name string) (*unstructured.Unstructured, error) {
    result, err := drm.client.Resource(gvr).Namespace(drm.namespace).Get(
        context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get resource: %w", err)
    }
    return result, nil
}

// ListResources lists every resource of type gvr in the manager's namespace,
// using default list options.
//
// The API error is wrapped with %w so the cause stays inspectable.
func (drm *DynamicResourceManager) ListResources(gvr schema.GroupVersionResource) (*unstructured.UnstructuredList, error) {
    result, err := drm.client.Resource(gvr).Namespace(drm.namespace).List(
        context.TODO(), metav1.ListOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to list resources: %w", err)
    }
    return result, nil
}

// ExampleUsage demonstrates the dynamic client: it creates a ConfigMap named
// "example-config" with two data keys in the manager's namespace and then
// reads it back. Failures are logged and end the demonstration early.
func (drm *DynamicResourceManager) ExampleUsage() {
    const configMapName = "example-config"

    // ConfigMaps live in the core API group, whose group name is empty.
    gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}

    // Describe the ConfigMap as unstructured content.
    metadata := map[string]interface{}{
        "name":      configMapName,
        "namespace": drm.namespace,
    }
    data := map[string]interface{}{
        "key1": "value1",
        "key2": "value2",
    }
    desired := &unstructured.Unstructured{
        Object: map[string]interface{}{
            "apiVersion": "v1",
            "kind":       "ConfigMap",
            "metadata":   metadata,
            "data":       data,
        },
    }

    stored, err := drm.CreateResource(gvr, desired)
    if err != nil {
        log.Printf("Failed to create ConfigMap: %v", err)
        return
    }
    log.Printf("Created ConfigMap: %s", stored.GetName())

    // Read the ConfigMap back by name.
    fetched, err := drm.GetResource(gvr, configMapName)
    if err != nil {
        log.Printf("Failed to get ConfigMap: %v", err)
        return
    }
    log.Printf("Retrieved ConfigMap: %s", fetched.GetName())
}

Informers 和事件监听 #

Informers 提供了高效的资源监听和缓存机制:

// informer-example.go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

// PodWatcher couples a clientset with a shared Pod informer.
type PodWatcher struct {
    clientset *kubernetes.Clientset
    informer  cache.SharedIndexInformer
}

// NewPodWatcher creates a clientset from config and a Pod informer that is
// restricted to namespace and resyncs every 30 seconds. A clientset
// construction error is returned unchanged.
func NewPodWatcher(config *rest.Config, namespace string) (*PodWatcher, error) {
    cs, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    // The factory is only used to obtain the namespaced Pod informer.
    const resyncPeriod = time.Second * 30
    factory := informers.NewSharedInformerFactoryWithOptions(
        cs, resyncPeriod, informers.WithNamespace(namespace))

    watcher := &PodWatcher{clientset: cs}
    watcher.informer = factory.Core().V1().Pods().Informer()
    return watcher, nil
}

// AddEventHandlers registers add, update and delete callbacks on the Pod
// informer. Each callback logs the event and forwards the Pod to the
// corresponding handlePod* method.
//
// All type assertions are checked. A delete notification may carry a
// cache.DeletedFinalStateUnknown tombstone instead of a *corev1.Pod; the Pod
// is then taken from the tombstone rather than panicking on the assertion.
// Objects of any other type are logged and skipped.
func (pw *PodWatcher) AddEventHandlers() {
    pw.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod, ok := obj.(*corev1.Pod)
            if !ok {
                log.Printf("Ignoring add event for unexpected object type %T", obj)
                return
            }
            log.Printf("Pod ADDED: %s/%s", pod.Namespace, pod.Name)
            pw.handlePodAdd(pod)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            oldPod, oldOK := oldObj.(*corev1.Pod)
            newPod, newOK := newObj.(*corev1.Pod)
            if !oldOK || !newOK {
                log.Printf("Ignoring update event for unexpected object types %T -> %T", oldObj, newObj)
                return
            }

            // Updates that carry an unchanged ResourceVersion have nothing
            // new to report.
            if oldPod.ResourceVersion == newPod.ResourceVersion {
                return
            }

            log.Printf("Pod UPDATED: %s/%s", newPod.Namespace, newPod.Name)
            pw.handlePodUpdate(oldPod, newPod)
        },
        DeleteFunc: func(obj interface{}) {
            pod, ok := obj.(*corev1.Pod)
            if !ok {
                tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
                if !isTombstone {
                    log.Printf("Ignoring delete event for unexpected object type %T", obj)
                    return
                }
                pod, ok = tombstone.Obj.(*corev1.Pod)
                if !ok {
                    log.Printf("Ignoring delete tombstone holding unexpected object type %T", tombstone.Obj)
                    return
                }
            }
            log.Printf("Pod DELETED: %s/%s", pod.Namespace, pod.Name)
            pw.handlePodDelete(pod)
        },
    })
}

// handlePodAdd logs the name and phase of a newly observed Pod and, when the
// Pod carries an "app" label, the application it belongs to.
func (pw *PodWatcher) handlePodAdd(pod *corev1.Pod) {
    log.Printf("New pod created: %s, Phase: %s", pod.Name, pod.Status.Phase)

    // Nothing more to report for Pods without an "app" label.
    app, labelled := pod.Labels["app"]
    if !labelled {
        return
    }
    log.Printf("Pod %s belongs to app: %s", pod.Name, app)
}

// handlePodUpdate compares two versions of a Pod and logs a line for each of
// the phase and the readiness (as computed by isPodReady) that changed.
func (pw *PodWatcher) handlePodUpdate(oldPod, newPod *corev1.Pod) {
    // Phase transition.
    if before, after := oldPod.Status.Phase, newPod.Status.Phase; before != after {
        log.Printf("Pod %s phase changed from %s to %s",
            newPod.Name, before, after)
    }

    // Readiness transition.
    wasReady, nowReady := isPodReady(oldPod), isPodReady(newPod)
    if wasReady == nowReady {
        return
    }
    log.Printf("Pod %s ready status changed from %t to %t",
        newPod.Name, wasReady, nowReady)
}

// handlePodDelete logs the name of a deleted Pod together with the last
// phase recorded on the object it receives.
func (pw *PodWatcher) handlePodDelete(pod *corev1.Pod) {
    name, finalPhase := pod.Name, pod.Status.Phase
    log.Printf("Pod %s was deleted, final phase: %s", name, finalPhase)
}

// isPodReady reports whether the first PodReady condition in the Pod's
// status is true. A Pod without a PodReady condition is not ready.
func isPodReady(pod *corev1.Pod) bool {
    conditions := pod.Status.Conditions
    for i := range conditions {
        if conditions[i].Type != corev1.PodReady {
            continue
        }
        return conditions[i].Status == corev1.ConditionTrue
    }
    return false
}

// Start registers the event handlers, runs the informer in a background
// goroutine until ctx is done, and blocks until the informer cache has
// synced. If the wait for the sync reports failure, the process exits via
// log.Fatal.
func (pw *PodWatcher) Start(ctx context.Context) {
    pw.AddEventHandlers()

    stopCh := ctx.Done()
    go pw.informer.Run(stopCh)

    // Block until the initial list has been delivered to the cache.
    synced := cache.WaitForCacheSync(stopCh, pw.informer.HasSynced)
    if !synced {
        log.Fatal("Failed to sync cache")
    }

    log.Println("Pod watcher started successfully")
}

// CreateCustomInformer builds a standalone Pod informer for namespace from
// an explicit list/watch source instead of an informer factory. The field
// selector passed here is fields.Everything(), i.e. no filtering; the
// informer resyncs every 30 seconds and has no extra indexers. The returned
// informer is not started.
func (pw *PodWatcher) CreateCustomInformer(namespace string) cache.SharedIndexInformer {
    restClient := pw.clientset.CoreV1().RESTClient()
    source := cache.NewListWatchFromClient(restClient, "pods", namespace, fields.Everything())

    const resyncPeriod = time.Second * 30
    return cache.NewSharedIndexInformer(source, &corev1.Pod{}, resyncPeriod, cache.Indexers{})
}

工作队列和控制器模式 #

实现一个简单的控制器:

// controller-example.go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// PodController reacts to Pod events: the informer feeds object keys into a
// rate-limited work queue that worker goroutines drain.
type PodController struct {
    clientset   *kubernetes.Clientset
    podInformer cache.SharedIndexInformer
    workqueue   workqueue.RateLimitingInterface
}

// NewPodController creates a clientset from config, a cluster-wide Pod
// informer that resyncs every 30 seconds and a rate-limited queue named
// "Pods", and wires add, update and delete events to enqueuePod. A clientset
// construction error is returned unchanged.
func NewPodController(config *rest.Config) (*PodController, error) {
    cs, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    const resyncPeriod = time.Second * 30
    pods := informers.NewSharedInformerFactory(cs, resyncPeriod).Core().V1().Pods().Informer()
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods")

    controller := &PodController{
        clientset:   cs,
        podInformer: pods,
        workqueue:   queue,
    }

    // Every kind of event enqueues the key of the affected Pod; for updates
    // the new version of the object is used.
    handlers := cache.ResourceEventHandlerFuncs{
        AddFunc:    controller.enqueuePod,
        DeleteFunc: controller.enqueuePod,
        UpdateFunc: func(_, updated interface{}) {
            controller.enqueuePod(updated)
        },
    }
    pods.AddEventHandler(handlers)

    return controller, nil
}

// enqueuePod computes the namespace/name key of obj and adds it to the work
// queue. Key errors are reported through utilruntime.HandleError and the
// object is dropped.
//
// This function is also registered as the delete handler, where obj may be a
// cache.DeletedFinalStateUnknown tombstone rather than a Pod.
// DeletionHandlingMetaNamespaceKeyFunc understands tombstones and otherwise
// behaves like MetaNamespaceKeyFunc, so deletions are not lost.
func (c *PodController) enqueuePod(obj interface{}) {
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }

    c.workqueue.Add(key)
}

// Run starts the informer, waits for its cache to sync, launches the given
// number of worker goroutines and then blocks until ctx is done. It returns
// an error only when the wait for the cache sync fails. The work queue is
// shut down when Run returns.
func (c *PodController) Run(ctx context.Context, workers int) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()

    log.Println("Starting Pod controller")

    stopCh := ctx.Done()
    go c.podInformer.Run(stopCh)

    // Workers must not start before the cache holds the initial list.
    if synced := cache.WaitForCacheSync(stopCh, c.podInformer.HasSynced); !synced {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    log.Println("Starting workers")
    for remaining := workers; remaining > 0; remaining-- {
        // Each worker is restarted one second after it returns, until ctx
        // is cancelled.
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }
    log.Println("Started workers")

    <-stopCh
    log.Println("Shutting down workers")

    return nil
}

// runWorker processes work queue items one after another until
// processNextWorkItem reports that processing should stop.
func (c *PodController) runWorker(ctx context.Context) {
    for {
        if !c.processNextWorkItem(ctx) {
            return
        }
    }
}

// processNextWorkItem takes one item from the work queue and hands its key
// to syncHandler. It returns false only when the queue has been shut down.
//
// A failed sync re-adds the key with rate limiting and reports the error; a
// successful sync, or an item that is not a string, clears the item's
// rate-limiting history.
func (c *PodController) processNextWorkItem(ctx context.Context) bool {
    item, quit := c.workqueue.Get()
    if quit {
        return false
    }

    // handle runs in its own function so that Done is called for the item
    // as soon as handling finishes, before any error is reported.
    handle := func() error {
        defer c.workqueue.Done(item)

        key, isString := item.(string)
        if !isString {
            // A malformed item would fail forever; drop it for good.
            c.workqueue.Forget(item)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
            return nil
        }

        if syncErr := c.syncHandler(ctx, key); syncErr != nil {
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, syncErr.Error())
        }

        c.workqueue.Forget(item)
        log.Printf("Successfully synced '%s'", key)
        return nil
    }

    if err := handle(); err != nil {
        utilruntime.HandleError(err)
    }
    return true
}

// syncHandler reconciles the Pod identified by key ("namespace/name").
//
// A malformed key is reported and dropped (nil is returned so that it is not
// requeued). The Pod is looked up in the informer's indexer; when it is no
// longer there it has been deleted and there is nothing to do. Otherwise the
// Pod is passed to processPod, whose error is returned.
func (c *PodController) syncHandler(ctx context.Context, key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    // GetByKey returns (item, exists, err); absence is signalled by exists,
    // not by a nil item.
    obj, exists, err := c.podInformer.GetIndexer().GetByKey(key)
    if err != nil {
        return err
    }

    if !exists {
        log.Printf("Pod %s/%s has been deleted", namespace, name)
        return nil
    }

    pod, ok := obj.(*corev1.Pod)
    if !ok {
        // Retrying cannot change the type of the cached object.
        utilruntime.HandleError(fmt.Errorf("expected *corev1.Pod for key %s but got %T", key, obj))
        return nil
    }
    return c.processPod(ctx, pod)
}

// processPod makes sure pod carries the label managed-by=pod-controller,
// updating it through the API when the label is missing.
//
// pod comes from the informer cache and is shared with every other reader of
// that cache, so it is never modified here; the label is added to a deep
// copy, and only the copy is sent to the API server.
//
// An update conflict is returned wrapped with %w, so the key is requeued
// while the cause stays inspectable. Other API errors are returned
// unchanged.
func (c *PodController) processPod(ctx context.Context, pod *corev1.Pod) error {
    log.Printf("Processing pod: %s/%s, Phase: %s", pod.Namespace, pod.Name, pod.Status.Phase)

    // Reading from a nil map is safe, so no initialisation is needed here.
    if _, exists := pod.Labels["managed-by"]; exists {
        return nil
    }

    updated := pod.DeepCopy()
    if updated.Labels == nil {
        updated.Labels = make(map[string]string)
    }
    updated.Labels["managed-by"] = "pod-controller"

    _, err := c.clientset.CoreV1().Pods(updated.Namespace).Update(
        ctx, updated, metav1.UpdateOptions{})
    if err != nil {
        if errors.IsConflict(err) {
            // The cached object was stale; returning an error requeues
            // the key so the update is retried.
            return fmt.Errorf("conflict updating pod %s/%s: %w", pod.Namespace, pod.Name, err)
        }
        return err
    }

    log.Printf("Added managed-by label to pod %s/%s", pod.Namespace, pod.Name)
    return nil
}

认证和授权 #

服务账户和 RBAC #

# rbac.yaml
# Identity and permissions for the Pod controller: a ServiceAccount, a
# ClusterRole, and a ClusterRoleBinding that grants the role to the account.

# ServiceAccount "pod-controller" in the "default" namespace.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: pod-controller
  namespace: default
---
# ClusterRole with full access to Pods and permission to create Events,
# both in the core API group ("").
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pod-controller
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create"]
---
# ClusterRoleBinding that grants the ClusterRole above, cluster-wide, to the
# ServiceAccount declared at the top of this file.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: pod-controller
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: pod-controller
subjects:
  - kind: ServiceAccount
    name: pod-controller
    namespace: default

集群内认证 #

// in-cluster-auth.go
package main

import (
    "context"
    "log"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// main loads the in-cluster configuration, creates a clientset and logs how
// many Pods exist in the namespace returned by getCurrentNamespace. Any
// failure ends the program via log.Fatal.
func main() {
    // Configuration supplied to workloads running inside the cluster.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }

    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Namespace to list Pods in.
    ns := getCurrentNamespace()

    podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        log.Fatal(err)
    }

    count := len(podList.Items)
    log.Printf("Found %d pods in namespace %s", count, ns)
}

// getCurrentNamespace returns the contents of the namespace file found under
// the service account directory, or "default" when that file cannot be read.
// The file contents are returned exactly as read.
func getCurrentNamespace() string {
    const namespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

    data, err := ioutil.ReadFile(namespaceFile)
    if err != nil {
        return "default"
    }
    return string(data)
}

完整应用示例 #

Pod 监控应用 #

// pod-monitor.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

// PodMonitor keeps an in-memory summary of every Pod reported by a shared
// informer and hands out snapshots of it through GetStats.
type PodMonitor struct {
    clientset *kubernetes.Clientset
    informer  cache.SharedIndexInformer

    // mu guards podStats. The informer invokes the event handlers on its own
    // goroutine while GetStats is called from HTTP handler goroutines, so
    // every access to the map and to the PodInfo values in it needs the
    // lock.
    mu       sync.RWMutex
    podStats map[string]*PodInfo
}

// PodInfo is the JSON-serialisable summary kept for each Pod.
type PodInfo struct {
    Name      string    `json:"name"`
    Namespace string    `json:"namespace"`
    Phase     string    `json:"phase"`
    Ready     bool      `json:"ready"`
    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
}

// NewPodMonitor creates a clientset from config and a cluster-wide Pod
// informer that resyncs every 30 seconds, and registers the monitor's event
// handlers on it. A clientset construction error is returned unchanged.
func NewPodMonitor(config *rest.Config) (*PodMonitor, error) {
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    factory := informers.NewSharedInformerFactory(clientset, time.Second*30)
    podInformer := factory.Core().V1().Pods().Informer()

    monitor := &PodMonitor{
        clientset: clientset,
        informer:  podInformer,
        podStats:  make(map[string]*PodInfo),
    }

    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    monitor.onPodAdd,
        UpdateFunc: monitor.onPodUpdate,
        DeleteFunc: monitor.onPodDelete,
    })

    return monitor, nil
}

// podStatsKey returns the "namespace/name" key under which pod is tracked.
func podStatsKey(pod *corev1.Pod) string {
    return fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}

// podReadyStatus reports whether the first PodReady condition in the Pod's
// status is true. A Pod without a PodReady condition is not ready.
func podReadyStatus(pod *corev1.Pod) bool {
    for _, condition := range pod.Status.Conditions {
        if condition.Type == corev1.PodReady {
            return condition.Status == corev1.ConditionTrue
        }
    }
    return false
}

// onPodAdd records a summary for a newly observed Pod. Objects that are not
// Pods are logged and ignored.
func (pm *PodMonitor) onPodAdd(obj interface{}) {
    pod, ok := obj.(*corev1.Pod)
    if !ok {
        log.Printf("Ignoring add event for unexpected object type %T", obj)
        return
    }
    key := podStatsKey(pod)

    // Build the entry before taking the lock to keep the critical section
    // short.
    info := &PodInfo{
        Name:      pod.Name,
        Namespace: pod.Namespace,
        Phase:     string(pod.Status.Phase),
        Ready:     podReadyStatus(pod),
        CreatedAt: pod.CreationTimestamp.Time,
        UpdatedAt: time.Now(),
    }

    pm.mu.Lock()
    pm.podStats[key] = info
    pm.mu.Unlock()

    log.Printf("Pod added: %s", key)
}

// onPodUpdate refreshes phase, readiness and update time of a tracked Pod
// from its new version. Pods that are not tracked are left untracked; the
// update is logged either way.
func (pm *PodMonitor) onPodUpdate(oldObj, newObj interface{}) {
    pod, ok := newObj.(*corev1.Pod)
    if !ok {
        log.Printf("Ignoring update event for unexpected object type %T", newObj)
        return
    }
    key := podStatsKey(pod)
    phase, ready := string(pod.Status.Phase), podReadyStatus(pod)

    pm.mu.Lock()
    if info, exists := pm.podStats[key]; exists {
        info.Phase = phase
        info.Ready = ready
        info.UpdatedAt = time.Now()
    }
    pm.mu.Unlock()

    log.Printf("Pod updated: %s", key)
}

// onPodDelete stops tracking a deleted Pod. A delete notification may carry
// a cache.DeletedFinalStateUnknown tombstone instead of the Pod itself; the
// Pod is then taken from the tombstone.
func (pm *PodMonitor) onPodDelete(obj interface{}) {
    pod, ok := obj.(*corev1.Pod)
    if !ok {
        tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
        if !isTombstone {
            log.Printf("Ignoring delete event for unexpected object type %T", obj)
            return
        }
        pod, ok = tombstone.Obj.(*corev1.Pod)
        if !ok {
            log.Printf("Ignoring delete tombstone holding unexpected object type %T", tombstone.Obj)
            return
        }
    }
    key := podStatsKey(pod)

    pm.mu.Lock()
    delete(pm.podStats, key)
    pm.mu.Unlock()

    log.Printf("Pod deleted: %s", key)
}

// Start runs the informer in a background goroutine until ctx is done and
// blocks until its cache has synced. If the wait for the sync reports
// failure, the process exits via log.Fatal.
func (pm *PodMonitor) Start(ctx context.Context) {
    go pm.informer.Run(ctx.Done())

    if !cache.WaitForCacheSync(ctx.Done(), pm.informer.HasSynced) {
        log.Fatal("Failed to sync cache")
    }

    log.Println("Pod monitor started")
}

// GetStats returns a snapshot of the tracked Pods keyed by "namespace/name".
// Both the map and the PodInfo values are copies, so the caller may read or
// serialise them while event handlers keep updating the monitor.
func (pm *PodMonitor) GetStats() map[string]*PodInfo {
    pm.mu.RLock()
    defer pm.mu.RUnlock()

    snapshot := make(map[string]*PodInfo, len(pm.podStats))
    for key, info := range pm.podStats {
        infoCopy := *info
        snapshot[key] = &infoCopy
    }
    return snapshot
}

// StartHTTPServer serves the monitor over HTTP on the given port and blocks;
// when the server stops, the process exits via log.Fatal.
//
// Routes:
//   - /pods   returns the result of GetStats as JSON
//   - /health returns 200 with the body "OK"
//
// The routes are registered on a private ServeMux rather than the global
// default one, so handlers registered elsewhere in the process are not
// exposed and a second call does not panic on duplicate patterns. The server
// sets read and write timeouts so that stalled clients cannot hold
// connections open indefinitely.
func (pm *PodMonitor) StartHTTPServer(port string) {
    mux := http.NewServeMux()

    mux.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        if err := json.NewEncoder(w).Encode(pm.GetStats()); err != nil {
            // The response may be partially written, so the failure can
            // only be logged.
            log.Printf("Failed to encode pod stats: %v", err)
        }
    })

    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        if _, err := w.Write([]byte("OK")); err != nil {
            log.Printf("Failed to write health response: %v", err)
        }
    })

    server := &http.Server{
        Addr:              ":" + port,
        Handler:           mux,
        ReadHeaderTimeout: 5 * time.Second,
        ReadTimeout:       10 * time.Second,
        WriteTimeout:      10 * time.Second,
    }

    log.Printf("HTTP server starting on port %s", port)
    log.Fatal(server.ListenAndServe())
}

// main wires the Pod monitor together: it loads the in-cluster
// configuration, starts the monitor, serves its HTTP endpoints on port 8080
// in the background and then waits for SIGINT or SIGTERM before cancelling
// the monitor's context.
func main() {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }

    podMonitor, err := NewPodMonitor(cfg)
    if err != nil {
        log.Fatal(err)
    }

    ctx, stop := context.WithCancel(context.Background())
    defer stop()

    // Start returns once the informer cache has synced, so the HTTP server
    // only begins serving after that.
    podMonitor.Start(ctx)
    go podMonitor.StartHTTPServer("8080")

    // Block until a termination signal arrives.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
    <-signals

    log.Println("Shutting down...")
    stop()
}

通过掌握 client-go 的使用,您可以开发出功能强大的 Kubernetes 应用,为后续的 Operator 开发和自定义控制器实现奠定坚实的基础。