5.5.3 配置版本管理

5.5.3 配置版本管理 #

配置版本管理是分布式配置系统的重要组成部分,它提供了配置变更的历史记录、回滚能力、灰度发布和 A/B 测试等功能。良好的版本管理机制能够确保配置变更的可追溯性和系统的稳定性。

配置版本模型设计 #

版本数据结构 #

// 配置版本信息
type ConfigVersion struct {
    ID          string            `json:"id"`
    ConfigKey   string            `json:"config_key"`
    Version     int64             `json:"version"`
    Value       string            `json:"value"`
    Metadata    map[string]string `json:"metadata"`
    ChangeType  ChangeType        `json:"change_type"`
    Description string            `json:"description"`
    CreatedBy   string            `json:"created_by"`
    CreatedAt   time.Time         `json:"created_at"`
    Status      VersionStatus     `json:"status"`
    Tags        []string          `json:"tags"`
}

type ChangeType int

const (
    ChangeTypeCreate ChangeType = iota
    ChangeTypeUpdate
    ChangeTypeDelete
)

type VersionStatus int

const (
    VersionStatusDraft VersionStatus = iota
    VersionStatusActive
    VersionStatusArchived
    VersionStatusRolledBack
)

// 配置发布记录
type ConfigRelease struct {
    ID          string                 `json:"id"`
    Name        string                 `json:"name"`
    Description string                 `json:"description"`
    Environment string                 `json:"environment"`
    Namespace   string                 `json:"namespace"`
    Application string                 `json:"application"`
    Versions    map[string]int64       `json:"versions"` // config_key -> version
    Strategy    ReleaseStrategy        `json:"strategy"`
    Status      ReleaseStatus          `json:"status"`
    CreatedBy   string                 `json:"created_by"`
    CreatedAt   time.Time              `json:"created_at"`
    ReleasedAt  *time.Time             `json:"released_at,omitempty"`
    RolledBackAt *time.Time            `json:"rolled_back_at,omitempty"`
}

type ReleaseStrategy struct {
    Type       ReleaseType            `json:"type"`
    Parameters map[string]interface{} `json:"parameters"`
}

type ReleaseType int

const (
    ReleaseTypeImmediate ReleaseType = iota
    ReleaseTypeCanary
    ReleaseTypeBlueGreen
    ReleaseTypeABTest
)

type ReleaseStatus int

const (
    ReleaseStatusPending ReleaseStatus = iota
    ReleaseStatusInProgress
    ReleaseStatusCompleted
    ReleaseStatusFailed
    ReleaseStatusRolledBack
)

版本管理器实现 #

// 配置版本管理器
type ConfigVersionManager struct {
    storage       Storage
    versionStorage VersionStorage
    releaseManager *ReleaseManager
    logger        *log.Logger
}

// 版本存储接口
type VersionStorage interface {
    SaveVersion(version *ConfigVersion) error
    GetVersion(configKey string, version int64) (*ConfigVersion, error)
    GetVersionHistory(configKey string, limit int) ([]*ConfigVersion, error)
    GetLatestVersion(configKey string) (*ConfigVersion, error)
    DeleteVersion(configKey string, version int64) error
}

func NewConfigVersionManager(storage Storage, versionStorage VersionStorage) *ConfigVersionManager {
    return &ConfigVersionManager{
        storage:        storage,
        versionStorage: versionStorage,
        releaseManager: NewReleaseManager(versionStorage),
        logger:         log.New(os.Stdout, "[VersionManager] ", log.LstdFlags),
    }
}

