5.4.4 服务端负载均衡

5.4.4 服务端负载均衡 #

服务端负载均衡是一种将负载均衡逻辑集中在专门的负载均衡器或代理服务器上的架构模式。客户端只需要知道负载均衡器的地址,由负载均衡器负责将请求分发到后端的服务实例。这种模式在传统架构中应用广泛,在微服务架构中也有重要作用。

服务端负载均衡架构 #

架构模式 #

┌─────────────┐    ┌─────────────────┐    ┌─────────────┐
│   Client    │───▶│  Load Balancer  │───▶│  Service    │
│             │    │   (Proxy)       │    │  Instance 1 │
└─────────────┘    │                 │    └─────────────┘
                   │                 │    ┌─────────────┐
                   │                 │───▶│  Service    │
                   │                 │    │  Instance 2 │
                   │                 │    └─────────────┘
                   │                 │    ┌─────────────┐
                   │                 │───▶│  Service    │
                   └─────────────────┘    │  Instance N │
                                          └─────────────┘

核心组件 #

  1. 负载均衡器:接收客户端请求并分发到后端服务
  2. 健康检查:监控后端服务的健康状态
  3. 服务发现:动态发现和管理后端服务实例
  4. 配置管理:管理负载均衡策略和规则

反向代理负载均衡器实现 #

核心代理服务器 #

// proxy/server.go
package proxy

import (
    "context"
    "fmt"
    "log"
    "net"
    "net/http"
    "net/http/httputil"
    "net/url"
    "strings"
    "sync"
    "time"

    "github.com/example/server-lb/balancer"
    "github.com/example/server-lb/discovery"
    "github.com/example/server-lb/health"
)

// ProxyServer is a reverse-proxy load balancer: it accepts HTTP requests and
// forwards them to backend service instances.
type ProxyServer struct {
    // Core components: instance selection, service discovery and health checking.
    loadBalancer    balancer.LoadBalancer
    discoveryClient discovery.ServiceDiscovery
    healthChecker   health.HealthChecker

    // services holds the registered service configurations; mutex guards it.
    // NOTE(review): the map key is presumably the service name — confirm against AddService.
    services map[string]*ServiceConfig
    mutex    sync.RWMutex

    // server is the HTTP server that accepts client requests.
    server *http.Server

    // config holds the settings supplied at construction time.
    config ProxyConfig

    // ctx and cancel control the proxy's lifetime; Stop calls cancel.
    ctx    context.Context
    cancel context.CancelFunc
}

// ProxyConfig holds the settings for a ProxyServer.
//
// ListenAddr is used as the HTTP server's listen address. The remaining
// fields are not read by the code shown alongside this type; they are
// presumably consumed by the discovery/health-check loops and the outbound
// transport — confirm against their users.
type ProxyConfig struct {
    ListenAddr          string
    HealthCheckInterval time.Duration
    DiscoveryInterval   time.Duration
    RequestTimeout      time.Duration
    MaxIdleConns        int
    IdleConnTimeout     time.Duration
}

// ServiceConfig describes one backend service exposed through the proxy.
//
// PathPrefix is the URL path prefix used to route requests to the service,
// and Instances is the list of backends handed to the load balancer.
type ServiceConfig struct {
    Name         string
    PathPrefix   string
    BalancerType balancer.BalancerType
    HealthCheck  HealthCheckConfig
    Instances    []*balancer.Instance

    // Runtime state: reverse proxy associated with this service.
    proxy *httputil.ReverseProxy
}

// HealthCheckConfig holds the per-service health-check settings: whether
// checking is enabled, the path to probe, and the probe interval and timeout.
type HealthCheckConfig struct {
    Enabled  bool
    Path     string
    Interval time.Duration
    Timeout  time.Duration
}

// NewProxyServer 创建代理服务器 func NewProxyServer( discoveryClient discovery.ServiceDiscovery, config ProxyConfig, ) *ProxyServer { ctx, cancel := context.WithCancel(context.Background())

// 创建负载均衡器
factory := balancer.NewBalancerFactory()
lb, _ := factory.CreateBalancer(balancer.BalancerConfig{
    Type: balancer.RoundRobin,
})

// 创建健康检查器
healthChecker := health.NewHTTPHealthChecker(time.Second * 5)

