5.4.2 负载均衡算法

5.4.2 负载均衡算法 #

负载均衡是分布式系统中的关键组件,它将客户端请求分配到多个服务器实例上,以提高系统的可用性、可扩展性和性能。本节将深入探讨各种负载均衡算法的原理和 Go 语言实现。

负载均衡基础理论 #

负载均衡的目标 #

  1. 提高可用性:避免单点故障,提供冗余
  2. 提升性能:分散负载,减少响应时间
  3. 增强扩展性:支持水平扩展
  4. 资源优化:充分利用服务器资源

负载均衡分类 #

按实现位置分类

  • 客户端负载均衡:在客户端实现负载均衡逻辑
  • 服务端负载均衡:通过代理服务器实现负载均衡

按算法类型分类

  • 静态算法:不考虑服务器当前状态
  • 动态算法:根据服务器实时状态调整

负载均衡器接口设计 #

核心接口定义 #

// balancer/interface.go
package balancer

import (
    "context"
    "errors"
)

// ErrNoAvailableInstance is returned by Select implementations when there is
// no active instance to hand out.
var ErrNoAvailableInstance = errors.New("no available instance")

// ErrInstanceNotFound signals that an instance ID could not be resolved.
// NOTE(review): no code visible in this section returns it yet.
var ErrInstanceNotFound = errors.New("instance not found")

// Instance describes one backend service instance together with the runtime
// counters that the balancers in this package read and update.
type Instance struct {
    ID       string
    Address  string
    Port     int
    Weight   int // configured weight; the weighted balancers substitute 1 when this is <= 0
    Metadata map[string]string

    // Runtime state
    Active      bool // only instances with Active == true are eligible for selection
    Connections int // in-flight connection count maintained by LeastConnectionsBalancer
    ResponseTime int64 // milliseconds
    ErrorCount   int64 // incremented by MarkFailure, reset to 0 by MarkSuccess
    LastUsed     int64 // Unix timestamp
}

// LoadBalancer is the contract shared by every balancing strategy in this
// package.
type LoadBalancer interface {
    // Select picks an instance for the next request. Implementations return
    // ErrNoAvailableInstance when no active instance exists.
    Select(ctx context.Context) (*Instance, error)

    // UpdateInstances replaces the balancer's instance list.
    UpdateInstances(instances []*Instance) error

    // MarkSuccess and MarkFailure report the outcome of a request that was
    // sent to the instance with the given ID.
    MarkSuccess(instanceID string)
    MarkFailure(instanceID string)

    // GetStats returns a snapshot of balancer statistics keyed by name.
    GetStats() map[string]interface{}
}

// WeightedInstance wraps an Instance with the bookkeeping used by
// WeightedRoundRobinBalancer.
type WeightedInstance struct {
    *Instance
    // CurrentWeight is the running score: Select adds EffectiveWeight to it on
    // every pass and subtracts the total weight from the chosen entry.
    CurrentWeight int
    // EffectiveWeight starts at the configured weight, is decremented on
    // failure and incremented back on success.
    EffectiveWeight int
}

轮询算法实现 #

简单轮询(Round Robin) #

// balancer/round_robin.go
package balancer

import (
    "context"
    "sync"
    "sync/atomic"
)

// RoundRobinBalancer distributes requests across active instances in
// rotating order.
type RoundRobinBalancer struct {
    // current is the pick counter and is accessed only through sync/atomic.
    // It is deliberately the first field: 64-bit atomic operations require
    // 64-bit alignment, and on 32-bit platforms only the first word of an
    // allocated struct is guaranteed to have it.
    current   uint64
    instances []*Instance  // instance list, guarded by mutex
    mutex     sync.RWMutex // guards instances
}

// NewRoundRobinBalancer returns a RoundRobinBalancer with an empty instance
// list; call UpdateInstances before Select.
func NewRoundRobinBalancer() *RoundRobinBalancer {
    return &RoundRobinBalancer{
        instances: make([]*Instance, 0),
    }
}

// Select returns the next active instance in rotation, or
// ErrNoAvailableInstance when no instance is active. ctx is not consulted.
func (r *RoundRobinBalancer) Select(ctx context.Context) (*Instance, error) {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    // Keep only the instances that are currently eligible.
    var candidates []*Instance
    for _, inst := range r.instances {
        if !inst.Active {
            continue
        }
        candidates = append(candidates, inst)
    }
    if len(candidates) == 0 {
        return nil, ErrNoAvailableInstance
    }

    // Several goroutines may hold the read lock at once, so the counter is
    // advanced atomically. The pre-increment value is this call's ticket.
    ticket := atomic.AddUint64(&r.current, 1) - 1
    return candidates[ticket%uint64(len(candidates))], nil
}

// UpdateInstances replaces the instance list with a copy of the given slice.
// The *Instance values themselves are shared with the caller, not cloned.
// It always returns nil.
func (r *RoundRobinBalancer) UpdateInstances(instances []*Instance) error {
    snapshot := make([]*Instance, len(instances))
    copy(snapshot, instances)

    r.mutex.Lock()
    r.instances = snapshot
    r.mutex.Unlock()

    return nil
}

// MarkSuccess records a successful request for the instance with the given
// ID by resetting its error counter. Unknown IDs are ignored.
func (r *RoundRobinBalancer) MarkSuccess(instanceID string) {
    // This mutates instance state, so it needs the exclusive lock; a read
    // lock would let it race with other MarkSuccess/MarkFailure calls.
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for _, inst := range r.instances {
        if inst.ID == instanceID {
            inst.ErrorCount = 0
            return
        }
    }
}

