5.4.4 服务端负载均衡 #
服务端负载均衡是一种将负载均衡逻辑集中在专门的负载均衡器或代理服务器上的架构模式。客户端只需要知道负载均衡器的地址,由负载均衡器负责将请求分发到后端的服务实例。这种模式在传统架构中应用广泛,在微服务架构中也有重要作用。
服务端负载均衡架构 #
架构模式 #
┌─────────────┐    ┌─────────────────┐    ┌─────────────┐
│   Client    │───▶│  Load Balancer  │───▶│   Service   │
│             │    │     (Proxy)     │    │  Instance 1 │
└─────────────┘    │                 │    └─────────────┘
                   │                 │    ┌─────────────┐
                   │                 │───▶│   Service   │
                   │                 │    │  Instance 2 │
                   │                 │    └─────────────┘
                   │                 │    ┌─────────────┐
                   │                 │───▶│   Service   │
                   └─────────────────┘    │  Instance N │
                                          └─────────────┘
核心组件 #
- 负载均衡器:接收客户端请求并分发到后端服务
- 健康检查:监控后端服务的健康状态
- 服务发现:动态发现和管理后端服务实例
- 配置管理:管理负载均衡策略和规则
反向代理负载均衡器实现 #
核心代理服务器 #
// proxy/server.go
package proxy
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"time"
"github.com/example/server-lb/balancer"
"github.com/example/server-lb/discovery"
"github.com/example/server-lb/health"
)
// ProxyServer is a reverse proxy that matches incoming HTTP requests to a
// registered service and forwards them to a backend instance.
type ProxyServer struct {
	// Collaborators injected or created by NewProxyServer.
	loadBalancer    balancer.LoadBalancer
	discoveryClient discovery.ServiceDiscovery
	healthChecker   health.HealthChecker

	// services holds the registered service configurations; matchService
	// reads it while holding mutex for reading.
	services map[string]*ServiceConfig
	mutex    sync.RWMutex

	// server is the HTTP listener that Start runs and Stop shuts down.
	server *http.Server

	// config is the configuration the server was constructed with.
	config ProxyConfig

	// ctx is created in NewProxyServer and cancelled by Stop through cancel.
	ctx    context.Context
	cancel context.CancelFunc
}
// ProxyConfig carries the settings a ProxyServer is constructed with.
type ProxyConfig struct {
	// ListenAddr is the address the proxy's HTTP server binds to.
	ListenAddr string
	// HealthCheckInterval and DiscoveryInterval are presumably the periods of
	// the background health-check and discovery loops — confirm against
	// those loops, which are not part of this excerpt.
	HealthCheckInterval time.Duration
	DiscoveryInterval   time.Duration
	// RequestTimeout, MaxIdleConns and IdleConnTimeout are tuning knobs whose
	// consumers are not shown here.
	RequestTimeout  time.Duration
	MaxIdleConns    int
	IdleConnTimeout time.Duration
}
// ServiceConfig describes one backend service that the proxy can route to.
type ServiceConfig struct {
	// Name identifies the service.
	Name string
	// PathPrefix is the URL path prefix used to match requests to this
	// service; it is stripped before forwarding unless it is "/".
	PathPrefix string
	// BalancerType names the balancing strategy requested for the service.
	BalancerType balancer.BalancerType
	// HealthCheck holds the health-probe settings for the service.
	HealthCheck HealthCheckConfig
	// Instances is the backend instance list handed to the load balancer.
	Instances []*balancer.Instance

	// proxy is runtime-only state and is not part of the user configuration.
	proxy *httputil.ReverseProxy
}
// HealthCheckConfig holds the health-probe settings of a single service.
type HealthCheckConfig struct {
	// Enabled switches health checking on for the service.
	Enabled bool
	// Path is the probe path (the sample configuration uses "/health").
	Path string
	// Interval and Timeout are the probe period and per-probe deadline.
	Interval time.Duration
	Timeout  time.Duration
}
// NewProxyServer 创建代理服务器 func NewProxyServer( discoveryClient discovery.ServiceDiscovery, config ProxyConfig, ) *ProxyServer { ctx, cancel := context.WithCancel(context.Background())
// 创建负载均衡器
factory := balancer.NewBalancerFactory()
lb, _ := factory.CreateBalancer(balancer.BalancerConfig{
Type: balancer.RoundRobin,
})
// 创建健康检查器
healthChecker := health.NewHTTPHealthChecker(time.Second * 5)
proxy := &ProxyServer{
loadBalancer: lb,
discoveryClient: discoveryClient,
healthChecker: healthChecker,
services: make(map[string]*ServiceConfig),
config: config,
ctx: ctx,
cancel: cancel,
}
// 创建HTTP服务器
mux := http.NewServeMux()
mux.HandleFunc("/", proxy.handleRequest)
mux.HandleFunc("/health", proxy.handleHealth)
mux.HandleFunc("/stats", proxy.handleStats)
proxy.server = &http.Server{
Addr: config.ListenAddr,
Handler: mux,
}
return proxy
}
// Start 启动代理服务器 func (p *ProxyServer) Start() error { // 启动后台任务 go p.startDiscovery() go p.startHealthCheck()
log.Printf("Proxy server starting on %s", p.config.ListenAddr)
return p.server.ListenAndServe()
}
// Stop 停止代理服务器 func (p *ProxyServer) Stop() error { p.cancel()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return p.server.Shutdown(ctx)
}
// handleRequest 处理请求 func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { // 根据路径前缀匹配服务 service := p.matchService(r.URL.Path) if service == nil { http.Error(w, “Service not found”, http.StatusNotFound) return }
// 选择后端实例
instance, err := p.selectInstance(service)
if err != nil {
http.Error(w, "No available instances", http.StatusServiceUnavailable)
return
}
// 创建反向代理
if service.proxy == nil {
service.proxy = p.createReverseProxy(service, instance)
}
// 代理请求
service.proxy.ServeHTTP(w, r)
}
// matchService 匹配服务 func (p *ProxyServer) matchService(path string) *ServiceConfig { p.mutex.RLock() defer p.mutex.RUnlock()
var bestMatch *ServiceConfig
maxLen := 0
for _, service := range p.services {
if len(service.PathPrefix) > maxLen &&
strings.HasPrefix(path, service.PathPrefix) {
bestMatch = service
maxLen = len(service.PathPrefix)
}
}
return bestMatch
}
// selectInstance 选择实例 func (p *ProxyServer) selectInstance(service *ServiceConfig) (*balancer.Instance, error) { // 更新负载均衡器实例列表 p.loadBalancer.UpdateInstances(service.Instances)
// 选择实例
return p.loadBalancer.Select(context.Background())
}
// createReverseProxy 创建反向代理 func (p *ProxyServer) createReverseProxy(service *ServiceConfig, instance *balancer.Instance) *httputil.ReverseProxy { target := &url.URL{ Scheme: “http”, Host: fmt.Sprintf("%s:%d", instance.Address, instance.Port), }
proxy := httputil.NewSingleHostReverseProxy(target)
// 自定义Director
originalDirector := proxy.Director
proxy.Director = func(req *http.Request) {
originalDirector(req)
// 移除路径前缀
if service.PathPrefix != "/" {
req.URL.Path = strings.TrimPrefix(req.URL.Path, service.PathPrefix)
if req.URL.Path == "" {
req.URL.Path = "/"
}
}
// 设置请求头
req.Header.Set("X-Forwarded-Host", req.Header.Get("Host"))
req.Header.Set("X-Forwarded-Proto", "http")
req.Header.Set("X-Real-IP", req.RemoteAddr)
}
// 自定义错误处理
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
log.Printf("Proxy error: %v", err)
// 标记实例失败
p.loadBalancer.MarkFailure(instance.ID)
http.Error(w, "Bad Gateway", http.StatusBadGateway)
}
return proxy
}
API 网关实现 #
网关核心功能 #
// gateway/gateway.go
package gateway
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/example/server-lb/auth"
"github.com/example/server-lb/ratelimit"
"github.com/example/server-lb/proxy"
)
// APIGateway sits in front of a ProxyServer and applies a middleware chain
// to each routed request before handing it to the proxy.
type APIGateway struct {
	// proxyServer receives requests once the middleware chain has run.
	proxyServer *proxy.ProxyServer
	// authenticator and rateLimiter are only created when the matching
	// GatewayConfig switches are enabled; otherwise they stay nil.
	authenticator auth.Authenticator
	rateLimiter   ratelimit.RateLimiter

	// routes maps a route path to its configuration; guarded by mutex.
	routes map[string]*RouteConfig
	mutex  sync.RWMutex

	// middlewares is the chain in application order (first entry outermost).
	middlewares []Middleware

	// config is the configuration the gateway was constructed with.
	config GatewayConfig
}
// GatewayConfig carries the settings an APIGateway is constructed with.
type GatewayConfig struct {
	// ListenAddr is the gateway's listen address.
	ListenAddr string
	// EnableAuth and EnableRateLimit switch the authentication and
	// rate-limiting middlewares (and their backing components) on.
	EnableAuth      bool
	EnableRateLimit bool
	// CorsEnabled adds the CORS middleware; CorsOrigins is passed to it as
	// the list of allowed origins.
	CorsEnabled bool
	CorsOrigins []string
}
// RouteConfig describes one gateway route.
type RouteConfig struct {
	// Path is the route path; AddRoute uses it as the key in the route table.
	Path string
	// Methods lists the HTTP methods the route is matched against.
	Methods []string
	// ServiceName names the backend service the route targets.
	ServiceName string
	// AuthRequired marks the route as requiring authentication.
	AuthRequired bool
	// RateLimit optionally overrides rate limiting for the route.
	RateLimit *RateLimitConfig
	// Timeout and Retries are per-route forwarding settings; their consumers
	// are not part of this excerpt.
	Timeout time.Duration
	Retries int
}
// RateLimitConfig holds the rate-limit settings of a route.
type RateLimitConfig struct {
	// RequestsPerSecond is the sustained request rate.
	RequestsPerSecond int
	// BurstSize is the allowed burst above the sustained rate.
	BurstSize int
}
// Middleware wraps an HTTP handler with additional behaviour.
type Middleware interface {
	// Process returns a handler that runs the middleware's logic and, when
	// appropriate, delegates to next.
	Process(next http.HandlerFunc) http.HandlerFunc
}
// NewAPIGateway builds a gateway in front of proxyServer.
//
// A JWT authenticator is created when config.EnableAuth is set and a
// token-bucket rate limiter when config.EnableRateLimit is set. The default
// middleware chain is installed before the gateway is returned.
func NewAPIGateway(
	proxyServer *proxy.ProxyServer,
	config GatewayConfig,
) *APIGateway {
	// GatewayConfig carries no limiter sizing, while the token-bucket
	// constructor requires a capacity and a refill rate.
	// NOTE(review): placeholder defaults — confirm the intended limits.
	const (
		defaultBucketCapacity  = 100
		defaultRefillPerSecond = 100
	)

	gw := &APIGateway{
		proxyServer: proxyServer,
		routes:      make(map[string]*RouteConfig),
		config:      config,
	}

	// Initialise optional components.
	if config.EnableAuth {
		gw.authenticator = auth.NewJWTAuthenticator()
	}
	if config.EnableRateLimit {
		gw.rateLimiter = ratelimit.NewTokenBucketLimiter(defaultBucketCapacity, defaultRefillPerSecond)
	}

	// Install the default middleware chain.
	gw.addDefaultMiddlewares()
	return gw
}
// addDefaultMiddlewares 添加默认中间件 func (g *APIGateway) addDefaultMiddlewares() { // CORS 中间件 if g.config.CorsEnabled { g.middlewares = append(g.middlewares, &CORSMiddleware{ Origins: g.config.CorsOrigins, }) }
// 日志中间件
g.middlewares = append(g.middlewares, &LoggingMiddleware{})
// 认证中间件
if g.config.EnableAuth {
g.middlewares = append(g.middlewares, &AuthMiddleware{
authenticator: g.authenticator,
})
}
// 限流中间件
if g.config.EnableRateLimit {
g.middlewares = append(g.middlewares, &RateLimitMiddleware{
rateLimiter: g.rateLimiter,
})
}
}
// AddRoute 添加路由 func (g *APIGateway) AddRoute(route *RouteConfig) { g.mutex.Lock() defer g.mutex.Unlock()
g.routes[route.Path] = route
}
// handleGatewayRequest 处理网关请求 func (g *APIGateway) handleGatewayRequest(w http.ResponseWriter, r *http.Request) { // 匹配路由 route := g.matchRoute(r.URL.Path, r.Method) if route == nil { http.Error(w, “Route not found”, http.StatusNotFound) return }
// 构建中间件链
handler := g.buildMiddlewareChain(route, g.handleProxyRequest)
// 执行请求
handler(w, r)
}
// matchRoute 匹配路由 func (g *APIGateway) matchRoute(path, method string) *RouteConfig { g.mutex.RLock() defer g.mutex.RUnlock()
for routePath, route := range g.routes {
if g.pathMatches(path, routePath) && g.methodMatches(method, route.Methods) {
return route
}
}
return nil
}
// buildMiddlewareChain 构建中间件链 func (g *APIGateway) buildMiddlewareChain(route *RouteConfig, final http.HandlerFunc) http.HandlerFunc { handler := final
// 从后往前构建中间件链
for i := len(g.middlewares) - 1; i >= 0; i-- {
handler = g.middlewares[i].Process(handler)
}
return handler
}
// handleProxyRequest 处理代理请求 func (g *APIGateway) handleProxyRequest(w http.ResponseWriter, r *http.Request) { // 转发到代理服务器 g.proxyServer.ServeHTTP(w, r) }
中间件实现 #
CORS 中间件 #
// middleware/cors.go
package middleware
import (
"net/http"
"strings"
)
// CORSMiddleware adds Cross-Origin Resource Sharing headers to responses
// and answers preflight requests.
type CORSMiddleware struct {
	// Origins lists the allowed origins. An empty list or an entry of "*"
	// allows every origin.
	Origins []string
}

