5.4.1 服务注册与发现

5.4.1 服务注册与发现 #

服务发现是微服务架构的基础设施组件,它解决了在动态环境中服务实例如何相互发现和通信的问题。在传统的单体应用中,服务间的调用通过静态配置的 IP 和端口进行,但在微服务环境中,服务实例会动态创建和销毁,需要一个自动化的机制来管理服务的位置信息。

服务发现基础概念 #

服务发现的核心问题 #

在微服务架构中,服务发现需要解决以下核心问题:

  1. 服务注册:服务实例启动时如何向注册中心声明自己的存在
  2. 服务发现:客户端如何找到可用的服务实例
  3. 健康检查:如何确保返回的服务实例是健康可用的
  4. 负载均衡:如何在多个服务实例间分配请求
  5. 故障处理:如何处理服务实例的故障和恢复

服务发现模式 #

客户端发现模式

┌─────────────┐    1. 查询服务    ┌─────────────────┐
│   Client    │ ───────────────→ │ Service Registry │
│             │ ←─────────────── │                 │
└─────────────┘    2. 返回实例    └─────────────────┘
       │
       │ 3. 直接调用
       ▼
┌─────────────┐
│   Service   │
│  Instance   │
└─────────────┘

服务端发现模式

┌─────────────┐    1. 请求      ┌─────────────────┐
│   Client    │ ─────────────→ │  Load Balancer  │
│             │                │   (Router)      │
└─────────────┘                └─────────────────┘
                                       │
                                       │ 2. 路由请求
                                       ▼
                               ┌─────────────┐
                               │   Service   │
                               │  Instance   │
                               └─────────────┘

服务注册中心实现 #

基础服务注册中心 #

// registry/service_registry.go
package registry

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ServiceInstance 服务实例信息
type ServiceInstance struct {
    ID       string            `json:"id"`
    Name     string            `json:"name"`
    Address  string            `json:"address"`
    Port     int               `json:"port"`
    Metadata map[string]string `json:"metadata"`
    Health   HealthStatus      `json:"health"`

    // 内部字段
    LastHeartbeat time.Time `json:"last_heartbeat"`
    RegisterTime  time.Time `json:"register_time"`
}

// HealthStatus 健康状态
type HealthStatus string

const (
    HealthStatusHealthy   HealthStatus = "healthy"
    HealthStatusUnhealthy HealthStatus = "unhealthy"
    HealthStatusUnknown   HealthStatus = "unknown"
)

// ServiceRegistry 服务注册中心接口
type ServiceRegistry interface {
    Register(ctx context.Context, instance *ServiceInstance) error
    Deregister(ctx context.Context, instanceID string) error
    Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
    Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error)
    Heartbeat(ctx context.Context, instanceID string) error
}

// InMemoryRegistry 内存实现的服务注册中心
type InMemoryRegistry struct {
    services map[string]map[string]*ServiceInstance
    watchers map[string][]chan []*ServiceInstance
    mutex    sync.RWMutex

    // 健康检查配置
    heartbeatTimeout time.Duration
    cleanupInterval  time.Duration
}

// NewInMemoryRegistry 创建内存注册中心
func NewInMemoryRegistry() *InMemoryRegistry {
    registry := &InMemoryRegistry{
        services:         make(map[string]map[string]*ServiceInstance),
        watchers:         make(map[string][]chan []*ServiceInstance),
        heartbeatTimeout: time.Minute * 2,
        cleanupInterval:  time.Second * 30,
    }

    // 启动清理协程
    go registry.startCleanup()

    return registry
}

// Register 注册服务实例
func (r *InMemoryRegistry) Register(ctx context.Context, instance *ServiceInstance) error {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    // 初始化服务映射
    if r.services[instance.Name] == nil {
        r.services[instance.Name] = make(map[string]*ServiceInstance)
    }

    // 设置注册时间和心跳时间
    now := time.Now()
    instance.RegisterTime = now
    instance.LastHeartbeat = now
    instance.Health = HealthStatusHealthy

    // 注册实例
    r.services[instance.Name][instance.ID] = instance

    // 通知观察者
    r.notifyWatchers(instance.Name)

    return nil
}