proxy := &ProxyServer{
    loadBalancer:    lb,
    discoveryClient: discoveryClient,
    healthChecker:   healthChecker,
    services:        make(map[string]*ServiceConfig),
    config:          config,
    ctx:             ctx,
    cancel:          cancel,
}

// 创建HTTP服务器
mux := http.NewServeMux()
mux.HandleFunc("/", proxy.handleRequest)
mux.HandleFunc("/health", proxy.handleHealth)
mux.HandleFunc("/stats", proxy.handleStats)

proxy.server = &http.Server{
    Addr:    config.ListenAddr,
    Handler: mux,
}

return proxy

}

// Start 启动代理服务器 func (p *ProxyServer) Start() error { // 启动后台任务 go p.startDiscovery() go p.startHealthCheck()

log.Printf("Proxy server starting on %s", p.config.ListenAddr)
return p.server.ListenAndServe()

}

// Stop 停止代理服务器 func (p *ProxyServer) Stop() error { p.cancel()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

return p.server.Shutdown(ctx)

}

// handleRequest 处理请求 func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { // 根据路径前缀匹配服务 service := p.matchService(r.URL.Path) if service == nil { http.Error(w, “Service not found”, http.StatusNotFound) return }

// 选择后端实例
instance, err := p.selectInstance(service)
if err != nil {
    http.Error(w, "No available instances", http.StatusServiceUnavailable)
    return
}

// 创建反向代理
if service.proxy == nil {
    service.proxy = p.createReverseProxy(service, instance)
}

// 代理请求
service.proxy.ServeHTTP(w, r)

}

// matchService 匹配服务 func (p *ProxyServer) matchService(path string) *ServiceConfig { p.mutex.RLock() defer p.mutex.RUnlock()

var bestMatch *ServiceConfig
maxLen := 0

for _, service := range p.services {
    if len(service.PathPrefix) > maxLen &&
       strings.HasPrefix(path, service.PathPrefix) {
        bestMatch = service
        maxLen = len(service.PathPrefix)
    }
}

return bestMatch

}

// selectInstance 选择实例 func (p *ProxyServer) selectInstance(service *ServiceConfig) (*balancer.Instance, error) { // 更新负载均衡器实例列表 p.loadBalancer.UpdateInstances(service.Instances)

// 选择实例
return p.loadBalancer.Select(context.Background())

}

// createReverseProxy 创建反向代理 func (p *ProxyServer) createReverseProxy(service *ServiceConfig, instance *balancer.Instance) *httputil.ReverseProxy { target := &url.URL{ Scheme: “http”, Host: fmt.Sprintf("%s:%d", instance.Address, instance.Port), }

proxy := httputil.NewSingleHostReverseProxy(target)

// 自定义Director
originalDirector := proxy.Director
proxy.Director = func(req *http.Request) {
    originalDirector(req)

    // 移除路径前缀
    if service.PathPrefix != "/" {
        req.URL.Path = strings.TrimPrefix(req.URL.Path, service.PathPrefix)
        if req.URL.Path == "" {
            req.URL.Path = "/"
        }
    }

    // 设置请求头
    req.Header.Set("X-Forwarded-Host", req.Header.Get("Host"))
    req.Header.Set("X-Forwarded-Proto", "http")
    req.Header.Set("X-Real-IP", req.RemoteAddr)
}

// 自定义错误处理
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
    log.Printf("Proxy error: %v", err)

    // 标记实例失败
    p.loadBalancer.MarkFailure(instance.ID)

    http.Error(w, "Bad Gateway", http.StatusBadGateway)
}

return proxy

}

API 网关实现 #

网关核心功能 #

// gateway/gateway.go
package gateway

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/example/server-lb/auth"
    "github.com/example/server-lb/ratelimit"
    "github.com/example/server-lb/proxy"
)

// APIGateway is an API gateway that wraps a proxy server with routing and a
// middleware chain.
type APIGateway struct {
    // Core components. authenticator and rateLimiter are only set when the
    // corresponding feature is enabled in the config.
    proxyServer   *proxy.ProxyServer
    authenticator auth.Authenticator
    rateLimiter   ratelimit.RateLimiter

    // routes maps a route path to its configuration; mutex guards it.
    routes map[string]*RouteConfig
    mutex  sync.RWMutex

    // middlewares is the ordered middleware list; the first entry runs outermost.
    middlewares []Middleware

    // config holds the gateway settings supplied at construction time.
    config GatewayConfig
}