// 创建新版本
func (m *ConfigVersionManager) CreateVersion(configKey, value, description, createdBy string, metadata map[string]string) (*ConfigVersion, error) {
    // 获取当前最新版本号
    latestVersion, err := m.versionStorage.GetLatestVersion(configKey)
    var newVersionNumber int64 = 1

    if err == nil && latestVersion != nil {
        newVersionNumber = latestVersion.Version + 1
    }

    version := &ConfigVersion{
        ID:          uuid.New().String(),
        ConfigKey:   configKey,
        Version:     newVersionNumber,
        Value:       value,
        Metadata:    metadata,
        ChangeType:  ChangeTypeUpdate,
        Description: description,
        CreatedBy:   createdBy,
        CreatedAt:   time.Now(),
        Status:      VersionStatusDraft,
    }

    // 如果是第一个版本,设置为创建类型
    if newVersionNumber == 1 {
        version.ChangeType = ChangeTypeCreate
    }

    if err := m.versionStorage.SaveVersion(version); err != nil {
        return nil, err
    }

    m.logger.Printf("创建配置版本: %s v%d", configKey, newVersionNumber)
    return version, nil
}

// 激活版本
func (m *ConfigVersionManager) ActivateVersion(configKey string, version int64, activatedBy string) error {
    // 获取指定版本
    configVersion, err := m.versionStorage.GetVersion(configKey, version)
    if err != nil {
        return err
    }

    if configVersion.Status != VersionStatusDraft {
        return fmt.Errorf("只能激活草稿状态的版本")
    }

    // 将当前激活版本设为归档状态
    if currentVersion, err := m.getCurrentActiveVersion(configKey); err == nil {
        currentVersion.Status = VersionStatusArchived
        m.versionStorage.SaveVersion(currentVersion)
    }

    // 激活新版本
    configVersion.Status = VersionStatusActive
    if err := m.versionStorage.SaveVersion(configVersion); err != nil {
        return err
    }

    // 更新配置存储
    configItem := &ConfigItem{
        Key:       configKey,
        Value:     configVersion.Value,
        Version:   configVersion.Version,
        Metadata:  configVersion.Metadata,
        UpdatedAt: time.Now(),
        UpdatedBy: activatedBy,
    }

    if err := m.storage.Set(configKey, configItem); err != nil {
        return err
    }

    m.logger.Printf("激活配置版本: %s v%d", configKey, version)
    return nil
}

// 回滚到指定版本
func (m *ConfigVersionManager) RollbackToVersion(configKey string, targetVersion int64, rolledBackBy string) error {
    // 获取目标版本
    targetConfigVersion, err := m.versionStorage.GetVersion(configKey, targetVersion)
    if err != nil {
        return err
    }

    // 获取当前激活版本
    currentVersion, err := m.getCurrentActiveVersion(configKey)
    if err != nil {
        return err
    }

    // 标记当前版本为回滚状态
    currentVersion.Status = VersionStatusRolledBack
    if err := m.versionStorage.SaveVersion(currentVersion); err != nil {
        return err
    }

    // 激活目标版本
    targetConfigVersion.Status = VersionStatusActive
    if err := m.versionStorage.SaveVersion(targetConfigVersion); err != nil {
        return err
    }

    // 更新配置存储
    configItem := &ConfigItem{
        Key:       configKey,
        Value:     targetConfigVersion.Value,
        Version:   targetConfigVersion.Version,
        Metadata:  targetConfigVersion.Metadata,
        UpdatedAt: time.Now(),
        UpdatedBy: rolledBackBy,
    }

    if err := m.storage.Set(configKey, configItem); err != nil {
        return err
    }

    m.logger.Printf("回滚配置版本: %s 从 v%d 到 v%d", configKey, currentVersion.Version, targetVersion)
    return nil
}

// 获取当前激活版本
func (m *ConfigVersionManager) getCurrentActiveVersion(configKey string) (*ConfigVersion, error) {
    history, err := m.versionStorage.GetVersionHistory(configKey, 100)
    if err != nil {
        return nil, err
    }

    for _, version := range history {
        if version.Status == VersionStatusActive {
            return version, nil
        }
    }

    return nil, fmt.Errorf("未找到激活版本")
}