// Deregister 注销服务实例
func (r *InMemoryRegistry) Deregister(ctx context.Context, instanceID string) error {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    // 查找并删除实例
    for serviceName, instances := range r.services {
        if _, exists := instances[instanceID]; exists {
            delete(instances, instanceID)

            // 如果服务没有实例了,删除服务
            if len(instances) == 0 {
                delete(r.services, serviceName)
            }

            // 通知观察者
            r.notifyWatchers(serviceName)
            return nil
        }
    }

    return fmt.Errorf("instance %s not found", instanceID)
}

// Discover 发现服务实例
func (r *InMemoryRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    instances := r.services[serviceName]
    if instances == nil {
        return []*ServiceInstance{}, nil
    }

    // 只返回健康的实例
    var healthyInstances []*ServiceInstance
    for _, instance := range instances {
        if instance.Health == HealthStatusHealthy {
            healthyInstances = append(healthyInstances, instance)
        }
    }

    return healthyInstances, nil
}

// Watch 监听服务变化
func (r *InMemoryRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    ch := make(chan []*ServiceInstance, 1)
    r.watchers[serviceName] = append(r.watchers[serviceName], ch)

    // 发送当前状态
    if instances := r.services[serviceName]; instances != nil {
        var healthyInstances []*ServiceInstance
        for _, instance := range instances {
            if instance.Health == HealthStatusHealthy {
                healthyInstances = append(healthyInstances, instance)
            }
        }
        select {
        case ch <- healthyInstances:
        default:
        }
    }

    return ch, nil
}

// Heartbeat 心跳检查
func (r *InMemoryRegistry) Heartbeat(ctx context.Context, instanceID string) error {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    // 查找实例并更新心跳时间
    for serviceName, instances := range r.services {
        if instance, exists := instances[instanceID]; exists {
            instance.LastHeartbeat = time.Now()
            instance.Health = HealthStatusHealthy

            // 通知观察者
            r.notifyWatchers(serviceName)
            return nil
        }
    }

    return fmt.Errorf("instance %s not found", instanceID)
}

// notifyWatchers 通知观察者
func (r *InMemoryRegistry) notifyWatchers(serviceName string) {
    watchers := r.watchers[serviceName]
    if len(watchers) == 0 {
        return
    }

    instances := r.services[serviceName]
    var healthyInstances []*ServiceInstance
    for _, instance := range instances {
        if instance.Health == HealthStatusHealthy {
            healthyInstances = append(healthyInstances, instance)
        }
    }

    // 通知所有观察者
    for _, watcher := range watchers {
        select {
        case watcher <- healthyInstances:
        default:
            // 如果通道阻塞,跳过这个观察者
        }
    }
}

// startCleanup 启动清理协程
func (r *InMemoryRegistry) startCleanup() {
    ticker := time.NewTicker(r.cleanupInterval)
    defer ticker.Stop()

    for range ticker.C {
        r.cleanup()
    }
}

// cleanup 清理不健康的实例
func (r *InMemoryRegistry) cleanup() {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    now := time.Now()

    for serviceName, instances := range r.services {
        changed := false

        for instanceID, instance := range instances {
            // 检查心跳超时
            if now.Sub(instance.LastHeartbeat) > r.heartbeatTimeout {
                if instance.Health == HealthStatusHealthy {
                    instance.Health = HealthStatusUnhealthy
                    changed = true
                }

                // 如果超时太久,直接删除实例
                if now.Sub(instance.LastHeartbeat) > r.heartbeatTimeout*2 {
                    delete(instances, instanceID)
                    changed = true
                }
            }
        }

        // 如果服务没有实例了,删除服务
        if len(instances) == 0 {
            delete(r.services, serviceName)
            changed = true
        }

        // 通知观察者
        if changed {
            r.notifyWatchers(serviceName)
        }
    }
}

服务注册客户端 #

// client/registry_client.go
package client

import (
    "context"
    "fmt"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/example/service-discovery/registry"
)

// RegistryClient 注册中心客户端
type RegistryClient struct {
    registry     registry.ServiceRegistry
    instance     *registry.ServiceInstance
    heartbeatInterval time.Duration

    // 控制协程
    ctx    context.Context
    cancel context.CancelFunc
}