// GatewayConfig holds the gateway settings. EnableAuth, EnableRateLimit and
// CorsEnabled switch the corresponding middleware on; CorsOrigins is passed
// to the CORS middleware as its allowed-origin list.
type GatewayConfig struct {
    ListenAddr      string
    EnableAuth      bool
    EnableRateLimit bool
    CorsEnabled     bool
    CorsOrigins     []string
}

// RouteConfig describes one gateway route. Path is the key under which the
// route is stored, and Methods is checked against the request method when
// matching.
// NOTE(review): AuthRequired, RateLimit, Timeout and Retries are not read by
// the gateway code shown alongside this type — confirm where they take effect.
type RouteConfig struct {
    Path        string
    Methods     []string
    ServiceName string
    AuthRequired bool
    RateLimit   *RateLimitConfig
    Timeout     time.Duration
    Retries     int
}

// RateLimitConfig holds a route's rate-limit settings: the sustained request
// rate per second and the burst size.
type RateLimitConfig struct {
    RequestsPerSecond int
    BurstSize         int
}

// Middleware is the gateway middleware interface. Process wraps next and
// returns the handler that should run in its place.
type Middleware interface {
    Process(next http.HandlerFunc) http.HandlerFunc
}

// NewAPIGateway creates an API gateway in front of proxyServer.
//
// A JWT authenticator is created when config.EnableAuth is set and a
// token-bucket rate limiter when config.EnableRateLimit is set; the default
// middleware chain is then installed.
func NewAPIGateway(
    proxyServer *proxy.ProxyServer,
    config GatewayConfig,
) *APIGateway {
    // NewTokenBucketLimiter requires a capacity and a refill rate, and
    // GatewayConfig carries neither, so gateway-wide defaults are used.
    // NOTE(review): the values are placeholders — tune them or promote them
    // to GatewayConfig fields.
    const (
        defaultRateLimitBurst     = 100
        defaultRateLimitPerSecond = 100
    )

    gateway := &APIGateway{
        proxyServer: proxyServer,
        routes:      make(map[string]*RouteConfig),
        config:      config,
    }

    // Initialise the optional components.
    if config.EnableAuth {
        gateway.authenticator = auth.NewJWTAuthenticator()
    }

    if config.EnableRateLimit {
        gateway.rateLimiter = ratelimit.NewTokenBucketLimiter(
            defaultRateLimitBurst,
            defaultRateLimitPerSecond,
        )
    }

    // Install the default middleware chain.
    gateway.addDefaultMiddlewares()

    return gateway
}

// addDefaultMiddlewares 添加默认中间件 func (g *APIGateway) addDefaultMiddlewares() { // CORS 中间件 if g.config.CorsEnabled { g.middlewares = append(g.middlewares, &CORSMiddleware{ Origins: g.config.CorsOrigins, }) }

// 日志中间件
g.middlewares = append(g.middlewares, &LoggingMiddleware{})

// 认证中间件
if g.config.EnableAuth {
    g.middlewares = append(g.middlewares, &AuthMiddleware{
        authenticator: g.authenticator,
    })
}

// 限流中间件
if g.config.EnableRateLimit {
    g.middlewares = append(g.middlewares, &RateLimitMiddleware{
        rateLimiter: g.rateLimiter,
    })
}

}

// AddRoute 添加路由 func (g *APIGateway) AddRoute(route *RouteConfig) { g.mutex.Lock() defer g.mutex.Unlock()

g.routes[route.Path] = route

}

// handleGatewayRequest 处理网关请求 func (g *APIGateway) handleGatewayRequest(w http.ResponseWriter, r *http.Request) { // 匹配路由 route := g.matchRoute(r.URL.Path, r.Method) if route == nil { http.Error(w, “Route not found”, http.StatusNotFound) return }

// 构建中间件链
handler := g.buildMiddlewareChain(route, g.handleProxyRequest)

// 执行请求
handler(w, r)

}

// matchRoute 匹配路由 func (g *APIGateway) matchRoute(path, method string) *RouteConfig { g.mutex.RLock() defer g.mutex.RUnlock()

for routePath, route := range g.routes {
    if g.pathMatches(path, routePath) && g.methodMatches(method, route.Methods) {
        return route
    }
}

return nil

}

// buildMiddlewareChain 构建中间件链 func (g *APIGateway) buildMiddlewareChain(route *RouteConfig, final http.HandlerFunc) http.HandlerFunc { handler := final