// Process returns a handler that sets the CORS response headers and then
// calls next. OPTIONS requests are treated as preflight and answered with
// 200 without calling next.
func (c *CORSMiddleware) Process(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		origin := r.Header.Get("Origin")
		header := w.Header()

		// The response differs per request Origin, so caches must key on it.
		header.Add("Vary", "Origin")

		// Echo the origin only when the request actually carries one;
		// otherwise an empty Access-Control-Allow-Origin would be emitted.
		if origin != "" && c.isOriginAllowed(origin) {
			header.Set("Access-Control-Allow-Origin", origin)
		}
		header.Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
		header.Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
		header.Set("Access-Control-Max-Age", "86400")

		// Answer preflight requests directly.
		if r.Method == http.MethodOptions {
			w.WriteHeader(http.StatusOK)
			return
		}
		next(w, r)
	}
}

// isOriginAllowed reports whether origin may access the resource: always
// when no origins are configured, otherwise when the list contains "*" or
// the exact origin.
func (c *CORSMiddleware) isOriginAllowed(origin string) bool {
	if len(c.Origins) == 0 {
		return true // no restriction configured
	}
	for _, allowed := range c.Origins {
		if allowed == "*" || allowed == origin {
			return true
		}
	}
	return false
}
认证中间件 #
// middleware/auth.go
package middleware
import (
"context"
"net/http"
"strings"
"github.com/example/server-lb/auth"
)
// AuthMiddleware rejects requests that do not carry a valid bearer token.
type AuthMiddleware struct {
	// authenticator validates the token extracted from the request.
	authenticator auth.Authenticator
}
// Process returns a handler that authenticates the request before calling
// next.
//
// The request must carry "Authorization: Bearer <token>"; the scheme name is
// compared case-insensitively, as HTTP auth schemes are. Missing, malformed
// or invalid credentials are answered with 401 and a WWW-Authenticate
// challenge. On success the validated claims are stored in the request
// context under the key "user".
func (a *AuthMiddleware) Process(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// A 401 response is expected to tell the client which scheme to use.
		unauthorized := func(msg string) {
			w.Header().Set("WWW-Authenticate", "Bearer")
			http.Error(w, msg, http.StatusUnauthorized)
		}

		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			unauthorized("Missing authorization header")
			return
		}

		// Split "<scheme> <token>".
		parts := strings.SplitN(authHeader, " ", 2)
		if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") {
			unauthorized("Invalid authorization header")
			return
		}
		token := strings.TrimSpace(parts[1])
		if token == "" {
			unauthorized("Invalid authorization header")
			return
		}

		claims, err := a.authenticator.ValidateToken(token)
		if err != nil {
			unauthorized("Invalid token")
			return
		}

		// The plain string key is kept because downstream handlers look the
		// claims up under "user".
		ctx := context.WithValue(r.Context(), "user", claims)
		next(w, r.WithContext(ctx))
	}
}
限流中间件 #
// middleware/ratelimit.go
package middleware
import (
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/example/server-lb/ratelimit"
)
// RateLimitMiddleware throttles requests per client.
type RateLimitMiddleware struct {
	// rateLimiter decides, per client key, whether a request may proceed.
	rateLimiter ratelimit.RateLimiter
}
// Process returns a handler that consults the rate limiter, keyed by client
// IP, before calling next.
//
// Every response carries X-RateLimit-Remaining and X-RateLimit-Reset (a Unix
// timestamp). A throttled request is answered with 429 and a Retry-After
// header holding the number of seconds to wait.
func (r *RateLimitMiddleware) Process(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		clientIP := r.getClientIP(req)

		allowed, remaining, resetTime := r.rateLimiter.Allow(clientIP)

		w.Header().Set("X-RateLimit-Remaining", strconv.Itoa(remaining))
		w.Header().Set("X-RateLimit-Reset", strconv.FormatInt(resetTime, 10))

		if !allowed {
			// Retry-After takes delay seconds (or an HTTP date), not an
			// absolute Unix timestamp; always ask for at least one second.
			retryAfter := resetTime - time.Now().Unix()
			if retryAfter < 1 {
				retryAfter = 1
			}
			w.Header().Set("Retry-After", strconv.FormatInt(retryAfter, 10))
			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next(w, req)
	}
}
// getClientIP returns the address used as the rate-limit key: the first
// entry of X-Forwarded-For, else X-Real-IP, else the host part of
// RemoteAddr.
//
// NOTE: both headers are client-controlled unless a trusted proxy overwrites
// them, so a caller can dodge per-IP limits by forging them. Only rely on
// them behind such a proxy.
func (r *RateLimitMiddleware) getClientIP(req *http.Request) string {
	// X-Forwarded-For is "client, proxy1, proxy2"; entries may carry spaces.
	if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
		if first := strings.TrimSpace(strings.SplitN(xff, ",", 2)[0]); first != "" {
			return first
		}
	}

	if xri := req.Header.Get("X-Real-IP"); xri != "" {
		return xri
	}

	// RemoteAddr is "IP:port", with IPv6 written as "[IP]:port". Splitting on
	// the first ':' would truncate IPv6 addresses.
	addr := req.RemoteAddr
	switch {
	case strings.HasPrefix(addr, "["):
		if end := strings.IndexByte(addr, ']'); end > 0 {
			return addr[1:end]
		}
	case strings.Count(addr, ":") == 1:
		return addr[:strings.IndexByte(addr, ':')]
	}
	return addr
}
流量控制与限流 #
令牌桶限流器 #
// ratelimit/token_bucket.go
package ratelimit
import (
"sync"
"time"
)
// TokenBucketLimiter is a per-key token-bucket rate limiter. Each key gets
// its own bucket, created on first use.
//
// NOTE: buckets are never evicted, so memory grows with the number of
// distinct keys seen.
type TokenBucketLimiter struct {
	buckets map[string]*TokenBucket
	mutex   sync.RWMutex // guards buckets

	capacity     int           // bucket capacity
	refillRate   int           // tokens added per refill period
	refillPeriod time.Duration // length of one refill period

	done     chan struct{} // closed by Stop to end the refill goroutine
	stopOnce sync.Once
}