// MarkFailure records a failed request for the instance with the given ID.
// Once the error counter exceeds the threshold the instance is marked
// inactive and Select stops returning it. Unknown IDs are ignored.
func (r *RoundRobinBalancer) MarkFailure(instanceID string) {
    // Failures tolerated since the last success before deactivation.
    const maxErrors = 3

    // Both ErrorCount and Active are written here, and Select reads Active
    // under the read lock, so the exclusive lock is required.
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for _, inst := range r.instances {
        if inst.ID != instanceID {
            continue
        }
        inst.ErrorCount++
        if inst.ErrorCount > maxErrors {
            inst.Active = false
        }
        return
    }
}

// GetStats reports the algorithm name, instance counts, the summed
// connection count of all instances and the current pick counter.
func (r *RoundRobinBalancer) GetStats() map[string]interface{} {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    var active, connections int
    for _, inst := range r.instances {
        connections += inst.Connections
        if inst.Active {
            active++
        }
    }

    stats := make(map[string]interface{}, 5)
    stats["algorithm"] = "round_robin"
    stats["total_instances"] = len(r.instances)
    stats["active_instances"] = active
    stats["total_connections"] = connections
    stats["current_position"] = atomic.LoadUint64(&r.current)
    return stats
}

加权轮询(Weighted Round Robin) #

// balancer/weighted_round_robin.go
package balancer

import (
    "context"
    "sync"
)

// WeightedRoundRobinBalancer rotates through active instances in proportion
// to their effective weights.
type WeightedRoundRobinBalancer struct {
    instances []*WeightedInstance // weighted wrappers, guarded by mutex
    mutex     sync.RWMutex
}

// NewWeightedRoundRobinBalancer returns a balancer with an empty instance
// list; call UpdateInstances before Select.
func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer {
    b := new(WeightedRoundRobinBalancer)
    b.instances = make([]*WeightedInstance, 0)
    return b
}

// Select picks the active instance with the highest running weight.
//
// Each call adds every active instance's EffectiveWeight to its
// CurrentWeight, chooses the largest CurrentWeight (the first one wins on a
// tie) and then subtracts the total effective weight from the chosen entry.
// It returns ErrNoAvailableInstance when no instance is active or the total
// effective weight is not positive. ctx is not consulted.
func (w *WeightedRoundRobinBalancer) Select(ctx context.Context) (*Instance, error) {
    // The running weights are mutated, so the exclusive lock is needed.
    w.mutex.Lock()
    defer w.mutex.Unlock()

    // Gather the eligible entries and the sum of their effective weights.
    var (
        eligible  []*WeightedInstance
        weightSum int
    )
    for _, wi := range w.instances {
        if wi.Active {
            eligible = append(eligible, wi)
            weightSum += wi.EffectiveWeight
        }
    }
    // Bail out before touching any CurrentWeight.
    if len(eligible) == 0 || weightSum <= 0 {
        return nil, ErrNoAvailableInstance
    }

    // Advance every running weight and remember the largest one.
    best := eligible[0]
    best.CurrentWeight += best.EffectiveWeight
    for _, wi := range eligible[1:] {
        wi.CurrentWeight += wi.EffectiveWeight
        if wi.CurrentWeight > best.CurrentWeight {
            best = wi
        }
    }

    // Penalise the winner so the other entries get their turn.
    best.CurrentWeight -= weightSum

    return best.Instance, nil
}

// UpdateInstances replaces the instance list, wrapping every instance in a
// fresh WeightedInstance whose running weight starts at zero. Instances with
// a configured weight <= 0 get an effective weight of 1. It always returns
// nil.
func (w *WeightedRoundRobinBalancer) UpdateInstances(instances []*Instance) error {
    w.mutex.Lock()
    defer w.mutex.Unlock()

    wrapped := make([]*WeightedInstance, 0, len(instances))
    for _, inst := range instances {
        effective := 1 // default weight
        if inst.Weight > 0 {
            effective = inst.Weight
        }
        wrapped = append(wrapped, &WeightedInstance{
            Instance:        inst,
            CurrentWeight:   0,
            EffectiveWeight: effective,
        })
    }
    w.instances = wrapped

    return nil
}

// MarkSuccess records a successful request for the instance with the given
// ID: it resets the error counter and raises the effective weight by one
// step towards the configured weight. Unknown IDs are ignored.
func (w *WeightedRoundRobinBalancer) MarkSuccess(instanceID string) {
    w.mutex.Lock()
    defer w.mutex.Unlock()

    for _, wi := range w.instances {
        if wi.ID != instanceID {
            continue
        }
        wi.ErrorCount = 0

        // UpdateInstances treats a configured weight <= 0 as 1, so recovery
        // must use the same ceiling. Comparing against the raw Weight would
        // leave such an instance stuck at effective weight 0 after a single
        // failure.
        ceiling := wi.Weight
        if ceiling <= 0 {
            ceiling = 1
        }
        if wi.EffectiveWeight < ceiling {
            wi.EffectiveWeight++
        }
        return
    }
}

// MarkFailure records a failed request for the instance with the given ID:
// it bumps the error counter, lowers the effective weight by one (never
// below zero) and deactivates the instance once the error counter exceeds 3.
// Unknown IDs are ignored.
func (w *WeightedRoundRobinBalancer) MarkFailure(instanceID string) {
    w.mutex.Lock()
    defer w.mutex.Unlock()

    for _, wi := range w.instances {
        if wi.ID != instanceID {
            continue
        }
        wi.ErrorCount++
        if wi.EffectiveWeight > 0 {
            wi.EffectiveWeight--
        }
        if wi.ErrorCount > 3 {
            wi.Active = false
        }
        return
    }
}