// 从后往前构建中间件链
for i := len(g.middlewares) - 1; i >= 0; i-- {
    handler = g.middlewares[i].Process(handler)
}

return handler

}

// handleProxyRequest 处理代理请求 func (g *APIGateway) handleProxyRequest(w http.ResponseWriter, r *http.Request) { // 转发到代理服务器 g.proxyServer.ServeHTTP(w, r) }

中间件实现 #

CORS 中间件 #

// middleware/cors.go
package middleware

import (
    "net/http"
    "strings"
)

// CORSMiddleware CORS中间件
type CORSMiddleware struct {
    Origins []string
}

// Process 处理CORS
func (c *CORSMiddleware) Process(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        origin := r.Header.Get("Origin")

        // 检查Origin是否允许
        if c.isOriginAllowed(origin) {
            w.Header().Set("Access-Control-Allow-Origin", origin)
        }

        w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
        w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
        w.Header().Set("Access-Control-Max-Age", "86400")

        // 处理预检请求
        if r.Method == "OPTIONS" {
            w.WriteHeader(http.StatusOK)
            return
        }

        next(w, r)
    }
}

// isOriginAllowed 检查Origin是否允许
func (c *CORSMiddleware) isOriginAllowed(origin string) bool {
    if len(c.Origins) == 0 {
        return true // 允许所有Origin
    }

    for _, allowed := range c.Origins {
        if allowed == "*" || allowed == origin {
            return true
        }
    }

    return false
}

认证中间件 #

// middleware/auth.go
package middleware

import (
    "context"
    "net/http"
    "strings"

    "github.com/example/server-lb/auth"
)

// AuthMiddleware is the authentication middleware; it validates bearer
// tokens with the configured authenticator.
type AuthMiddleware struct {
    authenticator auth.Authenticator
}

// Process returns a handler that authenticates the request before calling
// next.
//
// The request must carry "Authorization: Bearer <token>". A missing or
// malformed header, or a token the authenticator rejects, is answered with
// 401 and next is not called. On success the validated claims are stored in
// the request context under the key "user".
func (a *AuthMiddleware) Process(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Read the Authorization header.
        authHeader := r.Header.Get("Authorization")
        if authHeader == "" {
            http.Error(w, "Missing authorization header", http.StatusUnauthorized)
            return
        }

        // Parse the bearer token. HTTP auth scheme names are
        // case-insensitive, so "bearer" and "BEARER" are accepted too.
        parts := strings.SplitN(authHeader, " ", 2)
        if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") {
            http.Error(w, "Invalid authorization header", http.StatusUnauthorized)
            return
        }

        token := strings.TrimSpace(parts[1])
        if token == "" {
            // "Bearer " with nothing after it is malformed, not a bad token.
            http.Error(w, "Invalid authorization header", http.StatusUnauthorized)
            return
        }

        // Validate the token.
        claims, err := a.authenticator.ValidateToken(token)
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }

        // Attach the claims to the request context. The plain string key is
        // kept so existing readers of ctx.Value("user") keep working.
        ctx := context.WithValue(r.Context(), "user", claims)
        r = r.WithContext(ctx)

        next(w, r)
    }
}

限流中间件 #

// middleware/ratelimit.go
package middleware

import (
    "net"
    "net/http"
    "strconv"
    "strings"
    "time"

    "github.com/example/server-lb/ratelimit"
)

// RateLimitMiddleware is the rate-limiting middleware; it asks the
// configured rate limiter whether each request may proceed.
type RateLimitMiddleware struct {
    rateLimiter ratelimit.RateLimiter
}

// Process returns a handler that applies per-client rate limiting before
// calling next.
//
// Every response carries X-RateLimit-Remaining and X-RateLimit-Reset. A
// rejected request is answered with 429 plus a Retry-After header, and next
// is not called.
func (r *RateLimitMiddleware) Process(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        // Identify the client.
        clientIP := r.getClientIP(req)

        // Check the limit.
        allowed, remaining, resetTime := r.rateLimiter.Allow(clientIP)

        // Expose the limiter state.
        w.Header().Set("X-RateLimit-Remaining", strconv.Itoa(remaining))
        w.Header().Set("X-RateLimit-Reset", strconv.FormatInt(resetTime, 10))

        if !allowed {
            // Retry-After takes a delay in seconds (or an HTTP date), not an
            // absolute Unix timestamp. resetTime is assumed to be Unix
            // seconds, as the token-bucket limiter returns it.
            retryAfter := resetTime - time.Now().Unix()
            if retryAfter < 1 {
                retryAfter = 1
            }
            w.Header().Set("Retry-After", strconv.FormatInt(retryAfter, 10))
            http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
            return
        }

        next(w, req)
    }
}