// TokenBucket is the state of a single key's bucket. Its fields are guarded
// by mutex.
type TokenBucket struct {
	capacity   int
	tokens     int
	refillRate int
	lastRefill time.Time
	mutex      sync.Mutex
}

// NewTokenBucketLimiter creates a limiter whose buckets hold up to capacity
// tokens and gain refillRate tokens per second. It starts a background
// refill goroutine; call Stop to end it.
func NewTokenBucketLimiter(capacity, refillRate int) *TokenBucketLimiter {
	limiter := &TokenBucketLimiter{
		buckets:      make(map[string]*TokenBucket),
		capacity:     capacity,
		refillRate:   refillRate,
		refillPeriod: time.Second,
		done:         make(chan struct{}),
	}

	go limiter.startRefillRoutine()
	return limiter
}

// Stop ends the background refill goroutine. It is safe to call more than
// once. Allow keeps working afterwards because it refills on demand.
func (t *TokenBucketLimiter) Stop() {
	t.stopOnce.Do(func() {
		if t.done != nil {
			close(t.done)
		}
	})
}

// Allow consumes one token from key's bucket if one is available.
//
// It returns whether the request is allowed, the tokens left in the bucket
// (0 when denied), and the Unix time of the bucket's next refill.
func (t *TokenBucketLimiter) Allow(key string) (bool, int, int64) {
	bucket := t.getBucket(key)

	bucket.mutex.Lock()
	defer bucket.mutex.Unlock()

	// Bring the bucket up to date before inspecting it.
	t.refillBucket(bucket)
	nextRefill := bucket.lastRefill.Add(t.refillPeriod).Unix()

	if bucket.tokens > 0 {
		bucket.tokens--
		return true, bucket.tokens, nextRefill
	}
	return false, 0, nextRefill
}