// 比较版本差异
func (m *ConfigVersionManager) CompareVersions(configKey string, version1, version2 int64) (*VersionDiff, error) {
    v1, err := m.versionStorage.GetVersion(configKey, version1)
    if err != nil {
        return nil, err
    }

    v2, err := m.versionStorage.GetVersion(configKey, version2)
    if err != nil {
        return nil, err
    }

    diff := &VersionDiff{
        ConfigKey: configKey,
        Version1:  v1,
        Version2:  v2,
        Changes:   m.calculateDiff(v1.Value, v2.Value),
    }

    return diff, nil
}

type VersionDiff struct {
    ConfigKey string          `json:"config_key"`
    Version1  *ConfigVersion  `json:"version1"`
    Version2  *ConfigVersion  `json:"version2"`
    Changes   []DiffChange    `json:"changes"`
}

type DiffChange struct {
    Type     DiffType    `json:"type"`
    Path     string      `json:"path"`
    OldValue interface{} `json:"old_value,omitempty"`
    NewValue interface{} `json:"new_value,omitempty"`
}

type DiffType int

const (
    DiffTypeAdd DiffType = iota
    DiffTypeRemove
    DiffTypeModify
)

发布策略实现 #

灰度发布 #

// 灰度发布管理器
type CanaryReleaseManager struct {
    versionManager *ConfigVersionManager
    storage        Storage
    logger         *log.Logger
}

// 灰度发布配置
type CanaryConfig struct {
    Percentage    int               `json:"percentage"`     // 灰度流量百分比
    Rules         []CanaryRule      `json:"rules"`          // 灰度规则
    Duration      time.Duration     `json:"duration"`       // 灰度持续时间
    AutoPromote   bool              `json:"auto_promote"`   // 自动全量发布
    RollbackRules []RollbackRule    `json:"rollback_rules"` // 自动回滚规则
}

type CanaryRule struct {
    Type      CanaryRuleType    `json:"type"`
    Field     string            `json:"field"`
    Operator  string            `json:"operator"`
    Value     interface{}       `json:"value"`
}

type CanaryRuleType int

const (
    CanaryRuleTypeHeader CanaryRuleType = iota
    CanaryRuleTypeUserID
    CanaryRuleTypeIP
    CanaryRuleTypeRegion
)

type RollbackRule struct {
    Metric    string  `json:"metric"`
    Threshold float64 `json:"threshold"`
    Duration  time.Duration `json:"duration"`
}

func NewCanaryReleaseManager(versionManager *ConfigVersionManager, storage Storage) *CanaryReleaseManager {
    return &CanaryReleaseManager{
        versionManager: versionManager,
        storage:        storage,
        logger:         log.New(os.Stdout, "[CanaryRelease] ", log.LstdFlags),
    }
}

// 开始灰度发布
func (m *CanaryReleaseManager) StartCanaryRelease(release *ConfigRelease, config *CanaryConfig) error {
    release.Status = ReleaseStatusInProgress
    release.Strategy = ReleaseStrategy{
        Type: ReleaseTypeCanary,
        Parameters: map[string]interface{}{
            "config": config,
        },
    }

    // 创建灰度发布记录
    canaryRelease := &CanaryRelease{
        ID:        uuid.New().String(),
        ReleaseID: release.ID,
        Config:    config,
        Status:    CanaryStatusActive,
        StartedAt: time.Now(),
    }

    if err := m.saveCanaryRelease(canaryRelease); err != nil {
        return err
    }

    // 启动灰度发布监控
    go m.monitorCanaryRelease(canaryRelease)

    m.logger.Printf("开始灰度发布: %s", release.ID)
    return nil
}

type CanaryRelease struct {
    ID        string        `json:"id"`
    ReleaseID string        `json:"release_id"`
    Config    *CanaryConfig `json:"config"`
    Status    CanaryStatus  `json:"status"`
    StartedAt time.Time     `json:"started_at"`
    EndedAt   *time.Time    `json:"ended_at,omitempty"`
    Metrics   *CanaryMetrics `json:"metrics,omitempty"`
}