// NewRegistryClient 创建注册中心客户端
func NewRegistryClient(reg registry.ServiceRegistry, serviceName, address string, port int) *RegistryClient {
    ctx, cancel := context.WithCancel(context.Background())

    // 生成实例 ID
    hostname, _ := os.Hostname()
    instanceID := fmt.Sprintf("%s-%s-%d-%d", serviceName, hostname, port, time.Now().Unix())

    instance := &registry.ServiceInstance{
        ID:       instanceID,
        Name:     serviceName,
        Address:  address,
        Port:     port,
        Metadata: make(map[string]string),
    }

    return &RegistryClient{
        registry:          reg,
        instance:          instance,
        heartbeatInterval: time.Second * 30,
        ctx:               ctx,
        cancel:            cancel,
    }
}

// Start 启动客户端
func (c *RegistryClient) Start() error {
    // 注册服务实例
    if err := c.registry.Register(c.ctx, c.instance); err != nil {
        return fmt.Errorf("failed to register service: %v", err)
    }

    log.Printf("Service registered: %s (%s:%d)", c.instance.ID, c.instance.Address, c.instance.Port)

    // 启动心跳
    go c.startHeartbeat()

    // 监听退出信号
    go c.handleShutdown()

    return nil
}

// Stop 停止客户端
func (c *RegistryClient) Stop() error {
    c.cancel()

    // 注销服务实例
    if err := c.registry.Deregister(context.Background(), c.instance.ID); err != nil {
        log.Printf("Failed to deregister service: %v", err)
        return err
    }

    log.Printf("Service deregistered: %s", c.instance.ID)
    return nil
}

// SetMetadata 设置元数据
func (c *RegistryClient) SetMetadata(key, value string) {
    c.instance.Metadata[key] = value
}

// startHeartbeat 启动心跳
func (c *RegistryClient) startHeartbeat() {
    ticker := time.NewTicker(c.heartbeatInterval)
    defer ticker.Stop()

    for {
        select {
        case <-c.ctx.Done():
            return
        case <-ticker.C:
            if err := c.registry.Heartbeat(c.ctx, c.instance.ID); err != nil {
                log.Printf("Heartbeat failed: %v", err)
            }
        }
    }
}

// handleShutdown 处理优雅关闭
func (c *RegistryClient) handleShutdown() {
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    select {
    case <-sigCh:
        log.Println("Received shutdown signal")
        c.Stop()
    case <-c.ctx.Done():
        return
    }
}

健康检查机制 #

HTTP 健康检查 #

// health/http_checker.go
package health

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

// HTTPHealthChecker HTTP 健康检查器
type HTTPHealthChecker struct {
    client  *http.Client
    timeout time.Duration
}

// NewHTTPHealthChecker 创建 HTTP 健康检查器
func NewHTTPHealthChecker(timeout time.Duration) *HTTPHealthChecker {
    return &HTTPHealthChecker{
        client: &http.Client{
            Timeout: timeout,
        },
        timeout: timeout,
    }
}

// Check 执行健康检查
func (h *HTTPHealthChecker) Check(ctx context.Context, address string, port int, path string) error {
    url := fmt.Sprintf("http://%s:%d%s", address, port, path)

    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return err
    }

    resp, err := h.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode < 200 || resp.StatusCode >= 300 {
        return fmt.Errorf("health check failed with status: %d", resp.StatusCode)
    }

    return nil
}

TCP 健康检查 #

// health/tcp_checker.go
package health

import (
    "context"
    "fmt"
    "net"
    "time"
)

// TCPHealthChecker TCP 健康检查器
type TCPHealthChecker struct {
    timeout time.Duration
}

// NewTCPHealthChecker 创建 TCP 健康检查器
func NewTCPHealthChecker(timeout time.Duration) *TCPHealthChecker {
    return &TCPHealthChecker{
        timeout: timeout,
    }
}

// Check 执行 TCP 连接检查
func (t *TCPHealthChecker) Check(ctx context.Context, address string, port int) error {
    dialer := &net.Dialer{
        Timeout: t.timeout,
    }

    conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", address, port))
    if err != nil {
        return err
    }
    defer conn.Close()

    return nil
}