// getClientIP returns the client address used as the rate-limit key: the
// first X-Forwarded-For entry, else X-Real-IP, else the host part of
// RemoteAddr.
//
// NOTE(review): both headers are client-controlled. Unless a trusted proxy
// overwrites them, a caller can spoof the key and evade the limit.
func (r *RateLimitMiddleware) getClientIP(req *http.Request) string {
    // X-Forwarded-For is "client, proxy1, proxy2"; the entries may be padded
    // with spaces.
    if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
        if first := strings.TrimSpace(strings.Split(xff, ",")[0]); first != "" {
            return first
        }
    }

    // X-Real-IP.
    if xri := strings.TrimSpace(req.Header.Get("X-Real-IP")); xri != "" {
        return xri
    }

    // RemoteAddr is "host:port". Splitting on ":" would truncate IPv6
    // addresses such as "[::1]:1234", so use the dedicated parser.
    if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
        return host
    }
    return req.RemoteAddr
}

流量控制与限流 #

令牌桶限流器 #

// ratelimit/token_bucket.go
package ratelimit

import (
    "sync"
    "time"
)

// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
    buckets map[string]*TokenBucket
    mutex   sync.RWMutex

    // 配置
    capacity     int           // 桶容量
    refillRate   int           // 令牌补充速率(每秒)
    refillPeriod time.Duration // 补充周期
}

// TokenBucket 令牌桶
type TokenBucket struct {
    capacity     int
    tokens       int
    refillRate   int
    lastRefill   time.Time
    mutex        sync.Mutex
}

// NewTokenBucketLimiter 创建令牌桶限流器
func NewTokenBucketLimiter(capacity, refillRate int) *TokenBucketLimiter {
    limiter := &TokenBucketLimiter{
        buckets:      make(map[string]*TokenBucket),
        capacity:     capacity,
        refillRate:   refillRate,
        refillPeriod: time.Second,
    }

    // 启动令牌补充协程
    go limiter.startRefillRoutine()

    return limiter
}

// Allow 检查是否允许请求
func (t *TokenBucketLimiter) Allow(key string) (bool, int, int64) {
    bucket := t.getBucket(key)

    bucket.mutex.Lock()
    defer bucket.mutex.Unlock()

    // 补充令牌
    t.refillBucket(bucket)

    // 检查是否有可用令牌
    if bucket.tokens > 0 {
        bucket.tokens--
        return true, bucket.tokens, time.Now().Add(time.Second).Unix()
    }

    return false, 0, time.Now().Add(time.Second).Unix()
}

// getBucket 获取或创建令牌桶
func (t *TokenBucketLimiter) getBucket(key string) *TokenBucket {
    t.mutex.RLock()
    bucket, exists := t.buckets[key]
    t.mutex.RUnlock()

    if exists {
        return bucket
    }

    t.mutex.Lock()
    defer t.mutex.Unlock()

    // 双重检查
    if bucket, exists := t.buckets[key]; exists {
        return bucket
    }

    // 创建新的令牌桶
    bucket = &TokenBucket{
        capacity:   t.capacity,
        tokens:     t.capacity,
        refillRate: t.refillRate,
        lastRefill: time.Now(),
    }

    t.buckets[key] = bucket
    return bucket
}

// refillBucket 补充令牌
func (t *TokenBucketLimiter) refillBucket(bucket *TokenBucket) {
    now := time.Now()
    elapsed := now.Sub(bucket.lastRefill)

    if elapsed >= t.refillPeriod {
        tokensToAdd := int(elapsed/t.refillPeriod) * bucket.refillRate
        bucket.tokens = min(bucket.capacity, bucket.tokens+tokensToAdd)
        bucket.lastRefill = now
    }
}