type CanaryStatus int

const (
    CanaryStatusActive CanaryStatus = iota
    CanaryStatusPromoted
    CanaryStatusRolledBack
)

type CanaryMetrics struct {
    TotalRequests   int64   `json:"total_requests"`
    CanaryRequests  int64   `json:"canary_requests"`
    ErrorRate       float64 `json:"error_rate"`
    ResponseTime    float64 `json:"response_time"`
    SuccessRate     float64 `json:"success_rate"`
}

// 监控灰度发布
func (m *CanaryReleaseManager) monitorCanaryRelease(canary *CanaryRelease) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    timeout := time.After(canary.Config.Duration)

    for {
        select {
        case <-ticker.C:
            // 收集指标
            metrics, err := m.collectCanaryMetrics(canary)
            if err != nil {
                m.logger.Printf("收集灰度指标失败: %v", err)
                continue
            }

            canary.Metrics = metrics

            // 检查回滚条件
            if m.shouldRollback(canary) {
                m.rollbackCanaryRelease(canary)
                return
            }

        case <-timeout:
            // 灰度时间到,检查是否自动全量发布
            if canary.Config.AutoPromote && !m.shouldRollback(canary) {
                m.promoteCanaryRelease(canary)
            }
            return
        }
    }
}

// 检查是否应该回滚
func (m *CanaryReleaseManager) shouldRollback(canary *CanaryRelease) bool {
    if canary.Metrics == nil {
        return false
    }

    for _, rule := range canary.Config.RollbackRules {
        switch rule.Metric {
        case "error_rate":
            if canary.Metrics.ErrorRate > rule.Threshold {
                return true
            }
        case "response_time":
            if canary.Metrics.ResponseTime > rule.Threshold {
                return true
            }
        case "success_rate":
            if canary.Metrics.SuccessRate < rule.Threshold {
                return true
            }
        }
    }

    return false
}

// 全量发布
func (m *CanaryReleaseManager) promoteCanaryRelease(canary *CanaryRelease) error {
    canary.Status = CanaryStatusPromoted
    now := time.Now()
    canary.EndedAt = &now

    // 更新所有配置到新版本
    // 实现省略...

    m.logger.Printf("灰度发布全量发布: %s", canary.ID)
    return m.saveCanaryRelease(canary)
}

// 回滚灰度发布
func (m *CanaryReleaseManager) rollbackCanaryRelease(canary *CanaryRelease) error {
    canary.Status = CanaryStatusRolledBack
    now := time.Now()
    canary.EndedAt = &now

    // 回滚所有配置到之前版本
    // 实现省略...

    m.logger.Printf("灰度发布回滚: %s", canary.ID)
    return m.saveCanaryRelease(canary)
}

A/B 测试 #

// A/B测试管理器
type ABTestManager struct {
    versionManager *ConfigVersionManager
    storage        Storage
    logger         *log.Logger
}

// A/B测试配置
type ABTestConfig struct {
    Name        string            `json:"name"`
    Description string            `json:"description"`
    Variants    []ABTestVariant   `json:"variants"`
    TrafficSplit map[string]int   `json:"traffic_split"` // variant_id -> percentage
    Rules       []ABTestRule      `json:"rules"`
    Duration    time.Duration     `json:"duration"`
    Metrics     []string          `json:"metrics"`
}

type ABTestVariant struct {
    ID          string            `json:"id"`
    Name        string            `json:"name"`
    Description string            `json:"description"`
    Configs     map[string]int64  `json:"configs"` // config_key -> version
}

type ABTestRule struct {
    Field     string      `json:"field"`
    Operator  string      `json:"operator"`
    Value     interface{} `json:"value"`
    VariantID string      `json:"variant_id"`
}

func NewABTestManager(versionManager *ConfigVersionManager, storage Storage) *ABTestManager {
    return &ABTestManager{
        versionManager: versionManager,
        storage:        storage,
        logger:         log.New(os.Stdout, "[ABTest] ", log.LstdFlags),
    }
}