// GetStats reports the algorithm name, instance counts and the summed
// effective weight of the active instances.
func (w *WeightedRoundRobinBalancer) GetStats() map[string]interface{} {
    w.mutex.RLock()
    defer w.mutex.RUnlock()

    var active, weightSum int
    for _, wi := range w.instances {
        if !wi.Active {
            continue
        }
        active++
        weightSum += wi.EffectiveWeight
    }

    stats := make(map[string]interface{}, 4)
    stats["algorithm"] = "weighted_round_robin"
    stats["total_instances"] = len(w.instances)
    stats["active_instances"] = active
    stats["total_weight"] = weightSum
    return stats
}

随机算法实现 #

简单随机(Random) #

// balancer/random.go
package balancer

import (
    "context"
    "math/rand"
    "sync"
    "time"
)

// RandomBalancer picks an active instance uniformly at random.
type RandomBalancer struct {
    instances []*Instance // instance list, guarded by mutex
    random    *rand.Rand  // private random source, seeded at construction
    mutex     sync.RWMutex
}

// NewRandomBalancer returns a RandomBalancer with an empty instance list and
// a random source seeded from the current time.
func NewRandomBalancer() *RandomBalancer {
    seed := time.Now().UnixNano()

    b := new(RandomBalancer)
    b.instances = make([]*Instance, 0)
    b.random = rand.New(rand.NewSource(seed))
    return b
}

// Select returns a uniformly random active instance, or
// ErrNoAvailableInstance when no instance is active. ctx is not consulted.
func (r *RandomBalancer) Select(ctx context.Context) (*Instance, error) {
    // Exclusive lock on purpose: r.random is a *rand.Rand built with
    // rand.New, which is not safe for concurrent use. Under a read lock,
    // concurrent Select calls would race on the generator's internal state.
    r.mutex.Lock()
    defer r.mutex.Unlock()

    // Keep only the instances that are currently eligible.
    candidates := make([]*Instance, 0, len(r.instances))
    for _, inst := range r.instances {
        if inst.Active {
            candidates = append(candidates, inst)
        }
    }
    if len(candidates) == 0 {
        return nil, ErrNoAvailableInstance
    }

    return candidates[r.random.Intn(len(candidates))], nil
}

// UpdateInstances replaces the instance list with a copy of the given slice.
// The *Instance values themselves are shared with the caller, not cloned.
// It always returns nil.
func (r *RandomBalancer) UpdateInstances(instances []*Instance) error {
    snapshot := make([]*Instance, len(instances))
    copy(snapshot, instances)

    r.mutex.Lock()
    r.instances = snapshot
    r.mutex.Unlock()

    return nil
}

// MarkSuccess records a successful request for the instance with the given
// ID by resetting its error counter. Unknown IDs are ignored.
func (r *RandomBalancer) MarkSuccess(instanceID string) {
    // This mutates instance state, so it needs the exclusive lock; a read
    // lock would let it race with concurrent MarkFailure calls.
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for _, inst := range r.instances {
        if inst.ID == instanceID {
            inst.ErrorCount = 0
            return
        }
    }
}

// MarkFailure records a failed request for the instance with the given ID.
// Once the error counter exceeds the threshold the instance is marked
// inactive and Select stops returning it. Unknown IDs are ignored.
func (r *RandomBalancer) MarkFailure(instanceID string) {
    // Failures tolerated since the last success before deactivation.
    const maxErrors = 3

    // ErrorCount and Active are written here and Active is read by Select.
    // Under a read lock, concurrent failures could lose increments.
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for _, inst := range r.instances {
        if inst.ID != instanceID {
            continue
        }
        inst.ErrorCount++
        if inst.ErrorCount > maxErrors {
            inst.Active = false
        }
        return
    }
}

// GetStats reports the algorithm name and the total and active instance
// counts.
func (r *RandomBalancer) GetStats() map[string]interface{} {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    active := 0
    for i := range r.instances {
        if r.instances[i].Active {
            active++
        }
    }

    stats := make(map[string]interface{}, 3)
    stats["algorithm"] = "random"
    stats["total_instances"] = len(r.instances)
    stats["active_instances"] = active
    return stats
}

加权随机(Weighted Random) #

// balancer/weighted_random.go
package balancer

import (
    "context"
    "math/rand"
    "sync"
    "time"
)

// WeightedRandomBalancer picks an active instance at random with probability
// proportional to its weight.
type WeightedRandomBalancer struct {
    instances []*Instance // instance list, guarded by mutex
    random    *rand.Rand  // private random source, seeded at construction
    mutex     sync.RWMutex
}

// NewWeightedRandomBalancer returns a WeightedRandomBalancer with an empty
// instance list and a random source seeded from the current time.
func NewWeightedRandomBalancer() *WeightedRandomBalancer {
    seed := time.Now().UnixNano()

    b := new(WeightedRandomBalancer)
    b.instances = make([]*Instance, 0)
    b.random = rand.New(rand.NewSource(seed))
    return b
}