// startRefillRoutine 启动令牌补充协程
func (t *TokenBucketLimiter) startRefillRoutine() {
    ticker := time.NewTicker(t.refillPeriod)
    defer ticker.Stop()

    for range ticker.C {
        t.mutex.RLock()
        for _, bucket := range t.buckets {
            bucket.mutex.Lock()
            t.refillBucket(bucket)
            bucket.mutex.Unlock()
        }
        t.mutex.RUnlock()
    }
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

高可用架构设计 #

主备模式实现 #

// ha/master_slave.go
package ha

import (
    "context"
    "log"
    "sync"
    "time"

    "github.com/example/server-lb/discovery"
)

// HAManager is the high-availability manager: it runs the periodic election
// and heartbeat loops and tracks whether this node is the master.
type HAManager struct {
    // State. isMaster is guarded by mutex.
    isMaster bool
    nodeID   string

    // Components.
    discoveryClient discovery.ServiceDiscovery

    // Configuration.
    config HAConfig

    // Control: cancelling ctx stops the background loops.
    ctx    context.Context
    cancel context.CancelFunc
    mutex  sync.RWMutex

    // Callbacks invoked on a role change (either may be nil).
    onBecameMaster func()
    onBecameSlave  func()
}

// HAConfig holds the high-availability settings. HeartbeatInterval is the
// heartbeat loop's tick period, and ElectionTimeout is the period between
// election attempts.
// NOTE(review): ElectionPath is not read by the code shown alongside this
// type; presumably it is the key used by a real distributed election.
type HAConfig struct {
    NodeID           string
    ElectionPath     string
    HeartbeatInterval time.Duration
    ElectionTimeout   time.Duration
}

// NewHAManager builds a high-availability manager for the node identified by
// config.NodeID. The manager owns a fresh cancellable context; no background
// work starts until Start is called.
func NewHAManager(
    discoveryClient discovery.ServiceDiscovery,
    config HAConfig,
) *HAManager {
    manager := &HAManager{
        nodeID:          config.NodeID,
        discoveryClient: discoveryClient,
        config:          config,
    }
    manager.ctx, manager.cancel = context.WithCancel(context.Background())

    return manager
}

// Start launches the election and heartbeat loops, each in its own
// goroutine, and returns immediately. It always returns nil.
func (h *HAManager) Start() error {
    for _, loop := range []func(){h.startElection, h.startHeartbeat} {
        go loop()
    }

    return nil
}

// Stop cancels the manager's context, which ends the election and heartbeat
// loops.
func (h *HAManager) Stop() {
    h.cancel()
}

// IsMaster reports whether this node currently holds the master role.
func (h *HAManager) IsMaster() bool {
    h.mutex.RLock()
    master := h.isMaster
    h.mutex.RUnlock()

    return master
}

// SetCallbacks installs the functions to run when this node becomes master
// (onMaster) or loses the master role (onSlave). Either may be nil.
func (h *HAManager) SetCallbacks(onMaster, onSlave func()) {
    h.onBecameMaster, h.onBecameSlave = onMaster, onSlave
}

// startElection runs an election every ElectionTimeout until the manager's
// context is cancelled.
func (h *HAManager) startElection() {
    electionTicker := time.NewTicker(h.config.ElectionTimeout)
    defer electionTicker.Stop()

    stopped := h.ctx.Done()
    for {
        select {
        case <-electionTicker.C:
            h.performElection()
        case <-stopped:
            return
        }
    }
}

// performElection runs one election round: it tries to acquire leadership,
// records the outcome, and fires the matching callback when the role changed.
func (h *HAManager) performElection() {
    elected := h.tryAcquireLeadership()

    // Swap the state under the lock; the callbacks run outside it.
    h.mutex.Lock()
    previously := h.isMaster
    h.isMaster = elected
    h.mutex.Unlock()

    switch {
    case elected && !previously:
        log.Printf("Node %s became master", h.nodeID)
        if h.onBecameMaster != nil {
            h.onBecameMaster()
        }
    case previously && !elected:
        log.Printf("Node %s became slave", h.nodeID)
        if h.onBecameSlave != nil {
            h.onBecameSlave()
        }
    }
}

// tryAcquireLeadership attempts to acquire leadership and reports whether it
// succeeded. This simplified version always succeeds.
func (h *HAManager) tryAcquireLeadership() bool {
    // Simplified implementation; a real one should use a distributed lock,
    // for example an etcd-based election.
    return true // simplified result
}

// startHeartbeat sends a heartbeat every HeartbeatInterval while this node is
// master, until the manager's context is cancelled.
func (h *HAManager) startHeartbeat() {
    beat := time.NewTicker(h.config.HeartbeatInterval)
    defer beat.Stop()

    stopped := h.ctx.Done()
    for {
        select {
        case <-beat.C:
            if !h.IsMaster() {
                continue
            }
            h.sendHeartbeat()
        case <-stopped:
            return
        }
    }
}

// sendHeartbeat emits one heartbeat. This simplified version only logs it.
func (h *HAManager) sendHeartbeat() {
    // A real implementation should send the heartbeat to the service
    // discovery system and update the node's status there.
    log.Printf("Master node %s sending heartbeat", h.nodeID)
}

完整应用示例 #

服务端负载均衡器启动 #

// main.go
package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/example/server-lb/discovery"
    "github.com/example/server-lb/gateway"
    "github.com/example/server-lb/ha"
    "github.com/example/server-lb/proxy"
)