健康检查管理器 #

// health/manager.go
package health

import (
    "context"
    "log"
    "sync"
    "time"

    "github.com/example/service-discovery/registry"
)

// HealthCheckManager 健康检查管理器
type HealthCheckManager struct {
    registry     registry.ServiceRegistry
    httpChecker  *HTTPHealthChecker
    tcpChecker   *TCPHealthChecker

    checkInterval time.Duration

    // 控制协程
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

// NewHealthCheckManager 创建健康检查管理器
func NewHealthCheckManager(reg registry.ServiceRegistry) *HealthCheckManager {
    ctx, cancel := context.WithCancel(context.Background())

    return &HealthCheckManager{
        registry:      reg,
        httpChecker:   NewHTTPHealthChecker(time.Second * 5),
        tcpChecker:    NewTCPHealthChecker(time.Second * 3),
        checkInterval: time.Second * 10,
        ctx:           ctx,
        cancel:        cancel,
    }
}

// Start 启动健康检查
func (h *HealthCheckManager) Start() {
    h.wg.Add(1)
    go h.runHealthChecks()
}

// Stop 停止健康检查
func (h *HealthCheckManager) Stop() {
    h.cancel()
    h.wg.Wait()
}

// runHealthChecks 运行健康检查
func (h *HealthCheckManager) runHealthChecks() {
    defer h.wg.Done()

    ticker := time.NewTicker(h.checkInterval)
    defer ticker.Stop()

    for {
        select {
        case <-h.ctx.Done():
            return
        case <-ticker.C:
            h.performHealthChecks()
        }
    }
}

// performHealthChecks 执行健康检查
func (h *HealthCheckManager) performHealthChecks() {
    // 这里需要获取所有服务实例并进行健康检查
    // 由于我们的简单实现没有提供获取所有实例的接口,
    // 在实际实现中需要扩展注册中心接口

    // 示例:检查特定服务的健康状态
    instances, err := h.registry.Discover(h.ctx, "example-service")
    if err != nil {
        log.Printf("Failed to discover services: %v", err)
        return
    }

    for _, instance := range instances {
        go h.checkInstance(instance)
    }
}

// checkInstance 检查单个实例
func (h *HealthCheckManager) checkInstance(instance *registry.ServiceInstance) {
    // 根据元数据决定使用哪种健康检查
    checkType := instance.Metadata["health_check_type"]

    var err error
    switch checkType {
    case "http":
        path := instance.Metadata["health_check_path"]
        if path == "" {
            path = "/health"
        }
        err = h.httpChecker.Check(h.ctx, instance.Address, instance.Port, path)
    case "tcp":
        err = h.tcpChecker.Check(h.ctx, instance.Address, instance.Port)
    default:
        // 默认使用 TCP 检查
        err = h.tcpChecker.Check(h.ctx, instance.Address, instance.Port)
    }

    if err != nil {
        log.Printf("Health check failed for %s: %v", instance.ID, err)
        // 在实际实现中,这里应该更新实例的健康状态
    }
}

服务发现客户端 #

服务发现客户端实现 #

// discovery/client.go
package discovery

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"

    "github.com/example/service-discovery/registry"
)

// DiscoveryClient 服务发现客户端
type DiscoveryClient struct {
    registry registry.ServiceRegistry

    // 缓存
    cache map[string][]*registry.ServiceInstance
    mutex sync.RWMutex

    // 监听器
    watchers map[string]context.CancelFunc

    // 配置
    cacheTimeout time.Duration
}

// NewDiscoveryClient 创建服务发现客户端
func NewDiscoveryClient(reg registry.ServiceRegistry) *DiscoveryClient {
    return &DiscoveryClient{
        registry:     reg,
        cache:        make(map[string][]*registry.ServiceInstance),
        watchers:     make(map[string]context.CancelFunc),
        cacheTimeout: time.Minute * 5,
    }
}