// Select returns a random active instance, chosen with probability
// proportional to its weight (a weight <= 0 counts as 1). It returns
// ErrNoAvailableInstance when no instance is active. ctx is not consulted.
func (w *WeightedRandomBalancer) Select(ctx context.Context) (*Instance, error) {
    // Exclusive lock on purpose: w.random is a *rand.Rand built with
    // rand.New, which is not safe for concurrent use. Under a read lock,
    // concurrent Select calls would race on the generator's internal state.
    w.mutex.Lock()
    defer w.mutex.Unlock()

    // weightOf applies the package-wide default of 1 for unset weights.
    weightOf := func(inst *Instance) int {
        if inst.Weight <= 0 {
            return 1
        }
        return inst.Weight
    }

    // Collect eligible instances and the sum of their weights.
    candidates := make([]*Instance, 0, len(w.instances))
    totalWeight := 0
    for _, inst := range w.instances {
        if inst.Active {
            candidates = append(candidates, inst)
            totalWeight += weightOf(inst)
        }
    }
    if len(candidates) == 0 || totalWeight <= 0 {
        return nil, ErrNoAvailableInstance
    }

    // Draw a point in [0, totalWeight) and walk the cumulative weights until
    // the interval containing it is found.
    point := w.random.Intn(totalWeight)
    cumulative := 0
    for _, inst := range candidates {
        cumulative += weightOf(inst)
        if point < cumulative {
            return inst, nil
        }
    }

    // Not reachable while the weights sum to totalWeight; kept as a guard.
    return candidates[len(candidates)-1], nil
}

// UpdateInstances replaces the instance list with a copy of the given slice.
// The *Instance values themselves are shared with the caller, not cloned.
// It always returns nil.
func (w *WeightedRandomBalancer) UpdateInstances(instances []*Instance) error {
    snapshot := make([]*Instance, len(instances))
    copy(snapshot, instances)

    w.mutex.Lock()
    w.instances = snapshot
    w.mutex.Unlock()

    return nil
}

// MarkSuccess records a successful request for the instance with the given
// ID by resetting its error counter. Unknown IDs are ignored.
func (w *WeightedRandomBalancer) MarkSuccess(instanceID string) {
    // This mutates instance state, so it needs the exclusive lock; a read
    // lock would let it race with concurrent MarkFailure calls.
    w.mutex.Lock()
    defer w.mutex.Unlock()

    for _, inst := range w.instances {
        if inst.ID == instanceID {
            inst.ErrorCount = 0
            return
        }
    }
}

// MarkFailure records a failed request for the instance with the given ID.
// Once the error counter exceeds the threshold the instance is marked
// inactive and Select stops returning it. Unknown IDs are ignored.
func (w *WeightedRandomBalancer) MarkFailure(instanceID string) {
    // Failures tolerated since the last success before deactivation.
    const maxErrors = 3

    // ErrorCount and Active are written here and Active is read by Select.
    // Under a read lock, concurrent failures could lose increments.
    w.mutex.Lock()
    defer w.mutex.Unlock()

    for _, inst := range w.instances {
        if inst.ID != instanceID {
            continue
        }
        inst.ErrorCount++
        if inst.ErrorCount > maxErrors {
            inst.Active = false
        }
        return
    }
}

// GetStats reports the algorithm name, instance counts and the summed weight
// of the active instances (a weight <= 0 counts as 1).
func (w *WeightedRandomBalancer) GetStats() map[string]interface{} {
    w.mutex.RLock()
    defer w.mutex.RUnlock()

    var active, weightSum int
    for _, inst := range w.instances {
        if !inst.Active {
            continue
        }
        active++
        if inst.Weight > 0 {
            weightSum += inst.Weight
        } else {
            weightSum++ // unset weights count as 1
        }
    }

    stats := make(map[string]interface{}, 4)
    stats["algorithm"] = "weighted_random"
    stats["total_instances"] = len(w.instances)
    stats["active_instances"] = active
    stats["total_weight"] = weightSum
    return stats
}

最少连接算法 #

最少连接(Least Connections) #

// balancer/least_connections.go
package balancer

import (
    "context"
    "sync"
)

// LeastConnectionsBalancer routes each request to the active instance with
// the fewest tracked connections.
type LeastConnectionsBalancer struct {
    instances []*Instance // instance list, guarded by mutex
    mutex     sync.RWMutex
}

// NewLeastConnectionsBalancer returns a balancer with an empty instance
// list; call UpdateInstances before Select.
func NewLeastConnectionsBalancer() *LeastConnectionsBalancer {
    b := new(LeastConnectionsBalancer)
    b.instances = make([]*Instance, 0)
    return b
}

// Select returns the active instance with the fewest connections (the first
// one wins on a tie) and increments its connection counter. The counter is
// decremented again by MarkSuccess or MarkFailure. It returns
// ErrNoAvailableInstance when no instance is active. ctx is not consulted.
func (l *LeastConnectionsBalancer) Select(ctx context.Context) (*Instance, error) {
    // Exclusive lock on purpose: Connections is incremented below. Under a
    // read lock, concurrent Select calls could all observe the same minimum,
    // pick the same instance and lose increments.
    l.mutex.Lock()
    defer l.mutex.Unlock()

    var selected *Instance
    for _, inst := range l.instances {
        if !inst.Active {
            continue
        }
        if selected == nil || inst.Connections < selected.Connections {
            selected = inst
        }
    }
    if selected == nil {
        return nil, ErrNoAvailableInstance
    }

    // Account for the connection the caller is about to open.
    selected.Connections++

    return selected, nil
}

// UpdateInstances replaces the instance list with a copy of the given slice.
// The *Instance values themselves are shared with the caller, not cloned.
// It always returns nil.
func (l *LeastConnectionsBalancer) UpdateInstances(instances []*Instance) error {
    snapshot := make([]*Instance, len(instances))
    copy(snapshot, instances)

    l.mutex.Lock()
    l.instances = snapshot
    l.mutex.Unlock()

    return nil
}

// MarkSuccess records a successful request for the instance with the given
// ID: it resets the error counter and releases one tracked connection (never
// going below zero). Unknown IDs are ignored.
func (l *LeastConnectionsBalancer) MarkSuccess(instanceID string) {
    l.mutex.Lock()
    defer l.mutex.Unlock()

    for _, inst := range l.instances {
        if inst.ID != instanceID {
            continue
        }
        inst.ErrorCount = 0
        if inst.Connections > 0 {
            inst.Connections-- // the request has finished
        }
        return
    }
}

