5.5.2 动态配置更新 #

动态配置更新是配置中心的核心功能之一,它允许在不重启服务的情况下实时更新配置。这对于提高系统的可用性和运维效率至关重要。本节将详细介绍动态配置更新的实现机制和最佳实践。

配置更新模式 #

推送模式 (Push Mode) #

推送模式是配置中心主动将配置变更推送给客户端的方式:

// ConfigPushService pushes configuration changes to connected clients.
type ConfigPushService struct {
    // clients maps a client ID to its *ClientConnection; entries are added in
    // HandleWebSocket and removed when the connection ends or a send fails.
    clients    sync.Map // all currently connected clients
    // storage is the source of change events (watchConfigChanges calls Watch on it).
    storage    Storage
    // notifier is set by the constructor; the code shown here does not use it.
    notifier   Notifier
    // logger receives the service's diagnostic messages.
    logger     *log.Logger
}

// ClientConnection holds the state kept for one connected client.
type ClientConnection struct {
    // ID is the key under which the client is stored in ConfigPushService.clients.
    ID          string
    // Environment, Namespace and Application are compared against the first
    // three "/"-separated segments of a config key in shouldNotifyClient.
    Environment string
    Namespace   string
    Application string
    // Connection is the upgraded WebSocket used for reading and writing messages.
    Connection  *websocket.Conn
    // LastPing is set when the client connects and on every "ping" message.
    LastPing    time.Time
    Subscriptions map[string]bool // subscribed config keys
}

// NewConfigPushService builds a push service around the given storage and
// notifier and, before returning it, starts two background goroutines: one
// watching for configuration changes and one running the client health check.
func NewConfigPushService(storage Storage, notifier Notifier) *ConfigPushService {
    pushLogger := log.New(os.Stdout, "[ConfigPush] ", log.LstdFlags)

    svc := new(ConfigPushService)
    svc.storage = storage
    svc.notifier = notifier
    svc.logger = pushLogger

    // Background workers; nothing in this constructor stops them.
    go svc.watchConfigChanges()
    go svc.healthCheck()

    return svc
}

// HandleWebSocket upgrades the request to a WebSocket connection, registers
// the client, sends it the initial configuration and then processes incoming
// messages until a read fails. On return the client is removed from the
// registry and the connection is closed.
func (s *ConfigPushService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
    // Every origin is accepted here; production code needs a strict Origin check.
    allowAnyOrigin := func(*http.Request) bool { return true }
    upgrader := websocket.Upgrader{CheckOrigin: allowAnyOrigin}

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        s.logger.Printf("WebSocket升级失败: %v", err)
        return
    }
    defer conn.Close()

    // Build the client record from the request and attach the live connection.
    client := s.parseClientInfo(r)
    client.Connection = conn
    client.LastPing = time.Now()

    s.clients.Store(client.ID, client)
    defer s.clients.Delete(client.ID)

    s.logger.Printf("客户端连接: %s", client.ID)
    s.sendInitialConfig(client)

    // Read loop: any read error ends the session.
    for {
        var incoming ClientMessage
        readErr := conn.ReadJSON(&incoming)
        if readErr == nil {
            s.handleClientMessage(client, &incoming)
            continue
        }

        // Only close codes other than "going away" / "abnormal closure" are logged.
        if websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
            s.logger.Printf("WebSocket错误: %v", readErr)
        }
        return
    }
}

// ClientMessage is the JSON envelope read from a client's WebSocket.
type ClientMessage struct {
    // Type selects the handling in handleClientMessage ("ping", "subscribe",
    // "unsubscribe").
    Type    string      `json:"type"`
    // Payload is the message body; for subscribe/unsubscribe it is read as an
    // object with a "keys" array.
    Payload interface{} `json:"payload"`
}

// handleClientMessage processes one message received from a client.
//
//   - "ping": refreshes client.LastPing and answers with a "pong" message.
//   - "subscribe": adds every string in payload["keys"] to client.Subscriptions.
//   - "unsubscribe": removes every string in payload["keys"] from it.
//
// Messages of any other type, and payloads without a usable "keys" array, are
// ignored.
func (s *ConfigPushService) handleClientMessage(client *ClientConnection, msg *ClientMessage) {
    switch msg.Type {
    case "ping":
        client.LastPing = time.Now()
        s.sendMessage(client, &ServerMessage{
            Type:      "pong",
            Timestamp: time.Now(),
        })

    case "subscribe":
        keys := subscriptionKeysFromPayload(msg.Payload)
        if len(keys) == 0 {
            return
        }
        // Writing to a nil map panics, so create the set on first use.
        if client.Subscriptions == nil {
            client.Subscriptions = make(map[string]bool, len(keys))
        }
        for _, key := range keys {
            client.Subscriptions[key] = true
        }

    case "unsubscribe":
        // delete on a nil map is a no-op, so no initialization is needed here.
        for _, key := range subscriptionKeysFromPayload(msg.Payload) {
            delete(client.Subscriptions, key)
        }
    }
}