// getBucket returns the bucket for key, creating a full one on first use.
func (t *TokenBucketLimiter) getBucket(key string) *TokenBucket {
	t.mutex.RLock()
	bucket, exists := t.buckets[key]
	t.mutex.RUnlock()
	if exists {
		return bucket
	}

	t.mutex.Lock()
	defer t.mutex.Unlock()

	// Another goroutine may have created it between the two locks.
	if bucket, exists := t.buckets[key]; exists {
		return bucket
	}

	bucket = &TokenBucket{
		capacity:   t.capacity,
		tokens:     t.capacity,
		refillRate: t.refillRate,
		lastRefill: time.Now(),
	}
	t.buckets[key] = bucket
	return bucket
}

// refillBucket credits the bucket with the tokens earned in the whole
// refill periods that have elapsed since its last refill, capped at the
// bucket capacity. The caller must hold bucket.mutex.
func (t *TokenBucketLimiter) refillBucket(bucket *TokenBucket) {
	if t.refillPeriod <= 0 {
		return // avoids a division by zero on a zero-value limiter
	}

	periods := time.Since(bucket.lastRefill) / t.refillPeriod
	if periods < 1 {
		return
	}

	tokensToAdd := int(periods) * bucket.refillRate
	bucket.tokens = min(bucket.capacity, bucket.tokens+tokensToAdd)

	// Advance by whole periods only, so the remaining fraction of a period
	// still counts towards the next refill instead of being discarded.
	bucket.lastRefill = bucket.lastRefill.Add(periods * t.refillPeriod)
}