// MarkFailure records a failed request for the instance with the given ID:
// it bumps the error counter, releases one tracked connection (never going
// below zero) and deactivates the instance once the error counter exceeds 3.
// Unknown IDs are ignored.
func (l *LeastConnectionsBalancer) MarkFailure(instanceID string) {
    l.mutex.Lock()
    defer l.mutex.Unlock()

    for _, inst := range l.instances {
        if inst.ID != instanceID {
            continue
        }
        inst.ErrorCount++
        if inst.Connections > 0 {
            inst.Connections-- // the request has finished
        }
        if inst.ErrorCount > 3 {
            inst.Active = false
        }
        return
    }
}

// GetStats reports the algorithm name, instance counts and the total,
// minimum and maximum connection counts over the active instances. The
// minimum is reported as 0 when no instance is active.
func (l *LeastConnectionsBalancer) GetStats() map[string]interface{} {
    l.mutex.RLock()
    defer l.mutex.RUnlock()

    var active, total, highest int
    lowest := -1 // -1 means "no minimum recorded yet"

    for _, inst := range l.instances {
        if !inst.Active {
            continue
        }
        active++
        conns := inst.Connections
        total += conns

        if lowest == -1 || conns < lowest {
            lowest = conns
        }
        if conns > highest {
            highest = conns
        }
    }
    if lowest == -1 {
        lowest = 0
    }

    stats := make(map[string]interface{}, 6)
    stats["algorithm"] = "least_connections"
    stats["total_instances"] = len(l.instances)
    stats["active_instances"] = active
    stats["total_connections"] = total
    stats["min_connections"] = lowest
    stats["max_connections"] = highest
    return stats
}

一致性哈希算法 #

一致性哈希实现 #

// balancer/consistent_hash.go
package balancer

import (
    "context"
    "crypto/sha1"
    "fmt"
    "sort"
    "strconv"
    "sync"
)

// ConsistentHashBalancer maps request keys onto instances through a hash
// ring populated with virtual nodes.
type ConsistentHashBalancer struct {
    instances    []*Instance          // instance list as last supplied
    hashRing     map[uint32]*Instance // ring position -> owning instance
    sortedHashes []uint32             // ring positions in ascending order
    virtualNodes int                  // ring positions created per instance
    mutex        sync.RWMutex         // guards all fields above
}

// NewConsistentHashBalancer returns a balancer with an empty ring.
// virtualNodes is the number of ring positions per instance; values <= 0
// select the default of 150.
func NewConsistentHashBalancer(virtualNodes int) *ConsistentHashBalancer {
    const defaultVirtualNodes = 150

    b := &ConsistentHashBalancer{
        instances:    make([]*Instance, 0),
        hashRing:     make(map[uint32]*Instance),
        sortedHashes: make([]uint32, 0),
        virtualNodes: defaultVirtualNodes,
    }
    if virtualNodes > 0 {
        b.virtualNodes = virtualNodes
    }
    return b
}

// Select picks an instance for the key stored in ctx under the string key
// "hash_key". It returns an error when that value is missing or is not a
// string; otherwise it defers to SelectByKey.
func (c *ConsistentHashBalancer) Select(ctx context.Context) (*Instance, error) {
    if key, ok := ctx.Value("hash_key").(string); ok {
        return c.SelectByKey(key)
    }
    return nil, fmt.Errorf("hash_key not found in context")
}

// SelectByKey returns the instance owning the first ring position at or
// after the hash of key, wrapping around to the start of the ring. If that
// instance is inactive, the ring is walked forward until an active instance
// is found. It returns ErrNoAvailableInstance when the ring is empty or
// every position belongs to an inactive instance.
func (c *ConsistentHashBalancer) SelectByKey(key string) (*Instance, error) {
    c.mutex.RLock()
    defer c.mutex.RUnlock()

    ringLen := len(c.sortedHashes)
    if ringLen == 0 {
        return nil, ErrNoAvailableInstance
    }

    target := c.hashKey(key)

    // Binary search for the first position >= target. A result of ringLen
    // means "past the end"; the modulo below wraps it back to position 0.
    start := sort.Search(ringLen, func(i int) bool {
        return c.sortedHashes[i] >= target
    })

    // Step 0 is the primary owner; later steps are the fallbacks.
    for step := 0; step < ringLen; step++ {
        position := c.sortedHashes[(start+step)%ringLen]
        if candidate := c.hashRing[position]; candidate.Active {
            return candidate, nil
        }
    }

    return nil, ErrNoAvailableInstance
}

// UpdateInstances replaces the instance list and rebuilds the hash ring from
// scratch, adding the virtual nodes of every instance (active or not) and
// sorting the ring positions. It always returns nil.
func (c *ConsistentHashBalancer) UpdateInstances(instances []*Instance) error {
    c.mutex.Lock()
    defer c.mutex.Unlock()

    // Start from an empty ring and a private copy of the list.
    snapshot := make([]*Instance, len(instances))
    copy(snapshot, instances)
    c.instances = snapshot
    c.hashRing = make(map[uint32]*Instance)
    c.sortedHashes = make([]uint32, 0)

    for i := range instances {
        c.addInstanceToRing(instances[i])
    }

    // Lookups binary-search the positions, so they must be ascending.
    sort.Slice(c.sortedHashes, func(a, b int) bool {
        return c.sortedHashes[a] < c.sortedHashes[b]
    })

    return nil
}

