5.3.2 Kubernetes 客户端开发 #
client-go 是 Kubernetes 官方提供的 Go 客户端库,它为 Go 开发者提供了与 Kubernetes API Server 交互的完整功能。通过 client-go,我们可以创建、读取、更新和删除 Kubernetes 资源,监听资源变化,以及开发自定义控制器。
client-go 库概述 #
核心组件 #
client-go 库包含以下核心组件:
- Clientset:类型化客户端,提供对已知资源类型的访问
- Dynamic Client:动态客户端,可以操作任意资源类型
- Discovery Client:发现客户端,用于获取 API 资源信息
- RESTClient:底层 REST 客户端
- Informers:资源监听和缓存机制
- Workqueue:工作队列,用于事件处理
安装和初始化 #
# 安装 client-go
go mod init k8s-client-example
go get k8s.io/client-go@latest
go get k8s.io/apimachinery@latest
// client/client.go
package main
import (
	"context"
	"flag"
	"fmt"
	"path/filepath"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)
// main lists the pods in the "default" namespace, preferring the local
// kubeconfig and falling back to in-cluster configuration.
func main() {
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig",
			filepath.Join(home, ".kube", "config"),
			"(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "",
			"absolute path to the kubeconfig file")
	}
	flag.Parse()

	// Try the kubeconfig file first; if that fails, assume we are running
	// inside a cluster and use the mounted service-account credentials.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		config, err = rest.InClusterConfig()
		if err != nil {
			panic(err.Error())
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	podList, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("There are %d pods in the cluster\n", len(podList.Items))
}
基本资源操作 #
Pod 操作示例 #
// pod-operations.go
package main
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
// PodManager wraps a typed clientset and performs CRUD operations on Pods
// within a single, fixed namespace.
type PodManager struct {
	clientset *kubernetes.Clientset // typed client used for all API calls
	namespace string                // namespace every operation is scoped to
}
// NewPodManager builds a PodManager for the given namespace from a REST config.
func NewPodManager(config *rest.Config, namespace string) (*PodManager, error) {
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	return &PodManager{clientset: cs, namespace: namespace}, nil
}
// 创建 Pod
// CreatePod creates a single-container Pod named name running image in the
// manager's namespace. The container exposes port 80, carries fixed CPU/memory
// requests (100m/128Mi) and limits (500m/256Mi), and restarts Always.
// Returns the Pod object as persisted by the API server.
func (pm *PodManager) CreatePod(name, image string) (*corev1.Pod, error) {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: pm.namespace,
			// Label the pod with its own name so it can be selected later.
			Labels: map[string]string{
				"app": name,
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  name,
					Image: image,
					Ports: []corev1.ContainerPort{
						{
							ContainerPort: 80,
						},
					},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("100m"),
							corev1.ResourceMemory: resource.MustParse("128Mi"),
						},
						Limits: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("500m"),
							corev1.ResourceMemory: resource.MustParse("256Mi"),
						},
					},
				},
			},
			RestartPolicy: corev1.RestartPolicyAlways,
		},
	}
	result, err := pm.clientset.CoreV1().Pods(pm.namespace).Create(
		context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create pod: %v", err)
	}
	log.Printf("Created pod %s in namespace %s", result.Name, result.Namespace)
	return result, nil
}
// 获取 Pod
// GetPod fetches a single Pod by name from the manager's namespace.
func (pm *PodManager) GetPod(name string) (*corev1.Pod, error) {
	p, err := pm.clientset.CoreV1().Pods(pm.namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get pod: %v", err)
	}
	return p, nil
}
// 列出 Pods
// ListPods lists pods in the manager's namespace, optionally filtered by a
// label selector (an empty selector matches everything).
func (pm *PodManager) ListPods(labelSelector string) (*corev1.PodList, error) {
	// An empty LabelSelector is the zero value and selects all pods.
	opts := metav1.ListOptions{LabelSelector: labelSelector}
	list, err := pm.clientset.CoreV1().Pods(pm.namespace).List(context.TODO(), opts)
	if err != nil {
		return nil, fmt.Errorf("failed to list pods: %v", err)
	}
	return list, nil
}
// 更新 Pod
// UpdatePod writes the given Pod object back to the API server and returns
// the server's view of it.
func (pm *PodManager) UpdatePod(pod *corev1.Pod) (*corev1.Pod, error) {
	updated, err := pm.clientset.CoreV1().Pods(pm.namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to update pod: %v", err)
	}
	return updated, nil
}
// 删除 Pod
// DeletePod removes the named Pod from the manager's namespace.
func (pm *PodManager) DeletePod(name string) error {
	if err := pm.clientset.CoreV1().Pods(pm.namespace).Delete(
		context.TODO(), name, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("failed to delete pod: %v", err)
	}
	log.Printf("Deleted pod %s from namespace %s", name, pm.namespace)
	return nil
}
// 获取 Pod 日志
// GetPodLogs reads the full log output of one container of the named pod and
// returns it as a string.
func (pm *PodManager) GetPodLogs(podName, containerName string) (string, error) {
	opts := &corev1.PodLogOptions{Container: containerName}
	stream, err := pm.clientset.CoreV1().Pods(pm.namespace).GetLogs(podName, opts).Stream(context.TODO())
	if err != nil {
		return "", fmt.Errorf("failed to get pod logs: %v", err)
	}
	defer stream.Close()

	var out bytes.Buffer
	if _, err := io.Copy(&out, stream); err != nil {
		return "", fmt.Errorf("failed to read logs: %v", err)
	}
	return out.String(), nil
}
Deployment 操作示例 #
// deployment-operations.go
package main
import (
"context"
"fmt"
"log"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// DeploymentManager wraps a typed clientset and manages Deployments within a
// single, fixed namespace.
type DeploymentManager struct {
	clientset *kubernetes.Clientset // typed client used for all API calls
	namespace string                // namespace every operation is scoped to
}
// NewDeploymentManager builds a DeploymentManager for the given namespace.
func NewDeploymentManager(config *rest.Config, namespace string) (*DeploymentManager, error) {
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	return &DeploymentManager{clientset: cs, namespace: namespace}, nil
}
// 创建 Deployment
// CreateDeployment creates a Deployment named name with the given replica
// count, running image on port 80. It wires HTTP liveness (/health) and
// readiness (/ready) probes and uses a RollingUpdate strategy with
// maxSurge=1 / maxUnavailable=1. Returns the persisted Deployment.
func (dm *DeploymentManager) CreateDeployment(name, image string, replicas int32) (*appsv1.Deployment, error) {
	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: dm.namespace,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			// Selector must match the pod template labels below.
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": name,
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app": name,
					},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  name,
							Image: image,
							Ports: []corev1.ContainerPort{
								{
									ContainerPort: 80,
								},
							},
							// Restart the container if /health stops answering.
							LivenessProbe: &corev1.Probe{
								ProbeHandler: corev1.ProbeHandler{
									HTTPGet: &corev1.HTTPGetAction{
										Path: "/health",
										Port: intstr.FromInt(80),
									},
								},
								InitialDelaySeconds: 30,
								PeriodSeconds:       10,
							},
							// Gate Service traffic on /ready.
							ReadinessProbe: &corev1.Probe{
								ProbeHandler: corev1.ProbeHandler{
									HTTPGet: &corev1.HTTPGetAction{
										Path: "/ready",
										Port: intstr.FromInt(80),
									},
								},
								InitialDelaySeconds: 5,
								PeriodSeconds:       5,
							},
						},
					},
				},
			},
			// Roll pods one at a time: at most one extra and one unavailable.
			Strategy: appsv1.DeploymentStrategy{
				Type: appsv1.RollingUpdateDeploymentStrategyType,
				RollingUpdate: &appsv1.RollingUpdateDeployment{
					MaxSurge:       &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
					MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
				},
			},
		},
	}
	result, err := dm.clientset.AppsV1().Deployments(dm.namespace).Create(
		context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create deployment: %v", err)
	}
	log.Printf("Created deployment %s in namespace %s", result.Name, result.Namespace)
	return result, nil
}
// 扩缩容 Deployment
// ScaleDeployment sets the replica count of the named Deployment via a
// read-modify-write of the Deployment object.
func (dm *DeploymentManager) ScaleDeployment(name string, replicas int32) error {
	deployments := dm.clientset.AppsV1().Deployments(dm.namespace)

	d, err := deployments.Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get deployment: %v", err)
	}

	d.Spec.Replicas = &replicas
	if _, err := deployments.Update(context.TODO(), d, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("failed to scale deployment: %v", err)
	}
	log.Printf("Scaled deployment %s to %d replicas", name, replicas)
	return nil
}
// 更新镜像
// UpdateImage replaces the image of container containerName in the named
// Deployment. It returns an error if the Deployment cannot be fetched or
// updated, or if no container with that name exists (the original silently
// succeeded in that case, masking typos).
func (dm *DeploymentManager) UpdateImage(name, containerName, newImage string) error {
	deployment, err := dm.clientset.AppsV1().Deployments(dm.namespace).Get(
		context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get deployment: %v", err)
	}

	// Update the matching container's image; remember whether we found it.
	found := false
	for i := range deployment.Spec.Template.Spec.Containers {
		if deployment.Spec.Template.Spec.Containers[i].Name == containerName {
			deployment.Spec.Template.Spec.Containers[i].Image = newImage
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("container %q not found in deployment %s", containerName, name)
	}

	_, err = dm.clientset.AppsV1().Deployments(dm.namespace).Update(
		context.TODO(), deployment, metav1.UpdateOptions{})
	if err != nil {
		return fmt.Errorf("failed to update deployment image: %v", err)
	}
	log.Printf("Updated deployment %s container %s to image %s", name, containerName, newImage)
	return nil
}
动态客户端 #
动态客户端可以操作任意类型的 Kubernetes 资源:
// dynamic-client.go
package main
import (
"context"
"fmt"
"log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
// DynamicResourceManager performs CRUD on arbitrary (schema-less) resources
// through the dynamic client, scoped to a single namespace.
type DynamicResourceManager struct {
	client    dynamic.Interface // dynamic client; works with unstructured objects
	namespace string            // namespace every operation is scoped to
}
// NewDynamicResourceManager builds a DynamicResourceManager for the namespace.
func NewDynamicResourceManager(config *rest.Config, namespace string) (*DynamicResourceManager, error) {
	dc, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	return &DynamicResourceManager{client: dc, namespace: namespace}, nil
}
// 创建任意资源
// CreateResource creates an arbitrary namespaced resource identified by gvr.
func (drm *DynamicResourceManager) CreateResource(gvr schema.GroupVersionResource, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	created, err := drm.client.Resource(gvr).Namespace(drm.namespace).Create(context.TODO(), obj, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create resource: %v", err)
	}
	return created, nil
}
// 获取任意资源
// GetResource fetches one namespaced resource of type gvr by name.
func (drm *DynamicResourceManager) GetResource(gvr schema.GroupVersionResource, name string) (*unstructured.Unstructured, error) {
	obj, err := drm.client.Resource(gvr).Namespace(drm.namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get resource: %v", err)
	}
	return obj, nil
}
// 列出任意资源
// ListResources lists all namespaced resources of type gvr.
func (drm *DynamicResourceManager) ListResources(gvr schema.GroupVersionResource) (*unstructured.UnstructuredList, error) {
	list, err := drm.client.Resource(gvr).Namespace(drm.namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to list resources: %v", err)
	}
	return list, nil
}
// 使用示例
// ExampleUsage demonstrates the dynamic client by creating and then fetching
// a ConfigMap built as an unstructured object. Errors are logged, not returned.
func (drm *DynamicResourceManager) ExampleUsage() {
	// GroupVersionResource for core/v1 ConfigMaps (core group is the empty string).
	configMapGVR := schema.GroupVersionResource{
		Group:    "",
		Version:  "v1",
		Resource: "configmaps",
	}
	// Build the ConfigMap as a raw map — no typed API structs involved.
	configMap := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "v1",
			"kind":       "ConfigMap",
			"metadata": map[string]interface{}{
				"name":      "example-config",
				"namespace": drm.namespace,
			},
			"data": map[string]interface{}{
				"key1": "value1",
				"key2": "value2",
			},
		},
	}
	created, err := drm.CreateResource(configMapGVR, configMap)
	if err != nil {
		log.Printf("Failed to create ConfigMap: %v", err)
		return
	}
	log.Printf("Created ConfigMap: %s", created.GetName())
	// Read it back through the same dynamic interface.
	retrieved, err := drm.GetResource(configMapGVR, "example-config")
	if err != nil {
		log.Printf("Failed to get ConfigMap: %v", err)
		return
	}
	log.Printf("Retrieved ConfigMap: %s", retrieved.GetName())
}
Informers 和事件监听 #
Informers 提供了高效的资源监听和缓存机制:
// informer-example.go
package main
import (
"context"
"fmt"
"log"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// PodWatcher watches Pod add/update/delete events through a shared informer
// and logs state transitions.
type PodWatcher struct {
	clientset *kubernetes.Clientset    // used by CreateCustomInformer's list-watch
	informer  cache.SharedIndexInformer // pod informer driving the event handlers
}
// NewPodWatcher builds a PodWatcher whose informer is scoped to the given
// namespace and resyncs every 30 seconds.
func NewPodWatcher(config *rest.Config, namespace string) (*PodWatcher, error) {
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	// Namespace-scoped shared informer factory with a 30s resync period.
	podInformer := informers.NewSharedInformerFactoryWithOptions(
		cs,
		time.Second*30,
		informers.WithNamespace(namespace),
	).Core().V1().Pods().Informer()

	return &PodWatcher{clientset: cs, informer: podInformer}, nil
}
// 添加事件处理器
// AddEventHandlers registers add/update/delete callbacks on the informer.
// Type assertions are checked: DeleteFunc in particular can receive a
// cache.DeletedFinalStateUnknown tombstone instead of a *corev1.Pod (the
// original unchecked assertion would panic in that case).
func (pw *PodWatcher) AddEventHandlers() {
	pw.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				return
			}
			log.Printf("Pod ADDED: %s/%s", pod.Namespace, pod.Name)
			pw.handlePodAdd(pod)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod, okOld := oldObj.(*corev1.Pod)
			newPod, okNew := newObj.(*corev1.Pod)
			if !okOld || !okNew {
				return
			}
			// Periodic resyncs redeliver identical objects; skip those.
			if oldPod.ResourceVersion == newPod.ResourceVersion {
				return
			}
			log.Printf("Pod UPDATED: %s/%s", newPod.Namespace, newPod.Name)
			pw.handlePodUpdate(oldPod, newPod)
		},
		DeleteFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				// Recover the final object state from the tombstone.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					return
				}
				pod, ok = tombstone.Obj.(*corev1.Pod)
				if !ok {
					return
				}
			}
			log.Printf("Pod DELETED: %s/%s", pod.Namespace, pod.Name)
			pw.handlePodDelete(pod)
		},
	})
}
// 处理 Pod 添加事件
// handlePodAdd logs a newly observed pod and, if labelled, its "app" label.
func (pw *PodWatcher) handlePodAdd(pod *corev1.Pod) {
	log.Printf("New pod created: %s, Phase: %s", pod.Name, pod.Status.Phase)
	if app, ok := pod.Labels["app"]; ok {
		log.Printf("Pod %s belongs to app: %s", pod.Name, app)
	}
}
// 处理 Pod 更新事件
// handlePodUpdate logs phase transitions and readiness flips between two
// observed versions of the same pod.
func (pw *PodWatcher) handlePodUpdate(oldPod, newPod *corev1.Pod) {
	if oldPod.Status.Phase != newPod.Status.Phase {
		log.Printf("Pod %s phase changed from %s to %s",
			newPod.Name, oldPod.Status.Phase, newPod.Status.Phase)
	}
	wasReady, nowReady := isPodReady(oldPod), isPodReady(newPod)
	if wasReady != nowReady {
		log.Printf("Pod %s ready status changed from %t to %t",
			newPod.Name, wasReady, nowReady)
	}
}
// 处理 Pod 删除事件
// handlePodDelete logs the last observed phase of a deleted pod.
func (pw *PodWatcher) handlePodDelete(pod *corev1.Pod) {
	log.Printf("Pod %s was deleted, final phase: %s", pod.Name, pod.Status.Phase)
}
// 检查 Pod 是否就绪
// isPodReady reports whether the pod carries a PodReady condition whose
// status is True. Absence of the condition counts as not ready.
func isPodReady(pod *corev1.Pod) bool {
	for i := range pod.Status.Conditions {
		cond := pod.Status.Conditions[i]
		if cond.Type != corev1.PodReady {
			continue
		}
		return cond.Status == corev1.ConditionTrue
	}
	return false
}
// 启动监听
// Start registers the event handlers, runs the informer in a goroutine until
// ctx is cancelled, and blocks until the initial cache sync completes.
func (pw *PodWatcher) Start(ctx context.Context) {
	pw.AddEventHandlers()
	go pw.informer.Run(ctx.Done())
	// Block until the informer's local cache has caught up with the server.
	if !cache.WaitForCacheSync(ctx.Done(), pw.informer.HasSynced) {
		// NOTE(review): log.Fatal terminates the whole process on sync failure;
		// returning an error would let the caller decide — confirm intended.
		log.Fatal("Failed to sync cache")
	}
	log.Println("Pod watcher started successfully")
}
// 使用 Field Selector 的自定义 Informer
// CreateCustomInformer builds a standalone shared informer for pods in the
// given namespace using an explicit list-watch against the core REST client.
func (pw *PodWatcher) CreateCustomInformer(namespace string) cache.SharedIndexInformer {
	lw := cache.NewListWatchFromClient(
		pw.clientset.CoreV1().RESTClient(),
		"pods",
		namespace,
		fields.Everything(), // no field-selector filtering
	)
	return cache.NewSharedIndexInformer(lw, &corev1.Pod{}, time.Second*30, cache.Indexers{})
}
工作队列和控制器模式 #
实现一个简单的控制器:
// controller-example.go
package main
import (
"context"
"fmt"
"log"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// PodController is a minimal controller: an informer feeds pod keys into a
// rate-limited workqueue, and worker goroutines reconcile each key.
type PodController struct {
	clientset   *kubernetes.Clientset           // used for writes back to the API server
	podInformer cache.SharedIndexInformer       // local cache + event source
	workqueue   workqueue.RateLimitingInterface // keys pending reconciliation
}
// NewPodController wires up the clientset, a pod informer (30s resync) and a
// named rate-limiting workqueue, and registers the enqueueing handlers.
func NewPodController(config *rest.Config) (*PodController, error) {
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	informer := informers.NewSharedInformerFactory(cs, time.Second*30).
		Core().V1().Pods().Informer()

	c := &PodController{
		clientset:   cs,
		podInformer: informer,
		workqueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"),
	}

	// Every event kind funnels into the same queue; the sync handler decides
	// what to do with the key.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: c.enqueuePod,
		UpdateFunc: func(old, new interface{}) {
			c.enqueuePod(new)
		},
		DeleteFunc: c.enqueuePod,
	})
	return c, nil
}
// 将 Pod 加入工作队列
// enqueuePod derives the namespace/name key for obj and adds it to the
// workqueue. It uses DeletionHandlingMetaNamespaceKeyFunc because DeleteFunc
// can deliver a cache.DeletedFinalStateUnknown tombstone, which the plain
// MetaNamespaceKeyFunc used originally cannot unwrap.
func (c *PodController) enqueuePod(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
// 启动控制器
// Run starts the informer, waits for the cache to sync, launches the given
// number of worker goroutines, and blocks until ctx is cancelled. The
// workqueue is shut down on return so workers drain and exit.
func (c *PodController) Run(ctx context.Context, workers int) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()
	log.Println("Starting Pod controller")
	go c.podInformer.Run(ctx.Done())
	// Do not process events until the local cache reflects cluster state.
	if !cache.WaitForCacheSync(ctx.Done(), c.podInformer.HasSynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	log.Println("Starting workers")
	for i := 0; i < workers; i++ {
		// Each worker loops on the queue; wait.UntilWithContext restarts it
		// every second if it ever returns before cancellation.
		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
	}
	log.Println("Started workers")
	<-ctx.Done()
	log.Println("Shutting down workers")
	return nil
}
// 工作协程
// runWorker drains the workqueue until it is shut down.
func (c *PodController) runWorker(ctx context.Context) {
	for {
		if !c.processNextWorkItem(ctx) {
			return
		}
	}
}
// 处理工作队列中的下一个项目
// processNextWorkItem pops one key off the workqueue and reconciles it.
// Returns false only when the queue has been shut down. On handler error the
// key is re-queued with rate limiting; on success (or an unusable item) the
// key's rate-limit history is forgotten.
func (c *PodController) processNextWorkItem(ctx context.Context) bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	// The closure guarantees Done(obj) is always called, which the workqueue
	// requires to stop tracking the in-flight item.
	err := func(obj interface{}) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			// Non-string items can never be processed; drop them for good.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		if err := c.syncHandler(ctx, key); err != nil {
			// Re-queue with backoff so transient failures retry.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		c.workqueue.Forget(obj)
		log.Printf("Successfully synced '%s'", key)
		return nil
	}(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return true
	}
	return true
}
// 同步处理器
// syncHandler reconciles a single Pod identified by its "namespace/name"
// cache key. A returned error causes the key to be re-queued.
func (c *PodController) syncHandler(ctx context.Context, key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}
	// Read from the informer's local cache, not the API server. Indexer's
	// GetByKey returns (item, exists, err); the original two-value assignment
	// did not compile and could not distinguish "missing" from "nil".
	obj, exists, err := c.podInformer.GetIndexer().GetByKey(key)
	if err != nil {
		return err
	}
	if !exists {
		// The pod was deleted between enqueue and processing.
		log.Printf("Pod %s/%s has been deleted", namespace, name)
		return nil
	}
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expected *corev1.Pod in cache but got %#v", obj))
		return nil
	}
	return c.processPod(ctx, pod)
}
// 处理 Pod 逻辑
// processPod is the reconcile logic: it ensures every pod carries a
// "managed-by" label, updating the pod via the API server when missing.
// Conflict errors are surfaced so the caller re-queues the key.
func (c *PodController) processPod(ctx context.Context, pod *corev1.Pod) error {
	log.Printf("Processing pod: %s/%s, Phase: %s", pod.Namespace, pod.Name, pod.Status.Phase)

	// Work on a deep copy: objects handed out by the informer cache are
	// shared, and mutating them in place (as the original did) corrupts the
	// cache for every other consumer.
	pod = pod.DeepCopy()

	if pod.Labels == nil {
		pod.Labels = make(map[string]string)
	}
	if _, exists := pod.Labels["managed-by"]; !exists {
		pod.Labels["managed-by"] = "pod-controller"
		_, err := c.clientset.CoreV1().Pods(pod.Namespace).Update(
			ctx, pod, metav1.UpdateOptions{})
		if err != nil {
			if errors.IsConflict(err) {
				// Someone else updated the pod concurrently; re-queue.
				return fmt.Errorf("conflict updating pod %s/%s", pod.Namespace, pod.Name)
			}
			return err
		}
		log.Printf("Added managed-by label to pod %s/%s", pod.Namespace, pod.Name)
	}
	return nil
}
认证和授权 #
服务账户和 RBAC #
# rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: pod-controller
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: pod-controller
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: pod-controller
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: pod-controller
subjects:
- kind: ServiceAccount
name: pod-controller
namespace: default
集群内认证 #
// in-cluster-auth.go
package main
import (
	"context"
	"log"
	"os"
	"strings"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
// main demonstrates in-cluster authentication: it lists the pods in the
// namespace this process runs in using the mounted service-account identity.
func main() {
	// In-cluster config reads the service-account token and CA from the pod.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}
	ns := getCurrentNamespace()
	podList, err := cs.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Found %d pods in namespace %s", len(podList.Items), ns)
}
// getCurrentNamespace returns the namespace this process runs in, read from
// the mounted service-account namespace file, falling back to "default" when
// the file is missing or empty (e.g. when running outside a cluster).
// The original called ioutil.ReadFile without importing it; os.ReadFile is
// the modern, non-deprecated equivalent.
func getCurrentNamespace() string {
	data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
	if err != nil {
		return "default"
	}
	// Trim any trailing newline/whitespace the file may contain.
	if ns := strings.TrimSpace(string(data)); ns != "" {
		return ns
	}
	return "default"
}
完整应用示例 #
Pod 监控应用 #
// pod-monitor.go
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)
// PodMonitor tracks per-pod state in memory, kept current by informer events,
// and exposes it over HTTP.
// NOTE(review): podStats is written by informer callbacks and read by the
// HTTP handler on different goroutines with no mutex — confirm and add
// synchronization before production use.
type PodMonitor struct {
	clientset *kubernetes.Clientset     // API client (kept for future use)
	informer  cache.SharedIndexInformer // pod informer driving the callbacks
	podStats  map[string]*PodInfo       // "namespace/name" -> last observed state
}
// PodInfo is the JSON-serializable snapshot of one pod's observed state.
type PodInfo struct {
	Name      string    `json:"name"`       // pod name
	Namespace string    `json:"namespace"`  // pod namespace
	Phase     string    `json:"phase"`      // last observed PodPhase
	Ready     bool      `json:"ready"`      // last observed PodReady condition
	CreatedAt time.Time `json:"created_at"` // pod's CreationTimestamp
	UpdatedAt time.Time `json:"updated_at"` // when this entry was last refreshed
}
// NewPodMonitor builds a PodMonitor whose informer resyncs every 30 seconds
// and registers the add/update/delete callbacks.
func NewPodMonitor(config *rest.Config) (*PodMonitor, error) {
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	informer := informers.NewSharedInformerFactory(cs, time.Second*30).
		Core().V1().Pods().Informer()

	m := &PodMonitor{
		clientset: cs,
		informer:  informer,
		podStats:  make(map[string]*PodInfo),
	}
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    m.onPodAdd,
		UpdateFunc: m.onPodUpdate,
		DeleteFunc: m.onPodDelete,
	})
	return m, nil
}
// onPodAdd records a newly observed pod under its "namespace/name" key.
func (pm *PodMonitor) onPodAdd(obj interface{}) {
	pod := obj.(*corev1.Pod)
	key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
	info := &PodInfo{
		Name:      pod.Name,
		Namespace: pod.Namespace,
		Phase:     string(pod.Status.Phase),
		Ready:     isPodReady(pod),
		CreatedAt: pod.CreationTimestamp.Time,
		UpdatedAt: time.Now(),
	}
	pm.podStats[key] = info
	log.Printf("Pod added: %s", key)
}
// onPodUpdate refreshes the stored state for a pod. Unlike the original,
// which silently dropped updates for unknown keys, a missing entry is
// created (an add event can be missed around informer restarts/resyncs).
func (pm *PodMonitor) onPodUpdate(oldObj, newObj interface{}) {
	pod := newObj.(*corev1.Pod)
	key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)

	info, exists := pm.podStats[key]
	if !exists {
		info = &PodInfo{
			Name:      pod.Name,
			Namespace: pod.Namespace,
			CreatedAt: pod.CreationTimestamp.Time,
		}
		pm.podStats[key] = info
	}
	info.Phase = string(pod.Status.Phase)
	info.Ready = isPodReady(pod)
	info.UpdatedAt = time.Now()

	log.Printf("Pod updated: %s", key)
}
// onPodDelete removes a pod's entry. The delete callback can receive a
// cache.DeletedFinalStateUnknown tombstone instead of a *corev1.Pod; the
// original unchecked type assertion would panic in that case.
func (pm *PodMonitor) onPodDelete(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		pod, ok = tombstone.Obj.(*corev1.Pod)
		if !ok {
			return
		}
	}
	key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
	delete(pm.podStats, key)
	log.Printf("Pod deleted: %s", key)
}
// Start runs the informer in a goroutine until ctx is cancelled and blocks
// until the initial cache sync completes.
func (pm *PodMonitor) Start(ctx context.Context) {
	go pm.informer.Run(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), pm.informer.HasSynced) {
		// NOTE(review): log.Fatal exits the whole process on sync failure —
		// confirm intended; returning an error would be gentler.
		log.Fatal("Failed to sync cache")
	}
	log.Println("Pod monitor started")
}
// GetStats returns a snapshot copy of the tracked pod table so callers cannot
// alias and mutate the monitor's internal map (the original returned the map
// itself).
// NOTE(review): podStats is still mutated by informer callbacks on another
// goroutine; iterating it here races with those writes — confirm and guard
// the map with a mutex on the struct.
func (pm *PodMonitor) GetStats() map[string]*PodInfo {
	snapshot := make(map[string]*PodInfo, len(pm.podStats))
	for k, v := range pm.podStats {
		snapshot[k] = v
	}
	return snapshot
}
// HTTP 服务器
// StartHTTPServer serves /pods (JSON pod stats) and /health on the given
// port. It blocks; call from a goroutine. Unlike the original it uses a
// dedicated mux (not the mutable global http.DefaultServeMux) and sets
// read/write timeouts so slow clients cannot pin connections forever.
func (pm *PodMonitor) StartHTTPServer(port string) {
	mux := http.NewServeMux()
	mux.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(pm.GetStats())
	})
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	srv := &http.Server{
		Addr:         ":" + port,
		Handler:      mux,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
	}
	log.Printf("HTTP server starting on port %s", port)
	log.Fatal(srv.ListenAndServe())
}
// main wires the monitor to in-cluster credentials, serves stats over HTTP,
// and shuts the informer down on SIGINT/SIGTERM.
func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	monitor, err := NewPodMonitor(config)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start the informer-backed monitor, then the HTTP API alongside it.
	monitor.Start(ctx)
	go monitor.StartHTTPServer("8080")

	// Block until an interrupt or termination signal arrives.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	<-stop

	log.Println("Shutting down...")
	cancel()
}
通过掌握 client-go 的使用,您可以开发出功能强大的 Kubernetes 应用,为后续的 Operator 开发和自定义控制器实现奠定坚实的基础。