// GetService 获取服务实例
func (d *DiscoveryClient) GetService(serviceName string) ([]*registry.ServiceInstance, error) {
    // 先检查缓存
    d.mutex.RLock()
    if instances, exists := d.cache[serviceName]; exists {
        d.mutex.RUnlock()
        return instances, nil
    }
    d.mutex.RUnlock()

    // 从注册中心获取
    instances, err := d.registry.Discover(context.Background(), serviceName)
    if err != nil {
        return nil, err
    }

    // 更新缓存
    d.mutex.Lock()
    d.cache[serviceName] = instances
    d.mutex.Unlock()

    // 启动监听器(如果还没有)
    d.startWatcher(serviceName)

    return instances, nil
}

// GetRandomInstance 获取随机服务实例
func (d *DiscoveryClient) GetRandomInstance(serviceName string) (*registry.ServiceInstance, error) {
    instances, err := d.GetService(serviceName)
    if err != nil {
        return nil, err
    }

    if len(instances) == 0 {
        return nil, fmt.Errorf("no available instances for service: %s", serviceName)
    }

    // 随机选择一个实例
    index := rand.Intn(len(instances))
    return instances[index], nil
}

// startWatcher 启动服务监听器
func (d *DiscoveryClient) startWatcher(serviceName string) {
    d.mutex.Lock()
    defer d.mutex.Unlock()

    // 如果已经有监听器,直接返回
    if _, exists := d.watchers[serviceName]; exists {
        return
    }

    ctx, cancel := context.WithCancel(context.Background())
    d.watchers[serviceName] = cancel

    go func() {
        defer cancel()

        ch, err := d.registry.Watch(ctx, serviceName)
        if err != nil {
            log.Printf("Failed to watch service %s: %v", serviceName, err)
            return
        }

        for {
            select {
            case <-ctx.Done():
                return
            case instances := <-ch:
                // 更新缓存
                d.mutex.Lock()
                d.cache[serviceName] = instances
                d.mutex.Unlock()

                log.Printf("Service %s instances updated: %d instances", serviceName, len(instances))
            }
        }
    }()
}

// Stop 停止客户端
func (d *DiscoveryClient) Stop() {
    d.mutex.Lock()
    defer d.mutex.Unlock()

    // 停止所有监听器
    for _, cancel := range d.watchers {
        cancel()
    }

    d.watchers = make(map[string]context.CancelFunc)
    d.cache = make(map[string][]*registry.ServiceInstance)
}

实际应用示例 #

HTTP 服务示例 #

// example/http_service.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "strconv"
    "time"

    "github.com/example/service-discovery/client"
    "github.com/example/service-discovery/discovery"
    "github.com/example/service-discovery/registry"
)

func main() {
    // 从环境变量获取配置
    serviceName := getEnv("SERVICE_NAME", "example-service")
    port, _ := strconv.Atoi(getEnv("PORT", "8080"))
    address := getEnv("ADDRESS", "localhost")

    // 创建注册中心
    reg := registry.NewInMemoryRegistry()

    // 创建注册客户端
    regClient := client.NewRegistryClient(reg, serviceName, address, port)
    regClient.SetMetadata("version", "1.0.0")
    regClient.SetMetadata("health_check_type", "http")
    regClient.SetMetadata("health_check_path", "/health")

    // 创建发现客户端
    discoveryClient := discovery.NewDiscoveryClient(reg)

    // 创建 HTTP 服务器
    mux := http.NewServeMux()

    // 健康检查端点
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "healthy",
            "time":   time.Now().Format(time.RFC3339),
        })
    })

    // 服务信息端点
    mux.HandleFunc("/info", func(w http.ResponseWriter, r *http.Request) {
        hostname, _ := os.Hostname()
        json.NewEncoder(w).Encode(map[string]interface{}{
            "service":  serviceName,
            "hostname": hostname,
            "address":  address,
            "port":     port,
            "time":     time.Now().Format(time.RFC3339),
        })
    })

    // 调用其他服务的端点
    mux.HandleFunc("/call", func(w http.ResponseWriter, r *http.Request) {
        targetService := r.URL.Query().Get("service")
        if targetService == "" {
            http.Error(w, "service parameter required", http.StatusBadRequest)
            return
        }

        // 发现目标服务
        instance, err := discoveryClient.GetRandomInstance(targetService)
        if err != nil {
            http.Error(w, fmt.Sprintf("Service discovery failed: %v", err), http.StatusServiceUnavailable)
            return
        }

        // 调用目标服务
        url := fmt.Sprintf("http://%s:%d/info", instance.Address, instance.Port)
        resp, err := http.Get(url)
        if err != nil {
            http.Error(w, fmt.Sprintf("Service call failed: %v", err), http.StatusServiceUnavailable)
            return
        }
        defer resp.Body.Close()

        // 返回响应
        w.Header().Set("Content-Type", "application/json")
        w.WriteHeader(resp.StatusCode)

        var result map[string]interface{}
        json.NewDecoder(resp.Body).Decode(&result)
        json.NewEncoder(w).Encode(map[string]interface{}{
            "caller": map[string]interface{}{
                "service": serviceName,
                "address": address,
                "port":    port,
            },
            "target": result,
        })
    })

    // 启动注册客户端
    if err := regClient.Start(); err != nil {
        log.Fatal("Failed to start registry client:", err)
    }
    defer regClient.Stop()

    // 启动 HTTP 服务器
    server := &http.Server{
        Addr:    fmt.Sprintf(":%d", port),
        Handler: mux,
    }

    log.Printf("Starting HTTP server on port %d", port)
    log.Fatal(server.ListenAndServe())
}