// subscriptionKeysFromPayload extracts the string entries of payload["keys"].
// It returns nil when the payload is not an object or has no "keys" array;
// non-string array elements are skipped.
func subscriptionKeysFromPayload(payload interface{}) []string {
    object, ok := payload.(map[string]interface{})
    if !ok {
        return nil
    }
    rawKeys, ok := object["keys"].([]interface{})
    if !ok {
        return nil
    }

    keys := make([]string, 0, len(rawKeys))
    for _, raw := range rawKeys {
        if key, isString := raw.(string); isString {
            keys = append(keys, key)
        }
    }
    return keys
}

// ServerMessage is the JSON envelope written to a client's WebSocket.
type ServerMessage struct {
    // Type is the message kind; the code shown here sends "pong" and
    // "config_change".
    Type      string      `json:"type"`
    // Key is the configuration key a change refers to.
    Key       string      `json:"key,omitempty"`
    // Value and Version carry the new item for create/update events; they are
    // left at their zero values for deletions.
    Value     interface{} `json:"value,omitempty"`
    Version   int64       `json:"version,omitempty"`
    // Timestamp is the send time for "pong" and the event time for changes.
    Timestamp time.Time   `json:"timestamp"`
}

// watchConfigChanges subscribes to storage change events (with an empty
// watch argument) and broadcasts each received event to the clients. It
// returns when the subscription fails or the event channel is closed.
func (s *ConfigPushService) watchConfigChanges() {
    events, watchErr := s.storage.Watch("")
    if watchErr != nil {
        s.logger.Printf("监听配置变更失败: %v", watchErr)
        return
    }

    for {
        event, open := <-events
        if !open {
            return
        }
        s.broadcastConfigChange(event)
    }
}

// broadcastConfigChange sends a "config_change" message for event to every
// registered client that shouldNotifyClient accepts for the event's key.
func (s *ConfigPushService) broadcastConfigChange(event *ConfigEvent) {
    notify := func(_, value interface{}) bool {
        client := value.(*ClientConnection)

        if s.shouldNotifyClient(client, event.Key) {
            msg := &ServerMessage{
                Type:      "config_change",
                Key:       event.Key,
                Timestamp: event.Timestamp,
            }

            // Only create/update events carry a value and version; a delete
            // leaves both at their zero values.
            if event.Type == EventTypeCreate || event.Type == EventTypeUpdate {
                msg.Value = event.NewValue.Value
                msg.Version = event.NewValue.Version
            }

            s.sendMessage(client, msg)
        }

        // Always continue with the next client.
        return true
    }

    s.clients.Range(notify)
}

// shouldNotifyClient reports whether a change to configKey should be sent to
// client. The key must have at least three "/"-separated segments whose first
// three equal the client's environment, namespace and application. A client
// with no subscriptions receives every such key; otherwise only keys present
// (and true) in its subscription set.
func (s *ConfigPushService) shouldNotifyClient(client *ClientConnection, configKey string) bool {
    segments := strings.Split(configKey, "/")
    if len(segments) < 3 {
        return false
    }

    sameScope := client.Environment == segments[0] &&
        client.Namespace == segments[1] &&
        client.Application == segments[2]
    if !sameScope {
        return false
    }

    // No explicit subscriptions means "everything in scope".
    if len(client.Subscriptions) == 0 {
        return true
    }
    return client.Subscriptions[configKey]
}

// sendMessage writes message as JSON to the client's connection. When the
// write fails, the error is logged and the client is removed from the
// registry.
func (s *ConfigPushService) sendMessage(client *ClientConnection, message *ServerMessage) {
    err := client.Connection.WriteJSON(message)
    if err == nil {
        return
    }

    s.logger.Printf("发送消息失败 [%s]: %v", client.ID, err)
    s.clients.Delete(client.ID)
}

拉取模式 (Pull Mode) #

拉取模式是客户端主动向配置中心请求配置的方式:

// ConfigPullClient fetches configuration from the config server over HTTP.
type ConfigPullClient struct {
    // serverURL is the base URL the API paths are appended to.
    serverURL   string
    // environment, namespace and application identify which configs to pull.
    environment string
    namespace   string
    application string
    // client is the HTTP client used for every request.
    client      *http.Client
    // cache maps a config key to the last *ConfigItem seen for it.
    cache       sync.Map
    // watchers is not accessed directly by the code shown here.
    watchers    sync.Map
    // pullInterval is the period of the ticker created in Start.
    pullInterval time.Duration
    // logger receives the client's diagnostic messages.
    logger      *log.Logger
}

// NewConfigPullClient returns a pull client for the given server and
// environment/namespace/application scope. It uses an HTTP client with a
// 10-second timeout, a 30-second pull interval and a logger writing to
// standard output.
func NewConfigPullClient(serverURL, env, namespace, app string) *ConfigPullClient {
    httpClient := &http.Client{Timeout: 10 * time.Second}
    pullLogger := log.New(os.Stdout, "[ConfigPull] ", log.LstdFlags)

    c := new(ConfigPullClient)
    c.serverURL = serverURL
    c.environment = env
    c.namespace = namespace
    c.application = app
    c.client = httpClient
    c.pullInterval = 30 * time.Second
    c.logger = pullLogger
    return c
}

// Start pulls all configs once synchronously, then starts a goroutine that
// pulls again on every tick of a ticker with period pullInterval. The ticker
// and goroutine are never stopped by this method.
func (c *ConfigPullClient) Start() {
    c.pullAllConfigs()

    ticker := time.NewTicker(c.pullInterval)
    go func() {
        for {
            <-ticker.C
            c.pullAllConfigs()
        }
    }()
}

// pullAllConfigs fetches the full config set for this client's scope, notifies
// watchers about keys that are new or whose version changed, refreshes the
// cache, and finally notifies about and evicts cached keys that are no longer
// present in the response. Any fetch or decode failure is logged and ends the
// pull without touching the cache.
func (c *ConfigPullClient) pullAllConfigs() {
    endpoint := fmt.Sprintf("%s/api/v1/configs/%s/%s/%s",
        c.serverURL, c.environment, c.namespace, c.application)

    resp, err := c.client.Get(endpoint)
    if err != nil {
        c.logger.Printf("拉取配置失败: %v", err)
        return
    }
    defer resp.Body.Close()

    if status := resp.StatusCode; status != http.StatusOK {
        c.logger.Printf("拉取配置失败, 状态码: %d", status)
        return
    }

    var configs map[string]*ConfigItem
    decodeErr := json.NewDecoder(resp.Body).Decode(&configs)
    if decodeErr != nil {
        c.logger.Printf("解析配置失败: %v", decodeErr)
        return
    }

    // Additions and version changes.
    for key, fresh := range configs {
        cached, found := c.cache.Load(key)
        switch {
        case !found:
            c.notifyWatchers(key, nil, fresh)
        case cached.(*ConfigItem).Version != fresh.Version:
            c.notifyWatchers(key, cached.(*ConfigItem), fresh)
        }
        c.cache.Store(key, fresh)
    }

    // Deletions: cached keys missing from the server response.
    evictMissing := func(key, value interface{}) bool {
        name := key.(string)
        if _, present := configs[name]; present {
            return true
        }
        c.notifyWatchers(name, value.(*ConfigItem), nil)
        c.cache.Delete(key)
        return true
    }
    c.cache.Range(evictMissing)
}