// startRefillRoutine refills every bucket once per refill period until Stop
// is called.
func (t *TokenBucketLimiter) startRefillRoutine() {
	ticker := time.NewTicker(t.refillPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-t.done:
			return
		case <-ticker.C:
			t.mutex.RLock()
			for _, bucket := range t.buckets {
				bucket.mutex.Lock()
				t.refillBucket(bucket)
				bucket.mutex.Unlock()
			}
			t.mutex.RUnlock()
		}
	}
}

// min returns the smaller of a and b.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}
高可用架构设计 #
主备模式实现 #
// ha/master_slave.go
package ha
import (
"context"
"log"
"sync"
"time"
"github.com/example/server-lb/discovery"
)
// HAManager runs a periodic leader election and heartbeat for one node and
// notifies the owner when the node's role changes.
type HAManager struct {
	// isMaster is the node's current role; read and written under mutex.
	isMaster bool
	nodeID   string

	// discoveryClient is kept for use by the election and heartbeat logic.
	discoveryClient discovery.ServiceDiscovery

	// config is the configuration the manager was constructed with.
	config HAConfig

	// ctx is cancelled by Stop to end the background loops.
	ctx    context.Context
	cancel context.CancelFunc
	mutex  sync.RWMutex

	// Role-change callbacks; either may be nil.
	onBecameMaster func()
	onBecameSlave  func()
}
// HAConfig carries the settings an HAManager is constructed with.
type HAConfig struct {
	// NodeID identifies this node in log output.
	NodeID string
	// ElectionPath is presumably the key used by the election backend — it
	// is not read by the code in this excerpt.
	ElectionPath string
	// HeartbeatInterval is the period of the heartbeat loop.
	HeartbeatInterval time.Duration
	// ElectionTimeout is the period of the election loop.
	ElectionTimeout time.Duration
}
// NewHAManager builds an HAManager for the node named in config. The
// manager starts as a non-master and does nothing until Start is called.
func NewHAManager(
	discoveryClient discovery.ServiceDiscovery,
	config HAConfig,
) *HAManager {
	manager := &HAManager{
		config:          config,
		nodeID:          config.NodeID,
		discoveryClient: discoveryClient,
	}
	// The cancellable context is what Stop uses to end the loops.
	manager.ctx, manager.cancel = context.WithCancel(context.Background())
	return manager
}
// Start launches the election and heartbeat loops in their own goroutines
// and returns immediately. It always returns nil.
func (h *HAManager) Start() error {
	for _, loop := range []func(){h.startElection, h.startHeartbeat} {
		go loop()
	}
	return nil
}
// Stop cancels the manager's context, which makes the election and
// heartbeat loops return.
func (h *HAManager) Stop() {
	stop := h.cancel
	stop()
}
// IsMaster reports whether this node currently holds the master role. The
// flag is read under the read lock.
func (h *HAManager) IsMaster() bool {
	h.mutex.RLock()
	master := h.isMaster
	h.mutex.RUnlock()
	return master
}
// SetCallbacks installs the functions invoked when the node becomes master
// (onMaster) or loses the master role (onSlave). Either may be nil. The
// fields are assigned without locking, so call this before Start.
func (h *HAManager) SetCallbacks(onMaster, onSlave func()) {
	h.onBecameMaster, h.onBecameSlave = onMaster, onSlave
}
// startElection runs an election every ElectionTimeout until the manager's
// context is cancelled. The first election happens one full interval after
// the loop starts.
func (h *HAManager) startElection() {
	electionTick := time.NewTicker(h.config.ElectionTimeout)
	defer electionTick.Stop()

	stopped := h.ctx.Done()
	for {
		select {
		case <-electionTick.C:
			h.performElection()
		case <-stopped:
			return
		}
	}
}
// performElection tries to acquire leadership, records the outcome as the
// node's role and, when the role changed, logs the transition and invokes
// the matching callback. Callbacks run synchronously, outside the lock.
func (h *HAManager) performElection() {
	elected := h.tryAcquireLeadership()

	h.mutex.Lock()
	previously := h.isMaster
	h.isMaster = elected
	h.mutex.Unlock()

	switch {
	case elected && !previously:
		log.Printf("Node %s became master", h.nodeID)
		if notify := h.onBecameMaster; notify != nil {
			notify()
		}
	case !elected && previously:
		log.Printf("Node %s became slave", h.nodeID)
		if notify := h.onBecameSlave; notify != nil {
			notify()
		}
	}
}
// tryAcquireLeadership reports whether this node won the election.
//
// This is a placeholder that always succeeds; a real implementation should
// use a distributed lock, for example an etcd-based election.
func (h *HAManager) tryAcquireLeadership() bool {
	const acquired = true // simplified: every attempt wins
	return acquired
}
// startHeartbeat sends a heartbeat every HeartbeatInterval while this node
// is master, until the manager's context is cancelled.
func (h *HAManager) startHeartbeat() {
	beat := time.NewTicker(h.config.HeartbeatInterval)
	defer beat.Stop()

	stopped := h.ctx.Done()
	for {
		select {
		case <-beat.C:
			// Only the master announces itself.
			if !h.IsMaster() {
				continue
			}
			h.sendHeartbeat()
		case <-stopped:
			return
		}
	}
}
// sendHeartbeat announces that the master is alive. For now it only logs;
// a real implementation should update the node's state in the service
// discovery system.
func (h *HAManager) sendHeartbeat() {
	const format = "Master node %s sending heartbeat"
	log.Printf(format, h.nodeID)
}
完整应用示例 #
服务端负载均衡器启动 #
// main.go
package main
import (
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/example/server-lb/discovery"
"github.com/example/server-lb/gateway"
"github.com/example/server-lb/ha"
"github.com/example/server-lb/proxy"
)
// main wires the load balancer together: a Consul-backed discovery client,
// the proxy server with two services, the API gateway with one route, and
// an HA manager that starts the servers when this node becomes master and
// stops them when it loses that role. It then waits for SIGINT/SIGTERM and
// shuts everything down.
func main() {
	// Service discovery client.
	discoveryClient := discovery.NewConsulDiscovery("localhost:8500")

	// Proxy server configuration.
	proxyConfig := proxy.ProxyConfig{
		ListenAddr:          ":8080",
		HealthCheckInterval: time.Second * 10,
		DiscoveryInterval:   time.Second * 30,
		RequestTimeout:      time.Second * 30,
		MaxIdleConns:        100,
		IdleConnTimeout:     time.Second * 90,
	}

	proxyServer := proxy.NewProxyServer(discoveryClient, proxyConfig)

	// Service routing.
	proxyServer.AddService(&proxy.ServiceConfig{
		Name:         "user-service",
		PathPrefix:   "/api/users",
		BalancerType: "weighted_round_robin",
		HealthCheck: proxy.HealthCheckConfig{
			Enabled:  true,
			Path:     "/health",
			Interval: time.Second * 10,
			Timeout:  time.Second * 5,
		},
	})
	proxyServer.AddService(&proxy.ServiceConfig{
		Name:         "order-service",
		PathPrefix:   "/api/orders",
		BalancerType: "least_connections",
		HealthCheck: proxy.HealthCheckConfig{
			Enabled:  true,
			Path:     "/health",
			Interval: time.Second * 10,
			Timeout:  time.Second * 5,
		},
	})

	// API gateway configuration.
	gatewayConfig := gateway.GatewayConfig{
		ListenAddr:      ":8081",
		EnableAuth:      true,
		EnableRateLimit: true,
		CorsEnabled:     true,
		CorsOrigins:     []string{"*"},
	}

	apiGateway := gateway.NewAPIGateway(proxyServer, gatewayConfig)

	// Gateway routes.
	apiGateway.AddRoute(&gateway.RouteConfig{
		Path:         "/api/users",
		Methods:      []string{"GET", "POST", "PUT", "DELETE"},
		ServiceName:  "user-service",
		AuthRequired: true,
		RateLimit: &gateway.RateLimitConfig{
			RequestsPerSecond: 100,
			BurstSize:         10,
		},
		Timeout: time.Second * 30,
		Retries: 3,
	})

	// High-availability manager.
	haConfig := ha.HAConfig{
		NodeID:            "lb-node-1",
		ElectionPath:      "/lb/election",
		HeartbeatInterval: time.Second * 5,
		ElectionTimeout:   time.Second * 15,
	}
	haManager := ha.NewHAManager(discoveryClient, haConfig)

	// Start the servers on promotion, stop them on demotion.
	haManager.SetCallbacks(
		func() {
			log.Println("Became master, starting services...")
			go func() {
				if err := proxyServer.Start(); err != nil {
					log.Printf("Proxy server error: %v", err)
				}
			}()
			go func() {
				if err := apiGateway.Start(); err != nil {
					log.Printf("API gateway error: %v", err)
				}
			}()
		},
		func() {
			log.Println("Became slave, stopping services...")
			if err := proxyServer.Stop(); err != nil {
				log.Printf("Proxy server stop error: %v", err)
			}
			apiGateway.Stop()
		},
	)

	if err := haManager.Start(); err != nil {
		// Fatalf keeps a space between the message and the error; Fatal
		// would run them together.
		log.Fatalf("Failed to start HA manager: %v", err)
	}

	// Wait for a termination signal.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Println("Shutting down...")

	// Graceful shutdown: stop elections first so no callback restarts the
	// servers while they are being stopped.
	haManager.Stop()
	if err := proxyServer.Stop(); err != nil {
		log.Printf("Proxy server stop error: %v", err)
	}
	apiGateway.Stop()

	log.Println("Server stopped")
}
配置文件示例 #
# config.yaml
# Sample configuration for the proxy server, its services, the API gateway,
# service discovery and high availability.
server:
  listen_addr: ":8080"
  request_timeout: "30s"
  max_idle_conns: 100
  idle_conn_timeout: "90s"