// 创建A/B测试
func (m *ABTestManager) CreateABTest(config *ABTestConfig) (*ABTest, error) {
    // 验证流量分配
    totalPercentage := 0
    for _, percentage := range config.TrafficSplit {
        totalPercentage += percentage
    }

    if totalPercentage != 100 {
        return nil, fmt.Errorf("流量分配总和必须为100%%")
    }

    abTest := &ABTest{
        ID:        uuid.New().String(),
        Config:    config,
        Status:    ABTestStatusActive,
        StartedAt: time.Now(),
        Results:   make(map[string]*ABTestResult),
    }

    // 初始化每个变体的结果
    for _, variant := range config.Variants {
        abTest.Results[variant.ID] = &ABTestResult{
            VariantID: variant.ID,
            Metrics:   make(map[string]float64),
        }
    }

    if err := m.saveABTest(abTest); err != nil {
        return nil, err
    }

    // 启动A/B测试监控
    go m.monitorABTest(abTest)

    m.logger.Printf("创建A/B测试: %s", abTest.ID)
    return abTest, nil
}

type ABTest struct {
    ID        string                    `json:"id"`
    Config    *ABTestConfig             `json:"config"`
    Status    ABTestStatus              `json:"status"`
    StartedAt time.Time                 `json:"started_at"`
    EndedAt   *time.Time                `json:"ended_at,omitempty"`
    Results   map[string]*ABTestResult  `json:"results"`
}

type ABTestStatus int

const (
    ABTestStatusActive ABTestStatus = iota
    ABTestStatusCompleted
    ABTestStatusStopped
)

type ABTestResult struct {
    VariantID       string             `json:"variant_id"`
    TotalRequests   int64              `json:"total_requests"`
    Conversions     int64              `json:"conversions"`
    ConversionRate  float64            `json:"conversion_rate"`
    Metrics         map[string]float64 `json:"metrics"`
    StatSignificant bool               `json:"stat_significant"`
}

// 获取A/B测试变体
func (m *ABTestManager) GetVariantForUser(testID, userID string, context map[string]interface{}) (string, error) {
    abTest, err := m.getABTest(testID)
    if err != nil {
        return "", err
    }

    if abTest.Status != ABTestStatusActive {
        return "", fmt.Errorf("A/B测试未激活")
    }

    // 检查规则匹配
    for _, rule := range abTest.Config.Rules {
        if m.matchRule(rule, context) {
            return rule.VariantID, nil
        }
    }

    // 基于用户ID进行哈希分配
    hash := m.hashUser(userID, testID)
    return m.selectVariantByHash(abTest, hash), nil
}

// 用户哈希
func (m *ABTestManager) hashUser(userID, testID string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(userID + testID))
    return h.Sum32()
}

// 基于哈希选择变体
func (m *ABTestManager) selectVariantByHash(abTest *ABTest, hash uint32) string {
    percentage := int(hash % 100)
    cumulative := 0

    for variantID, split := range abTest.Config.TrafficSplit {
        cumulative += split
        if percentage < cumulative {
            return variantID
        }
    }

    // 默认返回第一个变体
    if len(abTest.Config.Variants) > 0 {
        return abTest.Config.Variants[0].ID
    }

    return ""
}

// 记录A/B测试事件
func (m *ABTestManager) RecordEvent(testID, variantID, eventType string, value float64) error {
    abTest, err := m.getABTest(testID)
    if err != nil {
        return err
    }

    result, exists := abTest.Results[variantID]
    if !exists {
        return fmt.Errorf("变体不存在: %s", variantID)
    }

    switch eventType {
    case "request":
        result.TotalRequests++
    case "conversion":
        result.Conversions++
        result.ConversionRate = float64(result.Conversions) / float64(result.TotalRequests)
    default:
        result.Metrics[eventType] = value
    }

    return m.saveABTest(abTest)
}

