5.4.2 负载均衡算法 #
负载均衡是分布式系统中的关键组件,它将客户端请求分配到多个服务器实例上,以提高系统的可用性、可扩展性和性能。本节将深入探讨各种负载均衡算法的原理和 Go 语言实现。
负载均衡基础理论 #
负载均衡的目标 #
- 提高可用性:避免单点故障,提供冗余
- 提升性能:分散负载,减少响应时间
- 增强扩展性:支持水平扩展
- 资源优化:充分利用服务器资源
负载均衡分类 #
按实现位置分类:
- 客户端负载均衡:在客户端实现负载均衡逻辑
- 服务端负载均衡:通过代理服务器实现负载均衡
按算法类型分类:
- 静态算法:不考虑服务器当前状态
- 动态算法:根据服务器实时状态调整
负载均衡器接口设计 #
核心接口定义 #
// balancer/interface.go
package balancer
import (
"context"
"errors"
)
var (
ErrNoAvailableInstance = errors.New("no available instance")
ErrInstanceNotFound = errors.New("instance not found")
)
// Instance 服务实例
type Instance struct {
ID string
Address string
Port int
Weight int
Metadata map[string]string
// 运行时状态
Active bool
Connections int
ResponseTime int64 // 毫秒
ErrorCount int64
LastUsed int64 // Unix 时间戳
}
// LoadBalancer 负载均衡器接口
type LoadBalancer interface {
// 选择实例
Select(ctx context.Context) (*Instance, error)
// 更新实例列表
UpdateInstances(instances []*Instance) error
// 标记实例状态
MarkSuccess(instanceID string)
MarkFailure(instanceID string)
// 获取统计信息
GetStats() map[string]interface{}
}
// WeightedInstance 带权重的实例
type WeightedInstance struct {
*Instance
CurrentWeight int
EffectiveWeight int
}
轮询算法实现 #
简单轮询(Round Robin) #
// balancer/round_robin.go
package balancer
import (
"context"
"sync"
"sync/atomic"
)
// RoundRobinBalancer 轮询负载均衡器
type RoundRobinBalancer struct {
instances []*Instance
current uint64
mutex sync.RWMutex
}
// NewRoundRobinBalancer 创建轮询负载均衡器
func NewRoundRobinBalancer() *RoundRobinBalancer {
return &RoundRobinBalancer{
instances: make([]*Instance, 0),
}
}
// Select 选择实例
func (r *RoundRobinBalancer) Select(ctx context.Context) (*Instance, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
if len(r.instances) == 0 {
return nil, ErrNoAvailableInstance
}
// 过滤活跃实例
activeInstances := make([]*Instance, 0)
for _, instance := range r.instances {
if instance.Active {
activeInstances = append(activeInstances, instance)
}
}
if len(activeInstances) == 0 {
return nil, ErrNoAvailableInstance
}
// 原子操作获取下一个索引
next := atomic.AddUint64(&r.current, 1)
index := (next - 1) % uint64(len(activeInstances))
return activeInstances[index], nil
}
// UpdateInstances 更新实例列表
func (r *RoundRobinBalancer) UpdateInstances(instances []*Instance) error {
r.mutex.Lock()
defer r.mutex.Unlock()
r.instances = make([]*Instance, len(instances))
copy(r.instances, instances)
return nil
}
// MarkSuccess 标记成功
func (r *RoundRobinBalancer) MarkSuccess(instanceID string) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, instance := range r.instances {
if instance.ID == instanceID {
instance.ErrorCount = 0
break
}
}
}
// MarkFailure 标记失败
func (r *RoundRobinBalancer) MarkFailure(instanceID string) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, instance := range r.instances {
if instance.ID == instanceID {
atomic.AddInt64(&instance.ErrorCount, 1)
// 如果错误次数过多,标记为不活跃
if instance.ErrorCount > 3 {
instance.Active = false
}
break
}
}
}
// GetStats 获取统计信息
func (r *RoundRobinBalancer) GetStats() map[string]interface{} {
r.mutex.RLock()
defer r.mutex.RUnlock()
activeCount := 0
totalConnections := 0
for _, instance := range r.instances {
if instance.Active {
activeCount++
}
totalConnections += instance.Connections
}
return map[string]interface{}{
"algorithm": "round_robin",
"total_instances": len(r.instances),
"active_instances": activeCount,
"total_connections": totalConnections,
"current_position": atomic.LoadUint64(&r.current),
}
}
加权轮询(Weighted Round Robin) #
// balancer/weighted_round_robin.go
package balancer
import (
"context"
"sync"
)
// WeightedRoundRobinBalancer 加权轮询负载均衡器
type WeightedRoundRobinBalancer struct {
instances []*WeightedInstance
mutex sync.RWMutex
}
// NewWeightedRoundRobinBalancer 创建加权轮询负载均衡器
func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer {
return &WeightedRoundRobinBalancer{
instances: make([]*WeightedInstance, 0),
}
}
// Select 选择实例
func (w *WeightedRoundRobinBalancer) Select(ctx context.Context) (*Instance, error) {
w.mutex.Lock()
defer w.mutex.Unlock()
if len(w.instances) == 0 {
return nil, ErrNoAvailableInstance
}
// 过滤活跃实例
activeInstances := make([]*WeightedInstance, 0)
for _, instance := range w.instances {
if instance.Active {
activeInstances = append(activeInstances, instance)
}
}
if len(activeInstances) == 0 {
return nil, ErrNoAvailableInstance
}
// 计算总权重
totalWeight := 0
for _, instance := range activeInstances {
totalWeight += instance.EffectiveWeight
}
if totalWeight <= 0 {
return nil, ErrNoAvailableInstance
}
// 选择权重最高的实例
var selected *WeightedInstance
maxCurrentWeight := -1
for _, instance := range activeInstances {
// 增加当前权重
instance.CurrentWeight += instance.EffectiveWeight
// 选择当前权重最高的实例
if selected == nil || instance.CurrentWeight > maxCurrentWeight {
selected = instance
maxCurrentWeight = instance.CurrentWeight
}
}
// 减少选中实例的当前权重
selected.CurrentWeight -= totalWeight
return selected.Instance, nil
}
// UpdateInstances 更新实例列表
func (w *WeightedRoundRobinBalancer) UpdateInstances(instances []*Instance) error {
w.mutex.Lock()
defer w.mutex.Unlock()
w.instances = make([]*WeightedInstance, len(instances))
for i, instance := range instances {
weight := instance.Weight
if weight <= 0 {
weight = 1 // 默认权重为1
}
w.instances[i] = &WeightedInstance{
Instance: instance,
CurrentWeight: 0,
EffectiveWeight: weight,
}
}
return nil
}
// MarkSuccess 标记成功
func (w *WeightedRoundRobinBalancer) MarkSuccess(instanceID string) {
w.mutex.Lock()
defer w.mutex.Unlock()
for _, instance := range w.instances {
if instance.ID == instanceID {
instance.ErrorCount = 0
// 恢复有效权重
if instance.EffectiveWeight < instance.Weight {
instance.EffectiveWeight++
}
break
}
}
}
// MarkFailure 标记失败
func (w *WeightedRoundRobinBalancer) MarkFailure(instanceID string) {
w.mutex.Lock()
defer w.mutex.Unlock()
for _, instance := range w.instances {
if instance.ID == instanceID {
instance.ErrorCount++
// 降低有效权重
if instance.EffectiveWeight > 0 {
instance.EffectiveWeight--
}
// 如果错误次数过多,标记为不活跃
if instance.ErrorCount > 3 {
instance.Active = false
}
break
}
}
}
// GetStats 获取统计信息
func (w *WeightedRoundRobinBalancer) GetStats() map[string]interface{} {
w.mutex.RLock()
defer w.mutex.RUnlock()
activeCount := 0
totalWeight := 0
for _, instance := range w.instances {
if instance.Active {
activeCount++
totalWeight += instance.EffectiveWeight
}
}
return map[string]interface{}{
"algorithm": "weighted_round_robin",
"total_instances": len(w.instances),
"active_instances": activeCount,
"total_weight": totalWeight,
}
}
随机算法实现 #
简单随机(Random) #
// balancer/random.go
package balancer
import (
"context"
"math/rand"
"sync"
"time"
)
// RandomBalancer 随机负载均衡器
type RandomBalancer struct {
instances []*Instance
random *rand.Rand
mutex sync.RWMutex
}
// NewRandomBalancer 创建随机负载均衡器
func NewRandomBalancer() *RandomBalancer {
return &RandomBalancer{
instances: make([]*Instance, 0),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
// Select 选择实例
func (r *RandomBalancer) Select(ctx context.Context) (*Instance, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
if len(r.instances) == 0 {
return nil, ErrNoAvailableInstance
}
// 过滤活跃实例
activeInstances := make([]*Instance, 0)
for _, instance := range r.instances {
if instance.Active {
activeInstances = append(activeInstances, instance)
}
}
if len(activeInstances) == 0 {
return nil, ErrNoAvailableInstance
}
// 随机选择
index := r.random.Intn(len(activeInstances))
return activeInstances[index], nil
}
// UpdateInstances 更新实例列表
func (r *RandomBalancer) UpdateInstances(instances []*Instance) error {
r.mutex.Lock()
defer r.mutex.Unlock()
r.instances = make([]*Instance, len(instances))
copy(r.instances, instances)
return nil
}
// MarkSuccess 标记成功
func (r *RandomBalancer) MarkSuccess(instanceID string) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, instance := range r.instances {
if instance.ID == instanceID {
instance.ErrorCount = 0
break
}
}
}
// MarkFailure 标记失败
func (r *RandomBalancer) MarkFailure(instanceID string) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, instance := range r.instances {
if instance.ID == instanceID {
instance.ErrorCount++
if instance.ErrorCount > 3 {
instance.Active = false
}
break
}
}
}
// GetStats 获取统计信息
func (r *RandomBalancer) GetStats() map[string]interface{} {
r.mutex.RLock()
defer r.mutex.RUnlock()
activeCount := 0
for _, instance := range r.instances {
if instance.Active {
activeCount++
}
}
return map[string]interface{}{
"algorithm": "random",
"total_instances": len(r.instances),
"active_instances": activeCount,
}
}
加权随机(Weighted Random) #
// balancer/weighted_random.go
package balancer
import (
"context"
"math/rand"
"sync"
"time"
)
// WeightedRandomBalancer 加权随机负载均衡器
type WeightedRandomBalancer struct {
instances []*Instance
random *rand.Rand
mutex sync.RWMutex
}
// NewWeightedRandomBalancer 创建加权随机负载均衡器
func NewWeightedRandomBalancer() *WeightedRandomBalancer {
return &WeightedRandomBalancer{
instances: make([]*Instance, 0),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
// Select 选择实例
func (w *WeightedRandomBalancer) Select(ctx context.Context) (*Instance, error) {
w.mutex.RLock()
defer w.mutex.RUnlock()
if len(w.instances) == 0 {
return nil, ErrNoAvailableInstance
}
// 过滤活跃实例并计算总权重
activeInstances := make([]*Instance, 0)
totalWeight := 0
for _, instance := range w.instances {
if instance.Active {
activeInstances = append(activeInstances, instance)
weight := instance.Weight
if weight <= 0 {
weight = 1
}
totalWeight += weight
}
}
if len(activeInstances) == 0 || totalWeight <= 0 {
return nil, ErrNoAvailableInstance
}
// 生成随机数
randomWeight := w.random.Intn(totalWeight)
// 根据权重选择实例
currentWeight := 0
for _, instance := range activeInstances {
weight := instance.Weight
if weight <= 0 {
weight = 1
}
currentWeight += weight
if randomWeight < currentWeight {
return instance, nil
}
}
// 理论上不应该到达这里,但为了安全返回最后一个实例
return activeInstances[len(activeInstances)-1], nil
}
// UpdateInstances 更新实例列表
func (w *WeightedRandomBalancer) UpdateInstances(instances []*Instance) error {
w.mutex.Lock()
defer w.mutex.Unlock()
w.instances = make([]*Instance, len(instances))
copy(w.instances, instances)
return nil
}
// MarkSuccess 标记成功
func (w *WeightedRandomBalancer) MarkSuccess(instanceID string) {
w.mutex.RLock()
defer w.mutex.RUnlock()
for _, instance := range w.instances {
if instance.ID == instanceID {
instance.ErrorCount = 0
break
}
}
}
// MarkFailure 标记失败
func (w *WeightedRandomBalancer) MarkFailure(instanceID string) {
w.mutex.RLock()
defer w.mutex.RUnlock()
for _, instance := range w.instances {
if instance.ID == instanceID {
instance.ErrorCount++
if instance.ErrorCount > 3 {
instance.Active = false
}
break
}
}
}
// GetStats 获取统计信息
func (w *WeightedRandomBalancer) GetStats() map[string]interface{} {
w.mutex.RLock()
defer w.mutex.RUnlock()
activeCount := 0
totalWeight := 0
for _, instance := range w.instances {
if instance.Active {
activeCount++
weight := instance.Weight
if weight <= 0 {
weight = 1
}
totalWeight += weight
}
}
return map[string]interface{}{
"algorithm": "weighted_random",
"total_instances": len(w.instances),
"active_instances": activeCount,
"total_weight": totalWeight,
}
}
最少连接算法 #
最少连接(Least Connections) #
// balancer/least_connections.go
package balancer
import (
"context"
"sync"
)
// LeastConnectionsBalancer 最少连接负载均衡器
type LeastConnectionsBalancer struct {
instances []*Instance
mutex sync.RWMutex
}
// NewLeastConnectionsBalancer 创建最少连接负载均衡器
func NewLeastConnectionsBalancer() *LeastConnectionsBalancer {
return &LeastConnectionsBalancer{
instances: make([]*Instance, 0),
}
}
// Select 选择实例
func (l *LeastConnectionsBalancer) Select(ctx context.Context) (*Instance, error) {
l.mutex.RLock()
defer l.mutex.RUnlock()
if len(l.instances) == 0 {
return nil, ErrNoAvailableInstance
}
// 找到连接数最少的活跃实例
var selected *Instance
minConnections := -1
for _, instance := range l.instances {
if !instance.Active {
continue
}
if selected == nil || instance.Connections < minConnections {
selected = instance
minConnections = instance.Connections
}
}
if selected == nil {
return nil, ErrNoAvailableInstance
}
// 增加连接数
selected.Connections++
return selected, nil
}
// UpdateInstances 更新实例列表
func (l *LeastConnectionsBalancer) UpdateInstances(instances []*Instance) error {
l.mutex.Lock()
defer l.mutex.Unlock()
l.instances = make([]*Instance, len(instances))
copy(l.instances, instances)
return nil
}
// MarkSuccess 标记成功
func (l *LeastConnectionsBalancer) MarkSuccess(instanceID string) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, instance := range l.instances {
if instance.ID == instanceID {
instance.ErrorCount = 0
// 减少连接数
if instance.Connections > 0 {
instance.Connections--
}
break
}
}
}
// MarkFailure 标记失败
func (l *LeastConnectionsBalancer) MarkFailure(instanceID string) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, instance := range l.instances {
if instance.ID == instanceID {
instance.ErrorCount++
// 减少连接数
if instance.Connections > 0 {
instance.Connections--
}
if instance.ErrorCount > 3 {
instance.Active = false
}
break
}
}
}
// GetStats 获取统计信息
func (l *LeastConnectionsBalancer) GetStats() map[string]interface{} {
l.mutex.RLock()
defer l.mutex.RUnlock()
activeCount := 0
totalConnections := 0
minConnections := -1
maxConnections := 0
for _, instance := range l.instances {
if instance.Active {
activeCount++
totalConnections += instance.Connections
if minConnections == -1 || instance.Connections < minConnections {
minConnections = instance.Connections
}
if instance.Connections > maxConnections {
maxConnections = instance.Connections
}
}
}
if minConnections == -1 {
minConnections = 0
}
return map[string]interface{}{
"algorithm": "least_connections",
"total_instances": len(l.instances),
"active_instances": activeCount,
"total_connections": totalConnections,
"min_connections": minConnections,
"max_connections": maxConnections,
}
}
一致性哈希算法 #
一致性哈希实现 #
// balancer/consistent_hash.go
package balancer
import (
"context"
"crypto/sha1"
"fmt"
"sort"
"strconv"
"sync"
)
// ConsistentHashBalancer 一致性哈希负载均衡器
type ConsistentHashBalancer struct {
instances []*Instance
hashRing map[uint32]*Instance
sortedHashes []uint32
virtualNodes int
mutex sync.RWMutex
}
// NewConsistentHashBalancer 创建一致性哈希负载均衡器
func NewConsistentHashBalancer(virtualNodes int) *ConsistentHashBalancer {
if virtualNodes <= 0 {
virtualNodes = 150 // 默认虚拟节点数
}
return &ConsistentHashBalancer{
instances: make([]*Instance, 0),
hashRing: make(map[uint32]*Instance),
sortedHashes: make([]uint32, 0),
virtualNodes: virtualNodes,
}
}
// Select 选择实例(需要提供键值)
func (c *ConsistentHashBalancer) Select(ctx context.Context) (*Instance, error) {
// 一致性哈希需要键值,这里从上下文中获取
key, ok := ctx.Value("hash_key").(string)
if !ok {
return nil, fmt.Errorf("hash_key not found in context")
}
return c.SelectByKey(key)
}
// SelectByKey 根据键选择实例
func (c *ConsistentHashBalancer) SelectByKey(key string) (*Instance, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()
if len(c.sortedHashes) == 0 {
return nil, ErrNoAvailableInstance
}
hash := c.hashKey(key)
// 在哈希环上找到第一个大于等于该哈希值的节点
idx := sort.Search(len(c.sortedHashes), func(i int) bool {
return c.sortedHashes[i] >= hash
})
// 如果没找到,选择第一个节点(环形结构)
if idx == len(c.sortedHashes) {
idx = 0
}
selectedHash := c.sortedHashes[idx]
instance := c.hashRing[selectedHash]
// 如果选中的实例不活跃,尝试找下一个活跃的实例
if !instance.Active {
for i := 1; i < len(c.sortedHashes); i++ {
nextIdx := (idx + i) % len(c.sortedHashes)
nextHash := c.sortedHashes[nextIdx]
nextInstance := c.hashRing[nextHash]
if nextInstance.Active {
return nextInstance, nil
}
}
return nil, ErrNoAvailableInstance
}
return instance, nil
}
// UpdateInstances 更新实例列表
func (c *ConsistentHashBalancer) UpdateInstances(instances []*Instance) error {
c.mutex.Lock()
defer c.mutex.Unlock()
// 清空现有的哈希环
c.hashRing = make(map[uint32]*Instance)
c.sortedHashes = make([]uint32, 0)
c.instances = make([]*Instance, len(instances))
copy(c.instances, instances)
// 为每个实例创建虚拟节点
for _, instance := range instances {
c.addInstanceToRing(instance)
}
// 排序哈希值
sort.Slice(c.sortedHashes, func(i, j int) bool {
return c.sortedHashes[i] < c.sortedHashes[j]
})
return nil
}
// addInstanceToRing 将实例添加到哈希环
func (c *ConsistentHashBalancer) addInstanceToRing(instance *Instance) {
for i := 0; i < c.virtualNodes; i++ {
virtualKey := fmt.Sprintf("%s:%d:%d", instance.Address, instance.Port, i)
hash := c.hashKey(virtualKey)
c.hashRing[hash] = instance
c.sortedHashes = append(c.sortedHashes, hash)
}
}
// hashKey 计算键的哈希值
func (c *ConsistentHashBalancer) hashKey(key string) uint32 {
h := sha1.New()
h.Write([]byte(key))
hashBytes := h.Sum(nil)
// 取前4个字节作为uint32
return uint32(hashBytes[0])<<24 | uint32(hashBytes[1])<<16 |
uint32(hashBytes[2])<<8 | uint32(hashBytes[3])
}
// MarkSuccess 标记成功
func (c *ConsistentHashBalancer) MarkSuccess(instanceID string) {
c.mutex.RLock()
defer c.mutex.RUnlock()
for _, instance := range c.instances {
if instance.ID == instanceID {
instance.ErrorCount = 0
break
}
}
}
// MarkFailure 标记失败
func (c *ConsistentHashBalancer) MarkFailure(instanceID string) {
c.mutex.RLock()
defer c.mutex.RUnlock()
for _, instance := range c.instances {
if instance.ID == instanceID {
instance.ErrorCount++
if instance.ErrorCount > 3 {
instance.Active = false
}
break
}
}
}
// GetStats 获取统计信息
func (c *ConsistentHashBalancer) GetStats() map[string]interface{} {
c.mutex.RLock()
defer c.mutex.RUnlock()
activeCount := 0
for _, instance := range c.instances {
if instance.Active {
activeCount++
}
}
return map[string]interface{}{
"algorithm": "consistent_hash",
"total_instances": len(c.instances),
"active_instances": activeCount,
"virtual_nodes": c.virtualNodes,
"hash_ring_size": len(c.hashRing),
}
}
自适应负载均衡 #
响应时间加权算法 #
// balancer/response_time_weighted.go
package balancer
import (
"context"
"sync"
"time"
)
// ResponseTimeWeightedBalancer 响应时间加权负载均衡器
type ResponseTimeWeightedBalancer struct {
instances []*Instance
mutex sync.RWMutex
// 配置参数
decayFactor float64 // 衰减因子
minWeight int // 最小权重
maxWeight int // 最大权重
}
// NewResponseTimeWeightedBalancer 创建响应时间加权负载均衡器
func NewResponseTimeWeightedBalancer() *ResponseTimeWeightedBalancer {
return &ResponseTimeWeightedBalancer{
instances: make([]*Instance, 0),
decayFactor: 0.9,
minWeight: 1,
maxWeight: 100,
}
}
// Select 选择实例
func (r *ResponseTimeWeightedBalancer) Select(ctx context.Context) (*Instance, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
if len(r.instances) == 0 {
return nil, ErrNoAvailableInstance
}
// 计算每个实例的动态权重
var selected *Instance
maxScore := float64(-1)
for _, instance := range r.instances {
if !instance.Active {
continue
}
// 计算得分(权重/响应时间)
responseTime := float64(instance.ResponseTime)
if responseTime <= 0 {
responseTime = 1 // 避免除零
}
weight := float64(instance.Weight)
if weight <= 0 {
weight = 1
}
// 考虑错误率的影响
errorPenalty := float64(instance.ErrorCount) * 0.1
score := weight / (responseTime + errorPenalty)
if selected == nil || score > maxScore {
selected = instance
maxScore = score
}
}
if selected == nil {
return nil, ErrNoAvailableInstance
}
// 记录使用时间
selected.LastUsed = time.Now().Unix()
return selected, nil
}
// UpdateInstances 更新实例列表
func (r *ResponseTimeWeightedBalancer) UpdateInstances(instances []*Instance) error {
r.mutex.Lock()
defer r.mutex.Unlock()
r.instances = make([]*Instance, len(instances))
copy(r.instances, instances)
return nil
}
// UpdateResponseTime 更新响应时间
func (r *ResponseTimeWeightedBalancer) UpdateResponseTime(instanceID string, responseTime int64) {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, instance := range r.instances {
if instance.ID == instanceID {
// 使用指数移动平均更新响应时间
if instance.ResponseTime == 0 {
instance.ResponseTime = responseTime
} else {
oldTime := float64(instance.ResponseTime)
newTime := float64(responseTime)
instance.ResponseTime = int64(oldTime*r.decayFactor + newTime*(1-r.decayFactor))
}
break
}
}
}
// MarkSuccess 标记成功
func (r *ResponseTimeWeightedBalancer) MarkSuccess(instanceID string) {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, instance := range r.instances {
if instance.ID == instanceID {
instance.ErrorCount = 0
break
}
}
}
// MarkFailure 标记失败
func (r *ResponseTimeWeightedBalancer) MarkFailure(instanceID string) {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, instance := range r.instances {
if instance.ID == instanceID {
instance.ErrorCount++
if instance.ErrorCount > 3 {
instance.Active = false
}
break
}
}
}
// GetStats 获取统计信息
func (r *ResponseTimeWeightedBalancer) GetStats() map[string]interface{} {
r.mutex.RLock()
defer r.mutex.RUnlock()
activeCount := 0
totalResponseTime := int64(0)
minResponseTime := int64(-1)
maxResponseTime := int64(0)
for _, instance := range r.instances {
if instance.Active {
activeCount++
totalResponseTime += instance.ResponseTime
if minResponseTime == -1 || instance.ResponseTime < minResponseTime {
minResponseTime = instance.ResponseTime
}
if instance.ResponseTime > maxResponseTime {
maxResponseTime = instance.ResponseTime
}
}
}
avgResponseTime := int64(0)
if activeCount > 0 {
avgResponseTime = totalResponseTime / int64(activeCount)
}
if minResponseTime == -1 {
minResponseTime = 0
}
return map[string]interface{}{
"algorithm": "response_time_weighted",
"total_instances": len(r.instances),
"active_instances": activeCount,
"avg_response_time": avgResponseTime,
"min_response_time": minResponseTime,
"max_response_time": maxResponseTime,
"decay_factor": r.decayFactor,
}
}
负载均衡器工厂 #
工厂模式实现 #
// balancer/factory.go
package balancer
import (
"fmt"
"strings"
)
// BalancerType 负载均衡器类型
type BalancerType string
const (
RoundRobin BalancerType = "round_robin"
WeightedRoundRobin BalancerType = "weighted_round_robin"
Random BalancerType = "random"
WeightedRandom BalancerType = "weighted_random"
LeastConnections BalancerType = "least_connections"
ConsistentHash BalancerType = "consistent_hash"
ResponseTimeWeighted BalancerType = "response_time_weighted"
)
// BalancerConfig 负载均衡器配置
type BalancerConfig struct {
Type BalancerType
VirtualNodes int // 用于一致性哈希
}
// BalancerFactory 负载均衡器工厂
type BalancerFactory struct{}
// NewBalancerFactory 创建工厂实例
func NewBalancerFactory() *BalancerFactory {
return &BalancerFactory{}
}
// CreateBalancer 创建负载均衡器
func (f *BalancerFactory) CreateBalancer(config BalancerConfig) (LoadBalancer, error) {
switch config.Type {
case RoundRobin:
return NewRoundRobinBalancer(), nil
case WeightedRoundRobin:
return NewWeightedRoundRobinBalancer(), nil
case Random:
return NewRandomBalancer(), nil
case WeightedRandom:
return NewWeightedRandomBalancer(), nil
case LeastConnections:
return NewLeastConnectionsBalancer(), nil
case ConsistentHash:
return NewConsistentHashBalancer(config.VirtualNodes), nil
case ResponseTimeWeighted:
return NewResponseTimeWeightedBalancer(), nil
default:
return nil, fmt.Errorf("unsupported balancer type: %s", config.Type)
}
}
// GetSupportedTypes 获取支持的类型列表
func (f *BalancerFactory) GetSupportedTypes() []BalancerType {
return []BalancerType{
RoundRobin,
WeightedRoundRobin,
Random,
WeightedRandom,
LeastConnections,
ConsistentHash,
ResponseTimeWeighted,
}
}
// ParseBalancerType 解析负载均衡器类型
func (f *BalancerFactory) ParseBalancerType(typeStr string) (BalancerType, error) {
normalized := strings.ToLower(strings.TrimSpace(typeStr))
switch normalized {
case "round_robin", "roundrobin", "rr":
return RoundRobin, nil
case "weighted_round_robin", "weightedroundrobin", "wrr":
return WeightedRoundRobin, nil
case "random", "rand":
return Random, nil
case "weighted_random", "weightedrandom", "wr":
return WeightedRandom, nil
case "least_connections", "leastconnections", "lc":
return LeastConnections, nil
case "consistent_hash", "consistenthash", "ch":
return ConsistentHash, nil
case "response_time_weighted", "responsetimeweighted", "rtw":
return ResponseTimeWeighted, nil
default:
return "", fmt.Errorf("unknown balancer type: %s", typeStr)
}
}
使用示例 #
负载均衡器测试 #
// example/balancer_test.go
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/example/load-balancer/balancer"
)
func main() {
// 创建测试实例
instances := []*balancer.Instance{
{ID: "1", Address: "192.168.1.1", Port: 8080, Weight: 3, Active: true},
{ID: "2", Address: "192.168.1.2", Port: 8080, Weight: 2, Active: true},
{ID: "3", Address: "192.168.1.3", Port: 8080, Weight: 1, Active: true},
}
// 测试不同的负载均衡算法
factory := balancer.NewBalancerFactory()
algorithms := []balancer.BalancerType{
balancer.RoundRobin,
balancer.WeightedRoundRobin,
balancer.Random,
balancer.LeastConnections,
}
for _, algorithm := range algorithms {
fmt.Printf("\n=== Testing %s ===\n", algorithm)
config := balancer.BalancerConfig{
Type: algorithm,
}
lb, err := factory.CreateBalancer(config)
if err != nil {
log.Printf("Failed to create balancer: %v", err)
continue
}
// 更新实例列表
lb.UpdateInstances(instances)
// 模拟请求分发
results := make(map[string]int)
for i := 0; i < 100; i++ {
instance, err := lb.Select(context.Background())
if err != nil {
log.Printf("Selection failed: %v", err)
continue
}
results[instance.ID]++
// 模拟请求处理时间
time.Sleep(time.Millisecond * time.Duration(rand.Intn(10)))
// 随机标记成功或失败
if rand.Float32() < 0.9 {
lb.MarkSuccess(instance.ID)
} else {
lb.MarkFailure(instance.ID)
}
}
// 打印结果
fmt.Printf("Distribution: %v\n", results)
fmt.Printf("Stats: %v\n", lb.GetStats())
}
// 测试一致性哈希
fmt.Printf("\n=== Testing Consistent Hash ===\n")
config := balancer.BalancerConfig{
Type: balancer.ConsistentHash,
VirtualNodes: 150,
}
chBalancer, err := factory.CreateBalancer(config)
if err != nil {
log.Printf("Failed to create consistent hash balancer: %v", err)
return
}
chBalancer.UpdateInstances(instances)
// 测试相同键的一致性
testKeys := []string{"user1", "user2", "user3", "user1", "user2"}
for _, key := range testKeys {
ctx := context.WithValue(context.Background(), "hash_key", key)
instance, err := chBalancer.Select(ctx)
if err != nil {
log.Printf("Selection failed: %v", err)
continue
}
fmt.Printf("Key: %s -> Instance: %s\n", key, instance.ID)
}
}
通过实现这些负载均衡算法,我们为微服务架构提供了多种请求分发策略。每种算法都有其适用场景:轮询适合处理能力相近的服务器,加权算法适合处理能力不同的服务器,最少连接适合长连接场景,一致性哈希适合缓存场景,而自适应算法则能根据实时性能动态调整。