// main wires the discovery client, proxy server, API gateway and HA manager
// together, then blocks until SIGINT or SIGTERM and shuts everything down.
// The proxy and gateway are started only when this node becomes master.
func main() {
    // Create the service discovery client.
    discoveryClient := discovery.NewConsulDiscovery("localhost:8500")

    // Proxy server configuration.
    proxyConfig := proxy.ProxyConfig{
        ListenAddr:          ":8080",
        HealthCheckInterval: time.Second * 10,
        DiscoveryInterval:   time.Second * 30,
        RequestTimeout:      time.Second * 30,
        MaxIdleConns:        100,
        IdleConnTimeout:     time.Second * 90,
    }

    // Create the proxy server.
    proxyServer := proxy.NewProxyServer(discoveryClient, proxyConfig)

    // Register the proxied services.
    proxyServer.AddService(&proxy.ServiceConfig{
        Name:         "user-service",
        PathPrefix:   "/api/users",
        BalancerType: "weighted_round_robin",
        HealthCheck: proxy.HealthCheckConfig{
            Enabled:  true,
            Path:     "/health",
            Interval: time.Second * 10,
            Timeout:  time.Second * 5,
        },
    })

    proxyServer.AddService(&proxy.ServiceConfig{
        Name:         "order-service",
        PathPrefix:   "/api/orders",
        BalancerType: "least_connections",
        HealthCheck: proxy.HealthCheckConfig{
            Enabled:  true,
            Path:     "/health",
            Interval: time.Second * 10,
            Timeout:  time.Second * 5,
        },
    })

    // API gateway configuration.
    gatewayConfig := gateway.GatewayConfig{
        ListenAddr:      ":8081",
        EnableAuth:      true,
        EnableRateLimit: true,
        CorsEnabled:     true,
        CorsOrigins:     []string{"*"},
    }

    // Create the API gateway.
    apiGateway := gateway.NewAPIGateway(proxyServer, gatewayConfig)

    // Register the gateway routes.
    apiGateway.AddRoute(&gateway.RouteConfig{
        Path:         "/api/users",
        Methods:      []string{"GET", "POST", "PUT", "DELETE"},
        ServiceName:  "user-service",
        AuthRequired: true,
        RateLimit: &gateway.RateLimitConfig{
            RequestsPerSecond: 100,
            BurstSize:         10,
        },
        Timeout: time.Second * 30,
        Retries: 3,
    })

    // Create the high-availability manager.
    haConfig := ha.HAConfig{
        NodeID:            "lb-node-1",
        ElectionPath:      "/lb/election",
        HeartbeatInterval: time.Second * 5,
        ElectionTimeout:   time.Second * 15,
    }

    haManager := ha.NewHAManager(discoveryClient, haConfig)

    // Start the services on promotion and stop them on demotion.
    haManager.SetCallbacks(
        func() {
            log.Println("Became master, starting services...")
            go func() {
                if err := proxyServer.Start(); err != nil {
                    log.Printf("Proxy server error: %v", err)
                }
            }()
            go func() {
                if err := apiGateway.Start(); err != nil {
                    log.Printf("API gateway error: %v", err)
                }
            }()
        },
        func() {
            log.Println("Became slave, stopping services...")
            if err := proxyServer.Stop(); err != nil {
                log.Printf("Proxy server stop error: %v", err)
            }
            apiGateway.Stop()
        },
    )

    // Start high-availability management.
    if err := haManager.Start(); err != nil {
        log.Fatalf("Failed to start HA manager: %v", err)
    }

    // Wait for a termination signal.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    log.Println("Shutting down...")

    // Graceful shutdown: stop elections first so no callback restarts a
    // service while it is being stopped.
    haManager.Stop()
    if err := proxyServer.Stop(); err != nil {
        log.Printf("Proxy server stop error: %v", err)
    }
    apiGateway.Stop()

    log.Println("Server stopped")
}