// LongPoll repeatedly POSTs a watch request for keys to the server and applies
// every change returned with status 200 through handleConfigChange.
//
// timeout is sent to the server (in whole seconds) as the time it may hold
// the request open. The loop never ends on its own: request failures are
// logged and retried after 5 seconds, and any response other than
// 304 Not Modified is followed by a 1-second pause before the next poll.
// The method only returns if the request body cannot be encoded.
func (c *ConfigPullClient) LongPoll(keys []string, timeout time.Duration) {
    url := fmt.Sprintf("%s/api/v1/configs/watch", c.serverURL)

    requestBody := map[string]interface{}{
        "environment": c.environment,
        "namespace":   c.namespace,
        "application": c.application,
        "keys":        keys,
        "timeout":     int(timeout.Seconds()),
    }

    data, err := json.Marshal(requestBody)
    if err != nil {
        c.logger.Printf("序列化长轮询请求失败: %v", err)
        return
    }

    // http.Client.Timeout bounds the whole exchange, including reading the
    // body. If it is not longer than the requested hold time, every poll that
    // the server holds for the full duration would be aborted client-side, so
    // use a copy of the client whose timeout leaves a margin beyond it.
    pollClient := c.client
    if c.client.Timeout > 0 && c.client.Timeout <= timeout {
        extended := *c.client
        extended.Timeout = timeout + 5*time.Second
        pollClient = &extended
    }

    for {
        req, err := http.NewRequest("POST", url, bytes.NewReader(data))
        if err != nil {
            c.logger.Printf("创建长轮询请求失败: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }

        req.Header.Set("Content-Type", "application/json")

        resp, err := pollClient.Do(req)
        if err != nil {
            c.logger.Printf("长轮询请求失败: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }

        status := resp.StatusCode
        if status == http.StatusOK {
            var changes []ConfigChange
            if err := json.NewDecoder(resp.Body).Decode(&changes); err != nil {
                // Previously dropped silently; a malformed response is worth a log line.
                c.logger.Printf("解析长轮询响应失败: %v", err)
            } else {
                for i := range changes {
                    c.handleConfigChange(&changes[i])
                }
            }
        }

        resp.Body.Close()

        if status != http.StatusNotModified {
            time.Sleep(time.Second)
        }
    }
}

// ConfigChange describes one change to a configuration key.
type ConfigChange struct {
    // Key is the configuration key that changed.
    Key      string      `json:"key"`
    // Type is the kind of change; code in this file uses "create", "update",
    // "delete" and (inside transactions) "set".
    Type     string      `json:"type"`
    // OldValue and NewValue are the item before and after the change.
    OldValue *ConfigItem `json:"old_value"`
    NewValue *ConfigItem `json:"new_value"`
}

// handleConfigChange applies a single change to the local cache and notifies
// watchers. "create"/"update" store the new value; "delete" evicts the key.
// Changes of any other type are ignored.
func (c *ConfigPullClient) handleConfigChange(change *ConfigChange) {
    kind := change.Type

    if kind == "create" || kind == "update" {
        c.cache.Store(change.Key, change.NewValue)
        c.notifyWatchers(change.Key, change.OldValue, change.NewValue)
        return
    }

    if kind == "delete" {
        c.cache.Delete(change.Key)
        c.notifyWatchers(change.Key, change.OldValue, nil)
    }
}

配置监听和回调机制 #

配置监听器 #

// ConfigWatcher is implemented by anything that wants to be told about
// configuration changes.
type ConfigWatcher interface {
    // OnConfigChange receives the key together with its old and new items.
    // Callers in this file pass a nil oldValue for a newly seen key and a nil
    // newValue for a deleted key.
    OnConfigChange(key string, oldValue, newValue *ConfigItem)
}

// ConfigWatcherFunc adapts a plain function to the ConfigWatcher interface.
type ConfigWatcherFunc func(key string, oldValue, newValue *ConfigItem)

// OnConfigChange implements ConfigWatcher by calling the function itself with
// the same arguments.
func (fn ConfigWatcherFunc) OnConfigChange(key string, oldValue, newValue *ConfigItem) {
    fn(key, oldValue, newValue)
}

// ConfigManager keeps watcher registrations and a local view of config items.
type ConfigManager struct {
    // client is the underlying config client; it is stored by the constructor
    // and not otherwise used by the code shown here.
    client   ConfigClient
    watchers sync.Map // key -> []ConfigWatcher
    // configs is read by GetConfig; nothing shown here writes to it.
    configs  sync.Map // key -> *ConfigItem
    // mutex is held by Watch and Unwatch while they update watchers.
    mutex    sync.RWMutex
}

// NewConfigManager returns a manager bound to client with no watchers and no
// cached configs.
func NewConfigManager(client ConfigClient) *ConfigManager {
    manager := new(ConfigManager)
    manager.client = client
    return manager
}

// Watch registers watcher for key by appending it to the watcher list stored
// under that key. The update is performed under the manager's mutex.
func (m *ConfigManager) Watch(key string, watcher ConfigWatcher) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    var registered []ConfigWatcher
    if stored, found := m.watchers.Load(key); found {
        registered = stored.([]ConfigWatcher)
    }

    m.watchers.Store(key, append(registered, watcher))
}

// Unwatch removes the first registration of watcher for key. When the last
// watcher of a key is removed, the key's entry is deleted. Unknown keys and
// watchers that are not registered are ignored.
//
// The stored slice is never modified in place: notifyWatchers reads it
// without taking the mutex, so a fresh slice is published instead.
//
// Limitation: watchers whose dynamic type is not comparable (for example
// ConfigWatcherFunc, a func type) cannot be matched and therefore cannot be
// removed; such a call is a no-op rather than a run-time panic.
func (m *ConfigManager) Unwatch(key string, watcher ConfigWatcher) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    existing, ok := m.watchers.Load(key)
    if !ok {
        return
    }
    watchers := existing.([]ConfigWatcher)

    index := -1
    for i, w := range watchers {
        if sameWatcher(w, watcher) {
            index = i
            break
        }
    }
    if index < 0 {
        return
    }

    if len(watchers) == 1 {
        m.watchers.Delete(key)
        return
    }

    // Copy-on-write so concurrent readers of the old slice are unaffected.
    remaining := make([]ConfigWatcher, 0, len(watchers)-1)
    remaining = append(remaining, watchers[:index]...)
    remaining = append(remaining, watchers[index+1:]...)
    m.watchers.Store(key, remaining)
}

// sameWatcher reports whether a and b are equal interface values. Comparing
// two interfaces that hold the same uncomparable dynamic type panics at run
// time; that panic is recovered and reported as "not equal".
func sameWatcher(a, b ConfigWatcher) (same bool) {
    defer func() {
        if recover() != nil {
            same = false
        }
    }()
    return a == b
}

// notifyWatchers delivers a change for key to every watcher registered for
// exactly that key and to every watcher registered under the wildcard key
// "*" (the key NewHotReloadConfigManager registers its global listener with).
//
// Each watcher is invoked in its own goroutine; a panic inside a watcher is
// recovered and logged so it cannot take down the process.
func (m *ConfigManager) notifyWatchers(key string, oldValue, newValue *ConfigItem) {
    registrations := []string{key}
    if key != "*" {
        // Avoid notifying wildcard watchers twice when key itself is "*".
        registrations = append(registrations, "*")
    }

    for _, registration := range registrations {
        stored, ok := m.watchers.Load(registration)
        if !ok {
            continue
        }

        for _, watcher := range stored.([]ConfigWatcher) {
            go func(w ConfigWatcher) {
                defer func() {
                    if r := recover(); r != nil {
                        log.Printf("配置监听器异常: %v", r)
                    }
                }()
                w.OnConfigChange(key, oldValue, newValue)
            }(watcher)
        }
    }
}

// GetConfig returns the item stored for key and true, or nil and false when
// no item is stored.
func (m *ConfigManager) GetConfig(key string) (*ConfigItem, bool) {
    stored, found := m.configs.Load(key)
    if !found {
        return nil, false
    }
    return stored.(*ConfigItem), true
}

// GetString returns the value of the item stored for key, or defaultValue
// when no item is stored.
func (m *ConfigManager) GetString(key, defaultValue string) string {
    item, found := m.GetConfig(key)
    if !found {
        return defaultValue
    }
    return item.Value
}

// GetInt returns the value of the item stored for key parsed as a decimal
// integer. defaultValue is returned when no item is stored or the value does
// not parse.
func (m *ConfigManager) GetInt(key string, defaultValue int) int {
    item, found := m.GetConfig(key)
    if !found {
        return defaultValue
    }

    parsed, err := strconv.Atoi(item.Value)
    if err != nil {
        return defaultValue
    }
    return parsed
}

// GetBool returns the value of the item stored for key parsed with
// strconv.ParseBool. defaultValue is returned when no item is stored or the
// value does not parse.
func (m *ConfigManager) GetBool(key string, defaultValue bool) bool {
    item, found := m.GetConfig(key)
    if !found {
        return defaultValue
    }

    parsed, err := strconv.ParseBool(item.Value)
    if err != nil {
        return defaultValue
    }
    return parsed
}

配置热更新实现 #

// HotReloadConfigManager extends ConfigManager with per-key reload handlers
// that are run when a configuration change is delivered to it.
type HotReloadConfigManager struct {
    *ConfigManager
    reloadHandlers sync.Map // key -> ReloadHandler
}

// ReloadHandler applies a changed configuration value to a live component.
// HotReloadConfigManager.handleConfigChange calls Validate first (only when
// the new value is non-nil) and calls Reload only if validation succeeded;
// Reload may therefore be called with a nil newValue.
type ReloadHandler interface {
    // Reload applies newValue for key and reports any failure.
    Reload(key string, newValue *ConfigItem) error
    // Validate checks newValue for key; a non-nil error prevents the reload.
    Validate(key string, newValue *ConfigItem) error
}

// NewHotReloadConfigManager creates a hot-reload manager on top of a new
// ConfigManager for client and registers the manager's own change handler as
// a watcher under the key "*".
func NewHotReloadConfigManager(client ConfigClient) *HotReloadConfigManager {
    manager := &HotReloadConfigManager{ConfigManager: NewConfigManager(client)}

    // The global change listener is registered on the embedded base manager.
    globalListener := ConfigWatcherFunc(manager.handleConfigChange)
    manager.ConfigManager.Watch("*", globalListener)

    return manager
}

// RegisterReloadHandler associates handler with key, replacing any handler
// previously registered for the same key.
func (m *HotReloadConfigManager) RegisterReloadHandler(key string, handler ReloadHandler) {
    handlers := &m.reloadHandlers
    handlers.Store(key, handler)
}

// handleConfigChange runs the reload handler registered for key, if any.
// A non-nil new value is validated first and a validation failure aborts the
// reload; the outcome of the reload itself is logged either way.
func (m *HotReloadConfigManager) handleConfigChange(key string, oldValue, newValue *ConfigItem) {
    registered, found := m.reloadHandlers.Load(key)
    if !found {
        return
    }
    reloadHandler := registered.(ReloadHandler)

    // Validation is skipped when there is no new value (e.g. a deletion).
    if newValue != nil {
        if err := reloadHandler.Validate(key, newValue); err != nil {
            log.Printf("配置验证失败 [%s]: %v", key, err)
            return
        }
    }

    err := reloadHandler.Reload(key, newValue)
    if err != nil {
        log.Printf("配置重载失败 [%s]: %v", key, err)
        return
    }
    log.Printf("配置重载成功 [%s]", key)
}

// DBPoolReloadHandler is an example ReloadHandler that applies connection-pool
// settings to a database handle.
type DBPoolReloadHandler struct {
    // pool is the database handle whose limits Reload updates.
    pool *sql.DB
    // mu is held for the duration of Reload.
    mu   sync.Mutex
}

// Validate checks that newValue holds a JSON-encoded DBConfig whose
// MaxOpenConns and MaxIdleConns are both positive.
//
// It returns an error when newValue is nil, when the JSON cannot be parsed
// (the parse error is wrapped and can be inspected with errors.Is/As), or
// when either limit is zero or negative.
func (h *DBPoolReloadHandler) Validate(key string, newValue *ConfigItem) error {
    if newValue == nil {
        return fmt.Errorf("数据库配置为空")
    }

    var config DBConfig
    if err := json.Unmarshal([]byte(newValue.Value), &config); err != nil {
        return fmt.Errorf("解析数据库配置失败: %w", err)
    }

    // Both pool limits must be strictly positive.
    if config.MaxOpenConns <= 0 || config.MaxIdleConns <= 0 {
        return fmt.Errorf("数据库连接池参数无效")
    }

    return nil
}

// Reload parses newValue as a JSON-encoded DBConfig and applies its limits
// (max open connections, max idle connections, connection lifetime in
// seconds) to the pool. The update runs under the handler's mutex.
//
// A nil newValue is rejected with an error and leaves the pool untouched:
// the hot-reload manager skips validation but still calls Reload when the new
// value is nil, so this case must not be dereferenced.
func (h *DBPoolReloadHandler) Reload(key string, newValue *ConfigItem) error {
    if newValue == nil {
        return fmt.Errorf("数据库配置为空, 保留当前连接池设置")
    }

    h.mu.Lock()
    defer h.mu.Unlock()

    var config DBConfig
    if err := json.Unmarshal([]byte(newValue.Value), &config); err != nil {
        return err
    }

    // Apply the new pool limits to the live handle.
    h.pool.SetMaxOpenConns(config.MaxOpenConns)
    h.pool.SetMaxIdleConns(config.MaxIdleConns)
    h.pool.SetConnMaxLifetime(time.Duration(config.ConnMaxLifetime) * time.Second)

    return nil
}

// DBConfig is the JSON shape of the database pool settings.
type DBConfig struct {
    // MaxOpenConns is passed to SetMaxOpenConns.
    MaxOpenConns    int `json:"max_open_conns"`
    // MaxIdleConns is passed to SetMaxIdleConns.
    MaxIdleConns    int `json:"max_idle_conns"`
    // ConnMaxLifetime is a number of seconds; Reload multiplies it by
    // time.Second before passing it to SetConnMaxLifetime.
    ConnMaxLifetime int `json:"conn_max_lifetime"`
}

配置更新的一致性保证 #

分布式锁实现 #

// ConfigUpdateLockManager hands out Redis-backed locks for config updates.
type ConfigUpdateLockManager struct {
    // redis is the client all lock commands are issued through.
    redis  *redis.Client
    // prefix is prepended to a config key to form the Redis lock key.
    prefix string
    // ttl is the expiry set on a lock when it is acquired.
    ttl    time.Duration
}

// NewConfigUpdateLockManager returns a lock manager using the given Redis
// client, the key prefix "config_lock:" and a 30-second lock TTL.
func NewConfigUpdateLockManager(redis *redis.Client) *ConfigUpdateLockManager {
    manager := new(ConfigUpdateLockManager)
    manager.redis = redis
    manager.prefix = "config_lock:"
    manager.ttl = 30 * time.Second
    return manager
}

// AcquireLock tries once to take the lock for key using SET NX with the
// manager's TTL. It returns the Redis error if the command fails and
// ErrLockAcquisitionFailed if the key is already set. On success it starts a
// goroutine that renews the lock and returns the lock handle.
func (m *ConfigUpdateLockManager) AcquireLock(key string) (*ConfigLock, error) {
    lockKey := m.prefix + key
    // The value identifies this holder: a nanosecond timestamp plus a UUID.
    lockValue := fmt.Sprintf("%d_%s", time.Now().UnixNano(), uuid.New().String())

    acquired, err := m.redis.SetNX(context.Background(), lockKey, lockValue, m.ttl).Result()
    switch {
    case err != nil:
        return nil, err
    case !acquired:
        return nil, ErrLockAcquisitionFailed
    }

    lock := &ConfigLock{
        key:     lockKey,
        value:   lockValue,
        manager: m,
        ttl:     m.ttl,
    }

    // Keep the lock alive in the background.
    go lock.renewLock()

    return lock, nil
}

// ConfigLock is a handle to one acquired config-update lock.
type ConfigLock struct {
    // key is the full Redis key (manager prefix + config key).
    key     string
    // value is the holder token written to Redis; release and renewal only
    // act when the stored value still equals it.
    value   string
    // manager provides the Redis client.
    manager *ConfigUpdateLockManager
    // ttl is the lock expiry; renewal runs every ttl/3.
    ttl     time.Duration
    // released is switched from 0 to 1 atomically by Release; renewLock stops
    // once it reads 1.
    released int32
}

// Release gives the lock up. Only the first call does any work; later calls
// return nil. The Redis key is deleted by a script only if it still holds
// this lock's value, and the error of running that script is returned.
func (l *ConfigLock) Release() error {
    firstRelease := atomic.CompareAndSwapInt32(&l.released, 0, 1)
    if !firstRelease {
        return nil // already released
    }

    // Compare-and-delete, executed as one script on the server.
    const script = `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `

    cmd := l.manager.redis.Eval(context.Background(), script, []string{l.key}, l.value)
    _, err := cmd.Result()
    return err
}

// renewLock extends the lock's expiry every ttl/3 until the lock is released
// or found to be lost.
//
// On each tick a script resets the key's expiry to the full TTL, but only if
// the key still holds this lock's value. The loop ends when Release has been
// called or when the script reports that the key no longer belongs to this
// lock; continuing after that point could never succeed. Redis errors are
// logged and the renewal is retried on the next tick.
func (l *ConfigLock) renewLock() {
    // PEXPIRE with milliseconds keeps sub-second TTLs exact; truncating the
    // TTL to whole seconds could shorten (or zero) the expiry.
    const renewScript = `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("pexpire", KEYS[1], ARGV[2])
        else
            return 0
        end
    `

    ticker := time.NewTicker(l.ttl / 3)
    defer ticker.Stop()

    for range ticker.C {
        if atomic.LoadInt32(&l.released) == 1 {
            return
        }

        result, err := l.manager.redis.Eval(
            context.Background(), renewScript, []string{l.key}, l.value, l.ttl.Milliseconds(),
        ).Result()
        if err != nil {
            log.Printf("配置锁续期失败 [%s]: %v", l.key, err)
            continue
        }

        // 0 means the key is gone or now holds another owner's value.
        if renewed, ok := result.(int64); ok && renewed == 0 {
            log.Printf("配置锁已丢失, 停止续期 [%s]", l.key)
            return
        }
    }
}

配置更新事务 #

// ConfigTransactionManager creates config-update transactions.
type ConfigTransactionManager struct {
    // storage is where committed changes are written.
    storage Storage
    // lockManager supplies the per-key locks a transaction takes in Set.
    lockManager *ConfigUpdateLockManager
}

// ConfigTransaction collects config changes that are applied together on
// Commit.
type ConfigTransaction struct {
    // id is a UUID string assigned in BeginTransaction.
    id      string
    // manager gives access to the storage and the lock manager.
    manager *ConfigTransactionManager
    // locks are the locks acquired by Set; they are released when the
    // transaction is committed or rolled back.
    locks   []*ConfigLock
    // changes are the recorded changes, in the order they were added.
    changes []ConfigChange
    // status is the transaction's lifecycle state.
    status  TransactionStatus
}

// TransactionStatus is the lifecycle state of a ConfigTransaction.
type TransactionStatus int

// Transaction states, numbered from zero in declaration order.
const (
    // TransactionStatusPending is the initial state of a new transaction.
    TransactionStatusPending = TransactionStatus(iota)
    // TransactionStatusCommitted is set after a successful Commit.
    TransactionStatusCommitted
    // TransactionStatusRolledBack is set by Rollback.
    TransactionStatusRolledBack
)

// BeginTransaction returns a new pending transaction with a fresh UUID,
// bound to this manager.
func (m *ConfigTransactionManager) BeginTransaction() *ConfigTransaction {
    tx := new(ConfigTransaction)
    tx.id = uuid.New().String()
    tx.manager = m
    tx.status = TransactionStatusPending
    return tx
}

// Set records that key should be set to value when the transaction commits.
//
// The lock for key is acquired on the first Set of that key and reused for
// later ones, so setting the same key more than once in one transaction works
// (changes are applied in order on Commit). It returns
// ErrTransactionNotPending if the transaction is no longer pending, or the
// lock manager's error if the lock cannot be acquired.
func (t *ConfigTransaction) Set(key string, value *ConfigItem) error {
    if t.status != TransactionStatusPending {
        return ErrTransactionNotPending
    }

    // Re-acquiring a lock this transaction already holds would always fail,
    // because the key is already set in Redis.
    if !t.holdsLock(key) {
        lock, err := t.manager.lockManager.AcquireLock(key)
        if err != nil {
            return err
        }
        t.locks = append(t.locks, lock)
    }

    // Record the change for Commit.
    t.changes = append(t.changes, ConfigChange{
        Key:      key,
        Type:     "set",
        NewValue: value,
    })

    return nil
}

// holdsLock reports whether this transaction already holds the lock for key.
// Locks are matched by their full lock key (lock manager prefix + key).
func (t *ConfigTransaction) holdsLock(key string) bool {
    lockKey := t.manager.lockManager.prefix + key
    for _, held := range t.locks {
        if held.key == lockKey {
            return true
        }
    }
    return false
}

// Commit applies all recorded changes to storage in order and releases the
// transaction's locks.
//
// If any change fails, the changes are rolled back, the transaction is marked
// rolled back and the storage error is returned; the transaction can then no
// longer be committed or rolled back again (its locks are already released).
// On success the transaction is marked committed. Calling Commit on a
// transaction that is not pending returns ErrTransactionNotPending.
func (t *ConfigTransaction) Commit() error {
    if t.status != TransactionStatusPending {
        return ErrTransactionNotPending
    }

    defer t.releaseLocks()

    for _, change := range t.changes {
        switch change.Type {
        case "set":
            if err := t.manager.storage.Set(change.Key, change.NewValue); err != nil {
                // Undo what was applied and close the transaction.
                t.rollbackChanges()
                t.status = TransactionStatusRolledBack
                return err
            }
        case "delete":
            if err := t.manager.storage.Delete(change.Key); err != nil {
                t.rollbackChanges()
                t.status = TransactionStatusRolledBack
                return err
            }
        }
    }

    t.status = TransactionStatusCommitted
    return nil
}

// Rollback undoes the transaction's changes, marks it rolled back and
// releases its locks. It returns ErrTransactionNotPending when the
// transaction is not pending.
func (t *ConfigTransaction) Rollback() error {
    if t.status == TransactionStatusPending {
        // Locks are released after the rollback work below has finished.
        defer t.releaseLocks()

        t.rollbackChanges()
        t.status = TransactionStatusRolledBack
        return nil
    }
    return ErrTransactionNotPending
}

// releaseLocks releases every lock held by the transaction and clears the
// list. Release is best-effort: a failure is logged and the remaining locks
// are still released.
func (t *ConfigTransaction) releaseLocks() {
    for _, lock := range t.locks {
        if err := lock.Release(); err != nil {
            log.Printf("释放配置锁失败 [%s]: %v", lock.key, err)
        }
    }
    t.locks = nil
}

动态配置更新是配置中心的核心功能,通过推送和拉取两种模式,结合监听器机制和热更新功能,可以实现配置的实时更新。同时,通过分布式锁和事务机制,可以保证配置更新的一致性和可靠性。在下一节中,我们将探讨配置的版本管理机制。