// 分析A/B测试结果
func (m *ABTestManager) AnalyzeResults(testID string) (*ABTestAnalysis, error) {
    abTest, err := m.getABTest(testID)
    if err != nil {
        return nil, err
    }

    analysis := &ABTestAnalysis{
        TestID:    testID,
        Variants:  make(map[string]*VariantAnalysis),
        Winner:    "",
        Confidence: 0,
    }

    // 计算每个变体的统计显著性
    for variantID, result := range abTest.Results {
        variantAnalysis := &VariantAnalysis{
            VariantID:      variantID,
            ConversionRate: result.ConversionRate,
            SampleSize:     result.TotalRequests,
            Confidence:     m.calculateConfidence(result),
        }

        analysis.Variants[variantID] = variantAnalysis
    }

    // 确定获胜变体
    analysis.Winner = m.determineWinner(analysis.Variants)

    return analysis, nil
}

type ABTestAnalysis struct {
    TestID     string                      `json:"test_id"`
    Variants   map[string]*VariantAnalysis `json:"variants"`
    Winner     string                      `json:"winner"`
    Confidence float64                     `json:"confidence"`
}

type VariantAnalysis struct {
    VariantID      string  `json:"variant_id"`
    ConversionRate float64 `json:"conversion_rate"`
    SampleSize     int64   `json:"sample_size"`
    Confidence     float64 `json:"confidence"`
    Improvement    float64 `json:"improvement"`
}

配置审计和合规 #

审计日志 #

// 配置审计管理器
type ConfigAuditManager struct {
    storage AuditStorage
    logger  *log.Logger
}

// 审计记录
type AuditRecord struct {
    ID          string                 `json:"id"`
    Timestamp   time.Time              `json:"timestamp"`
    UserID      string                 `json:"user_id"`
    Action      AuditAction            `json:"action"`
    Resource    string                 `json:"resource"`
    OldValue    interface{}            `json:"old_value,omitempty"`
    NewValue    interface{}            `json:"new_value,omitempty"`
    Result      AuditResult            `json:"result"`
    Error       string                 `json:"error,omitempty"`
    Context     map[string]interface{} `json:"context"`
    IPAddress   string                 `json:"ip_address"`
    UserAgent   string                 `json:"user_agent"`
}

type AuditAction int

const (
    AuditActionCreate AuditAction = iota
    AuditActionRead
    AuditActionUpdate
    AuditActionDelete
    AuditActionActivate
    AuditActionRollback
)

type AuditResult int

const (
    AuditResultSuccess AuditResult = iota
    AuditResultFailure
    AuditResultDenied
)

// 记录审计日志
func (m *ConfigAuditManager) RecordAudit(record *AuditRecord) error {
    record.ID = uuid.New().String()
    record.Timestamp = time.Now()

    if err := m.storage.SaveAuditRecord(record); err != nil {
        m.logger.Printf("保存审计记录失败: %v", err)
        return err
    }

    // 异步发送审计事件
    go m.sendAuditEvent(record)

    return nil
}

// 查询审计日志
func (m *ConfigAuditManager) QueryAuditLogs(query *AuditQuery) ([]*AuditRecord, error) {
    return m.storage.QueryAuditRecords(query)
}

type AuditQuery struct {
    UserID      string      `json:"user_id,omitempty"`
    Action      AuditAction `json:"action,omitempty"`
    Resource    string      `json:"resource,omitempty"`
    StartTime   time.Time   `json:"start_time"`
    EndTime     time.Time   `json:"end_time"`
    Limit       int         `json:"limit"`
    Offset      int         `json:"offset"`
}

配置版本管理为分布式配置系统提供了完整的变更追踪、回滚能力和发布策略。通过版本控制、灰度发布、A/B 测试和审计日志等功能,可以确保配置变更的安全性和可控性。在下一节中,我们将探讨配置安全与加密的实现。