配置文件示例 #

# config.yaml
# Example configuration mirroring the values hard-coded in main.go.

# Proxy server listener and connection settings.
server:
  listen_addr: ":8080"
  request_timeout: "30s"
  max_idle_conns: 100
  idle_conn_timeout: "90s"

# Backend services exposed through the proxy, routed by path prefix.
services:
  - name: "user-service"
    path_prefix: "/api/users"
    balancer_type: "weighted_round_robin"
    health_check:
      enabled: true
      path: "/health"
      interval: "10s"
      timeout: "5s"

  - name: "order-service"
    path_prefix: "/api/orders"
    balancer_type: "least_connections"
    health_check:
      enabled: true
      path: "/health"
      interval: "10s"
      timeout: "5s"

# API gateway listener and feature switches.
gateway:
  listen_addr: ":8081"
  enable_auth: true
  enable_rate_limit: true
  cors_enabled: true
  cors_origins: ["*"]

# Gateway routes.
routes:
  - path: "/api/users"
    methods: ["GET", "POST", "PUT", "DELETE"]
    service_name: "user-service"
    auth_required: true
    rate_limit:
      requests_per_second: 100
      burst_size: 10
    timeout: "30s"
    retries: 3

# Service discovery backend.
discovery:
  type: "consul"
  address: "localhost:8500"
  interval: "30s"

# High-availability (master election) settings.
high_availability:
  node_id: "lb-node-1"
  election_path: "/lb/election"
  heartbeat_interval: "5s"
  election_timeout: "15s"

监控和指标 #

// metrics/collector.go
package metrics

import (
    "sync"
    "time"
)

// MetricsCollector 指标收集器
type MetricsCollector struct {
    // 请求指标
    TotalRequests    int64
    SuccessRequests  int64
    FailedRequests   int64
    AvgResponseTime  time.Duration

    // 服务指标
    ActiveServices   int
    HealthyInstances int
    TotalInstances   int

    // 限流指标
    RateLimitHits    int64
    RateLimitMisses  int64

    mutex sync.RWMutex
}

// NewMetricsCollector 创建指标收集器
func NewMetricsCollector() *MetricsCollector {
    return &MetricsCollector{}
}

// RecordRequest 记录请求
func (m *MetricsCollector) RecordRequest(success bool, responseTime time.Duration) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    m.TotalRequests++
    if success {
        m.SuccessRequests++
    } else {
        m.FailedRequests++
    }

    // 更新平均响应时间
    if m.AvgResponseTime == 0 {
        m.AvgResponseTime = responseTime
    } else {
        m.AvgResponseTime = (m.AvgResponseTime + responseTime) / 2
    }
}

// RecordRateLimit 记录限流
func (m *MetricsCollector) RecordRateLimit(hit bool) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    if hit {
        m.RateLimitHits++
    } else {
        m.RateLimitMisses++
    }
}

// GetMetrics 获取指标
func (m *MetricsCollector) GetMetrics() map[string]interface{} {
    m.mutex.RLock()
    defer m.mutex.RUnlock()

    successRate := float64(0)
    if m.TotalRequests > 0 {
        successRate = float64(m.SuccessRequests) / float64(m.TotalRequests) * 100
    }

    return map[string]interface{}{
        "total_requests":     m.TotalRequests,
        "success_requests":   m.SuccessRequests,
        "failed_requests":    m.FailedRequests,
        "success_rate":       successRate,
        "avg_response_time":  m.AvgResponseTime.Milliseconds(),
        "active_services":    m.ActiveServices,
        "healthy_instances":  m.HealthyInstances,
        "total_instances":    m.TotalInstances,
        "rate_limit_hits":    m.RateLimitHits,
        "rate_limit_misses":  m.RateLimitMisses,
    }
}

通过实现这个完整的服务端负载均衡系统,我们为微服务架构提供了集中式的流量管理、安全控制和高可用保障。系统包含了反向代理、API 网关、中间件、限流、高可用等关键功能,能够有效支撑大规模微服务部署的需求。