// addInstanceToRing places c.virtualNodes virtual nodes for instance on the
// ring. Each virtual node is keyed by "<address>:<port>:<index>". The caller
// must hold the write lock and re-sort c.sortedHashes afterwards.
func (c *ConsistentHashBalancer) addInstanceToRing(instance *Instance) {
    // Address and port are the same for every virtual node of this instance.
    prefix := instance.Address + ":" + strconv.Itoa(instance.Port) + ":"

    for i := 0; i < c.virtualNodes; i++ {
        hash := c.hashKey(prefix + strconv.Itoa(i))

        // On a 32-bit hash collision the later instance takes over the
        // position, but the position is listed only once so the sorted
        // slice and the map stay in sync.
        if _, taken := c.hashRing[hash]; !taken {
            c.sortedHashes = append(c.sortedHashes, hash)
        }
        c.hashRing[hash] = instance
    }
}

// hashKey maps key to a ring position: the first four bytes of its SHA-1
// digest, read as a big-endian uint32.
func (c *ConsistentHashBalancer) hashKey(key string) uint32 {
    digest := sha1.Sum([]byte(key))

    var position uint32
    for _, b := range digest[:4] {
        position = position<<8 | uint32(b)
    }
    return position
}

// MarkSuccess records a successful request for the instance with the given
// ID by resetting its error counter. Unknown IDs are ignored.
func (c *ConsistentHashBalancer) MarkSuccess(instanceID string) {
    // This mutates instance state, so it needs the exclusive lock; a read
    // lock would let it race with concurrent MarkFailure calls.
    c.mutex.Lock()
    defer c.mutex.Unlock()

    for _, inst := range c.instances {
        if inst.ID == instanceID {
            inst.ErrorCount = 0
            return
        }
    }
}

// MarkFailure records a failed request for the instance with the given ID.
// Once the error counter exceeds the threshold the instance is marked
// inactive, after which SelectByKey skips its ring positions. Unknown IDs
// are ignored.
func (c *ConsistentHashBalancer) MarkFailure(instanceID string) {
    // Failures tolerated since the last success before deactivation.
    const maxErrors = 3

    // ErrorCount and Active are written here and Active is read by
    // SelectByKey. Under a read lock, concurrent failures could lose
    // increments.
    c.mutex.Lock()
    defer c.mutex.Unlock()

    for _, inst := range c.instances {
        if inst.ID != instanceID {
            continue
        }
        inst.ErrorCount++
        if inst.ErrorCount > maxErrors {
            inst.Active = false
        }
        return
    }
}

// GetStats reports the algorithm name, instance counts, the configured
// number of virtual nodes per instance and the number of distinct ring
// positions.
func (c *ConsistentHashBalancer) GetStats() map[string]interface{} {
    c.mutex.RLock()
    defer c.mutex.RUnlock()

    active := 0
    for i := range c.instances {
        if c.instances[i].Active {
            active++
        }
    }

    stats := make(map[string]interface{}, 5)
    stats["algorithm"] = "consistent_hash"
    stats["total_instances"] = len(c.instances)
    stats["active_instances"] = active
    stats["virtual_nodes"] = c.virtualNodes
    stats["hash_ring_size"] = len(c.hashRing)
    return stats
}

自适应负载均衡 #

响应时间加权算法 #

// balancer/response_time_weighted.go
package balancer

import (
    "context"
    "sync"
    "time"
)

// ResponseTimeWeightedBalancer prefers instances with a high weight and a
// low observed response time.
type ResponseTimeWeightedBalancer struct {
    instances []*Instance // instance list, guarded by mutex
    mutex     sync.RWMutex

    // Tuning parameters
    decayFactor float64 // share of the old average kept by UpdateResponseTime
    minWeight   int     // minimum weight (not read by the methods in this section)
    maxWeight   int     // maximum weight (not read by the methods in this section)
}

// NewResponseTimeWeightedBalancer returns a balancer with an empty instance
// list, a decay factor of 0.9 and weight bounds of 1 and 100.
func NewResponseTimeWeightedBalancer() *ResponseTimeWeightedBalancer {
    b := new(ResponseTimeWeightedBalancer)
    b.instances = make([]*Instance, 0)
    b.decayFactor = 0.9
    b.minWeight = 1
    b.maxWeight = 100
    return b
}

// Select returns the active instance with the highest score and stamps its
// LastUsed field with the current Unix time. The score is
//
//     weight / (responseTime + 0.1*errorCount)
//
// where a weight <= 0 and a response time <= 0 both count as 1. The first
// instance wins on a tie. It returns ErrNoAvailableInstance when no instance
// is active. ctx is not consulted.
func (r *ResponseTimeWeightedBalancer) Select(ctx context.Context) (*Instance, error) {
    // Exclusive lock on purpose: LastUsed is written below, which would race
    // with other Select calls under a read lock.
    r.mutex.Lock()
    defer r.mutex.Unlock()

    var (
        selected  *Instance
        bestScore float64
    )
    for _, inst := range r.instances {
        if !inst.Active {
            continue
        }

        responseTime := float64(inst.ResponseTime)
        if responseTime <= 0 {
            responseTime = 1 // no measurement yet; also avoids dividing by zero
        }

        weight := float64(inst.Weight)
        if weight <= 0 {
            weight = 1
        }

        // Recent errors count as extra latency.
        errorPenalty := float64(inst.ErrorCount) * 0.1
        score := weight / (responseTime + errorPenalty)

        if selected == nil || score > bestScore {
            selected = inst
            bestScore = score
        }
    }

    if selected == nil {
        return nil, ErrNoAvailableInstance
    }

    selected.LastUsed = time.Now().Unix()

    return selected, nil
}