services:
  - name: "user-service"
    path_prefix: "/api/users"
    balancer_type: "weighted_round_robin"
    health_check:
      enabled: true
      path: "/health"
      interval: "10s"
      timeout: "5s"
  - name: "order-service"
    path_prefix: "/api/orders"
    balancer_type: "least_connections"
    health_check:
      enabled: true
      path: "/health"
      interval: "10s"
      timeout: "5s"

gateway:
  listen_addr: ":8081"
  enable_auth: true
  enable_rate_limit: true
  cors_enabled: true
  cors_origins: ["*"]
  # NOTE(review): routes is assumed to nest under gateway — confirm against
  # the configuration loader.
  routes:
    - path: "/api/users"
      methods: ["GET", "POST", "PUT", "DELETE"]
      service_name: "user-service"
      auth_required: true
      rate_limit:
        requests_per_second: 100
        burst_size: 10
      timeout: "30s"
      retries: 3

discovery:
  type: "consul"
  address: "localhost:8500"
  interval: "30s"

high_availability:
  node_id: "lb-node-1"
  election_path: "/lb/election"
  heartbeat_interval: "5s"
  election_timeout: "15s"
监控和指标 #
// metrics/collector.go
package metrics
import (
"sync"
"time"
)
// MetricsCollector accumulates request, service and rate-limit counters.
// The Record* methods and GetMetrics synchronise on an internal mutex.
type MetricsCollector struct {
	// Request metrics.
	TotalRequests   int64
	SuccessRequests int64
	FailedRequests  int64
	AvgResponseTime time.Duration // arithmetic mean over all recorded requests

	// Service metrics.
	ActiveServices   int
	HealthyInstances int
	TotalInstances   int

	// Rate-limit metrics.
	RateLimitHits   int64
	RateLimitMisses int64

	mutex sync.RWMutex

	// totalResponseTime is the sum of all recorded response times, kept so
	// the mean can be computed exactly.
	totalResponseTime time.Duration
}