func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

使用示例 #

# 启动第一个服务实例
SERVICE_NAME=user-service PORT=8080 go run example/http_service.go

# 启动第二个服务实例
SERVICE_NAME=user-service PORT=8081 go run example/http_service.go

# 启动另一个服务
SERVICE_NAME=order-service PORT=8082 go run example/http_service.go

# 测试服务发现
curl http://localhost:8080/info
curl http://localhost:8080/call?service=order-service

监控和指标 #

// metrics/registry_metrics.go
package metrics

import (
    "sync"
    "time"
)

// RegistryMetrics 注册中心指标
type RegistryMetrics struct {
    // 服务统计
    TotalServices   int64
    TotalInstances  int64
    HealthyInstances int64

    // 操作统计
    RegisterCount   int64
    DeregisterCount int64
    DiscoverCount   int64
    HeartbeatCount  int64

    // 性能指标
    AvgResponseTime time.Duration

    mutex sync.RWMutex
}

// NewRegistryMetrics 创建指标收集器
func NewRegistryMetrics() *RegistryMetrics {
    return &RegistryMetrics{}
}

// IncrementRegister 增加注册计数
func (m *RegistryMetrics) IncrementRegister() {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.RegisterCount++
}

// IncrementDeregister 增加注销计数
func (m *RegistryMetrics) IncrementDeregister() {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.DeregisterCount++
}

// IncrementDiscover 增加发现计数
func (m *RegistryMetrics) IncrementDiscover() {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.DiscoverCount++
}

// IncrementHeartbeat 增加心跳计数
func (m *RegistryMetrics) IncrementHeartbeat() {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.HeartbeatCount++
}

// UpdateServiceStats 更新服务统计
func (m *RegistryMetrics) UpdateServiceStats(services, instances, healthy int64) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    m.TotalServices = services
    m.TotalInstances = instances
    m.HealthyInstances = healthy
}

// GetMetrics 获取指标快照
func (m *RegistryMetrics) GetMetrics() map[string]interface{} {
    m.mutex.RLock()
    defer m.mutex.RUnlock()

    return map[string]interface{}{
        "total_services":    m.TotalServices,
        "total_instances":   m.TotalInstances,
        "healthy_instances": m.HealthyInstances,
        "register_count":    m.RegisterCount,
        "deregister_count":  m.DeregisterCount,
        "discover_count":    m.DiscoverCount,
        "heartbeat_count":   m.HeartbeatCount,
        "avg_response_time": m.AvgResponseTime.Milliseconds(),
    }
}

通过实现这个完整的服务注册与发现系统,我们为微服务架构提供了基础的服务管理能力。这个系统支持服务的动态注册和发现、健康检查、以及基本的监控功能,为后续的负载均衡实现奠定了基础。