// UpdateInstances replaces the instance list with a copy of the given slice.
// The *Instance values themselves are shared with the caller, not cloned.
// It always returns nil.
func (r *ResponseTimeWeightedBalancer) UpdateInstances(instances []*Instance) error {
    snapshot := make([]*Instance, len(instances))
    copy(snapshot, instances)

    r.mutex.Lock()
    r.instances = snapshot
    r.mutex.Unlock()

    return nil
}

// UpdateResponseTime folds a new response-time sample into the stored value
// of the instance with the given ID. A stored value of 0 is replaced by the
// sample; otherwise the new value is old*decayFactor +
// sample*(1-decayFactor), truncated to an integer. Unknown IDs are ignored.
func (r *ResponseTimeWeightedBalancer) UpdateResponseTime(instanceID string, responseTime int64) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for _, inst := range r.instances {
        if inst.ID != instanceID {
            continue
        }

        // First measurement: nothing to average with yet.
        if inst.ResponseTime == 0 {
            inst.ResponseTime = responseTime
            return
        }

        // Exponential moving average of the previous value and the sample.
        previous := float64(inst.ResponseTime)
        sample := float64(responseTime)
        inst.ResponseTime = int64(previous*r.decayFactor + sample*(1-r.decayFactor))
        return
    }
}

// MarkSuccess records a successful request for the instance with the given
// ID by resetting its error counter. Unknown IDs are ignored.
func (r *ResponseTimeWeightedBalancer) MarkSuccess(instanceID string) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for i := range r.instances {
        if inst := r.instances[i]; inst.ID == instanceID {
            inst.ErrorCount = 0
            return
        }
    }
}

// MarkFailure records a failed request for the instance with the given ID
// and deactivates the instance once its error counter exceeds 3. Unknown IDs
// are ignored.
func (r *ResponseTimeWeightedBalancer) MarkFailure(instanceID string) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for _, inst := range r.instances {
        if inst.ID != instanceID {
            continue
        }
        inst.ErrorCount++
        if inst.ErrorCount > 3 {
            inst.Active = false
        }
        return
    }
}

// GetStats reports the algorithm name, instance counts, the average, minimum
// and maximum response time over the active instances, and the decay factor.
// The average uses integer division. Average and minimum are 0 when no
// instance is active.
func (r *ResponseTimeWeightedBalancer) GetStats() map[string]interface{} {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    var (
        active         int
        sum, slowest   int64
        fastest        int64 = -1 // -1 means "no minimum recorded yet"
    )

    for _, inst := range r.instances {
        if !inst.Active {
            continue
        }
        active++
        rt := inst.ResponseTime
        sum += rt

        if fastest == -1 || rt < fastest {
            fastest = rt
        }
        if rt > slowest {
            slowest = rt
        }
    }

    var average int64
    if active > 0 {
        average = sum / int64(active)
    }
    if fastest == -1 {
        fastest = 0
    }

    stats := make(map[string]interface{}, 7)
    stats["algorithm"] = "response_time_weighted"
    stats["total_instances"] = len(r.instances)
    stats["active_instances"] = active
    stats["avg_response_time"] = average
    stats["min_response_time"] = fastest
    stats["max_response_time"] = slowest
    stats["decay_factor"] = r.decayFactor
    return stats
}

负载均衡器工厂 #

工厂模式实现 #

// balancer/factory.go
package balancer

import (
    "fmt"
    "strings"
)

// BalancerType names a load-balancing algorithm. Its string value is the
// canonical identifier of that algorithm.
type BalancerType string

// Identifiers of the supported algorithms.
const (
    // RoundRobin rotates through the active instances.
    RoundRobin BalancerType = "round_robin"
    // WeightedRoundRobin rotates in proportion to instance weights.
    WeightedRoundRobin BalancerType = "weighted_round_robin"
    // Random picks an active instance uniformly at random.
    Random BalancerType = "random"
    // WeightedRandom picks at random in proportion to instance weights.
    WeightedRandom BalancerType = "weighted_random"
    // LeastConnections picks the instance with the fewest connections.
    LeastConnections BalancerType = "least_connections"
    // ConsistentHash maps request keys onto a hash ring.
    ConsistentHash BalancerType = "consistent_hash"
    // ResponseTimeWeighted scores instances by weight and response time.
    ResponseTimeWeighted BalancerType = "response_time_weighted"
)

// BalancerConfig carries the settings BalancerFactory.CreateBalancer needs
// to build a balancer.
type BalancerConfig struct {
    Type         BalancerType // algorithm to instantiate
    VirtualNodes int // used only by consistent hashing: virtual nodes per instance
}

// BalancerFactory builds LoadBalancer implementations by type. It holds no
// state.
type BalancerFactory struct{}

// NewBalancerFactory returns a ready-to-use factory.
func NewBalancerFactory() *BalancerFactory {
    return new(BalancerFactory)
}

// CreateBalancer builds the balancer selected by config.Type.
// config.VirtualNodes is forwarded only to the consistent-hash balancer. An
// unrecognised type yields a nil balancer and an error.
func (f *BalancerFactory) CreateBalancer(config BalancerConfig) (LoadBalancer, error) {
    var lb LoadBalancer

    switch config.Type {
    case RoundRobin:
        lb = NewRoundRobinBalancer()
    case WeightedRoundRobin:
        lb = NewWeightedRoundRobinBalancer()
    case Random:
        lb = NewRandomBalancer()
    case WeightedRandom:
        lb = NewWeightedRandomBalancer()
    case LeastConnections:
        lb = NewLeastConnectionsBalancer()
    case ConsistentHash:
        lb = NewConsistentHashBalancer(config.VirtualNodes)
    case ResponseTimeWeighted:
        lb = NewResponseTimeWeightedBalancer()
    default:
        return nil, fmt.Errorf("unsupported balancer type: %s", config.Type)
    }

    return lb, nil
}