// NewMetricsCollector returns an empty collector.
func NewMetricsCollector() *MetricsCollector {
	return &MetricsCollector{}
}

// RecordRequest counts one request as successful or failed and folds its
// response time into AvgResponseTime.
func (m *MetricsCollector) RecordRequest(success bool, responseTime time.Duration) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.TotalRequests++
	if success {
		m.SuccessRequests++
	} else {
		m.FailedRequests++
	}

	// Keep a true mean. Averaging the previous mean with the new sample
	// would weight recent requests exponentially, and treating a zero mean
	// as "unset" would misread genuine zero-duration samples.
	m.totalResponseTime += responseTime
	m.AvgResponseTime = m.totalResponseTime / time.Duration(m.TotalRequests)
}

// RecordRateLimit counts one rate-limit decision: a hit when hit is true,
// otherwise a miss.
func (m *MetricsCollector) RecordRateLimit(hit bool) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if hit {
		m.RateLimitHits++
	} else {
		m.RateLimitMisses++
	}
}

// GetMetrics returns a snapshot of all counters keyed by metric name.
// "success_rate" is a percentage (0 when nothing was recorded) and
// "avg_response_time" is in milliseconds.
func (m *MetricsCollector) GetMetrics() map[string]interface{} {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	successRate := float64(0)
	if m.TotalRequests > 0 {
		successRate = float64(m.SuccessRequests) / float64(m.TotalRequests) * 100
	}

	return map[string]interface{}{
		"total_requests":    m.TotalRequests,
		"success_requests":  m.SuccessRequests,
		"failed_requests":   m.FailedRequests,
		"success_rate":      successRate,
		"avg_response_time": m.AvgResponseTime.Milliseconds(),
		"active_services":   m.ActiveServices,
		"healthy_instances": m.HealthyInstances,
		"total_instances":   m.TotalInstances,
		"rate_limit_hits":   m.RateLimitHits,
		"rate_limit_misses": m.RateLimitMisses,
	}
}
通过实现这个完整的服务端负载均衡系统,我们为微服务架构提供了集中式的流量管理、安全控制和高可用保障。系统包含了反向代理、API 网关、中间件、限流、高可用等关键功能,能够有效支撑大规模微服务部署的需求。