// GetSupportedTypes lists every BalancerType that CreateBalancer accepts, in
// declaration order. Each call returns a new slice.
func (f *BalancerFactory) GetSupportedTypes() []BalancerType {
    supported := [...]BalancerType{
        RoundRobin,
        WeightedRoundRobin,
        Random,
        WeightedRandom,
        LeastConnections,
        ConsistentHash,
        ResponseTimeWeighted,
    }
    return supported[:]
}

// ParseBalancerType converts a user-supplied name into a BalancerType.
// Matching ignores case and surrounding whitespace. It accepts the canonical
// snake_case name, the same name without underscores, and a short alias
// (rr, wrr, rand, wr, lc, ch, rtw). An unknown name yields "" and an error
// quoting the input as given.
func (f *BalancerFactory) ParseBalancerType(typeStr string) (BalancerType, error) {
    aliases := []struct {
        kind  BalancerType
        names []string
    }{
        {RoundRobin, []string{"round_robin", "roundrobin", "rr"}},
        {WeightedRoundRobin, []string{"weighted_round_robin", "weightedroundrobin", "wrr"}},
        {Random, []string{"random", "rand"}},
        {WeightedRandom, []string{"weighted_random", "weightedrandom", "wr"}},
        {LeastConnections, []string{"least_connections", "leastconnections", "lc"}},
        {ConsistentHash, []string{"consistent_hash", "consistenthash", "ch"}},
        {ResponseTimeWeighted, []string{"response_time_weighted", "responsetimeweighted", "rtw"}},
    }

    wanted := strings.ToLower(strings.TrimSpace(typeStr))
    for _, entry := range aliases {
        for _, name := range entry.names {
            if name == wanted {
                return entry.kind, nil
            }
        }
    }

    return "", fmt.Errorf("unknown balancer type: %s", typeStr)
}

使用示例 #

负载均衡器测试 #

// example/main.go
package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/example/load-balancer/balancer"
)

// newTestInstances returns a fresh set of three active instances with
// weights 3, 2 and 1. Balancers mutate the *Instance values they are given
// (Active, ErrorCount, Connections), so every balancer under test gets its
// own set rather than inheriting state left behind by the previous one.
func newTestInstances() []*balancer.Instance {
    return []*balancer.Instance{
        {ID: "1", Address: "192.168.1.1", Port: 8080, Weight: 3, Active: true},
        {ID: "2", Address: "192.168.1.2", Port: 8080, Weight: 2, Active: true},
        {ID: "3", Address: "192.168.1.3", Port: 8080, Weight: 1, Active: true},
    }
}

// main exercises several balancing algorithms with 100 simulated requests
// each, prints the resulting distribution and statistics, and then checks
// that consistent hashing maps equal keys to the same instance.
func main() {
    factory := balancer.NewBalancerFactory()

    algorithms := []balancer.BalancerType{
        balancer.RoundRobin,
        balancer.WeightedRoundRobin,
        balancer.Random,
        balancer.LeastConnections,
    }

    for _, algorithm := range algorithms {
        fmt.Printf("\n=== Testing %s ===\n", algorithm)

        config := balancer.BalancerConfig{
            Type: algorithm,
        }

        lb, err := factory.CreateBalancer(config)
        if err != nil {
            log.Printf("Failed to create balancer: %v", err)
            continue
        }

        // Install a fresh instance set for this algorithm.
        if err := lb.UpdateInstances(newTestInstances()); err != nil {
            log.Printf("Failed to update instances: %v", err)
            continue
        }

        // Simulate request dispatch.
        results := make(map[string]int)
        for i := 0; i < 100; i++ {
            instance, err := lb.Select(context.Background())
            if err != nil {
                log.Printf("Selection failed: %v", err)
                continue
            }

            results[instance.ID]++

            // Simulate request processing time.
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(10)))

            // Report a success for roughly 90% of the requests.
            if rand.Float32() < 0.9 {
                lb.MarkSuccess(instance.ID)
            } else {
                lb.MarkFailure(instance.ID)
            }
        }

        fmt.Printf("Distribution: %v\n", results)
        fmt.Printf("Stats: %v\n", lb.GetStats())
    }

    // Consistent hashing
    fmt.Printf("\n=== Testing Consistent Hash ===\n")

    config := balancer.BalancerConfig{
        Type:         balancer.ConsistentHash,
        VirtualNodes: 150,
    }

    chBalancer, err := factory.CreateBalancer(config)
    if err != nil {
        log.Printf("Failed to create consistent hash balancer: %v", err)
        return
    }

    if err := chBalancer.UpdateInstances(newTestInstances()); err != nil {
        log.Printf("Failed to update instances: %v", err)
        return
    }

    // Repeated keys must map to the same instance.
    testKeys := []string{"user1", "user2", "user3", "user1", "user2"}

    for _, key := range testKeys {
        // The balancer looks the key up under the plain string "hash_key",
        // so the same untyped string key has to be used here.
        ctx := context.WithValue(context.Background(), "hash_key", key)
        instance, err := chBalancer.Select(ctx)
        if err != nil {
            log.Printf("Selection failed: %v", err)
            continue
        }

        fmt.Printf("Key: %s -> Instance: %s\n", key, instance.ID)
    }
}

通过实现这些负载均衡算法,我们为微服务架构提供了多种请求分发策略。每种算法都有其适用场景:轮询适合处理能力相近的服务器,加权算法适合处理能力不同的服务器,最少连接适合长连接场景,一致性哈希适合缓存场景,而自适应算法则能根据实时性能动态调整。