5.5.3 配置版本管理 #
配置版本管理是分布式配置系统的重要组成部分,它提供了配置变更的历史记录、回滚能力、灰度发布和 A/B 测试等功能。良好的版本管理机制能够确保配置变更的可追溯性和系统的稳定性。
配置版本模型设计 #
版本数据结构 #
// 配置版本信息
type ConfigVersion struct {
ID string `json:"id"`
ConfigKey string `json:"config_key"`
Version int64 `json:"version"`
Value string `json:"value"`
Metadata map[string]string `json:"metadata"`
ChangeType ChangeType `json:"change_type"`
Description string `json:"description"`
CreatedBy string `json:"created_by"`
CreatedAt time.Time `json:"created_at"`
Status VersionStatus `json:"status"`
Tags []string `json:"tags"`
}
type ChangeType int
const (
ChangeTypeCreate ChangeType = iota
ChangeTypeUpdate
ChangeTypeDelete
)
type VersionStatus int
const (
VersionStatusDraft VersionStatus = iota
VersionStatusActive
VersionStatusArchived
VersionStatusRolledBack
)
// 配置发布记录
type ConfigRelease struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Environment string `json:"environment"`
Namespace string `json:"namespace"`
Application string `json:"application"`
Versions map[string]int64 `json:"versions"` // config_key -> version
Strategy ReleaseStrategy `json:"strategy"`
Status ReleaseStatus `json:"status"`
CreatedBy string `json:"created_by"`
CreatedAt time.Time `json:"created_at"`
ReleasedAt *time.Time `json:"released_at,omitempty"`
RolledBackAt *time.Time `json:"rolled_back_at,omitempty"`
}
type ReleaseStrategy struct {
Type ReleaseType `json:"type"`
Parameters map[string]interface{} `json:"parameters"`
}
type ReleaseType int
const (
ReleaseTypeImmediate ReleaseType = iota
ReleaseTypeCanary
ReleaseTypeBlueGreen
ReleaseTypeABTest
)
type ReleaseStatus int
const (
ReleaseStatusPending ReleaseStatus = iota
ReleaseStatusInProgress
ReleaseStatusCompleted
ReleaseStatusFailed
ReleaseStatusRolledBack
)
版本管理器实现 #
// 配置版本管理器
type ConfigVersionManager struct {
storage Storage
versionStorage VersionStorage
releaseManager *ReleaseManager
logger *log.Logger
}
// 版本存储接口
type VersionStorage interface {
SaveVersion(version *ConfigVersion) error
GetVersion(configKey string, version int64) (*ConfigVersion, error)
GetVersionHistory(configKey string, limit int) ([]*ConfigVersion, error)
GetLatestVersion(configKey string) (*ConfigVersion, error)
DeleteVersion(configKey string, version int64) error
}
func NewConfigVersionManager(storage Storage, versionStorage VersionStorage) *ConfigVersionManager {
return &ConfigVersionManager{
storage: storage,
versionStorage: versionStorage,
releaseManager: NewReleaseManager(versionStorage),
logger: log.New(os.Stdout, "[VersionManager] ", log.LstdFlags),
}
}
// 创建新版本
func (m *ConfigVersionManager) CreateVersion(configKey, value, description, createdBy string, metadata map[string]string) (*ConfigVersion, error) {
// 获取当前最新版本号
latestVersion, err := m.versionStorage.GetLatestVersion(configKey)
var newVersionNumber int64 = 1
if err == nil && latestVersion != nil {
newVersionNumber = latestVersion.Version + 1
}
version := &ConfigVersion{
ID: uuid.New().String(),
ConfigKey: configKey,
Version: newVersionNumber,
Value: value,
Metadata: metadata,
ChangeType: ChangeTypeUpdate,
Description: description,
CreatedBy: createdBy,
CreatedAt: time.Now(),
Status: VersionStatusDraft,
}
// 如果是第一个版本,设置为创建类型
if newVersionNumber == 1 {
version.ChangeType = ChangeTypeCreate
}
if err := m.versionStorage.SaveVersion(version); err != nil {
return nil, err
}
m.logger.Printf("创建配置版本: %s v%d", configKey, newVersionNumber)
return version, nil
}
// 激活版本
func (m *ConfigVersionManager) ActivateVersion(configKey string, version int64, activatedBy string) error {
// 获取指定版本
configVersion, err := m.versionStorage.GetVersion(configKey, version)
if err != nil {
return err
}
if configVersion.Status != VersionStatusDraft {
return fmt.Errorf("只能激活草稿状态的版本")
}
// 将当前激活版本设为归档状态
if currentVersion, err := m.getCurrentActiveVersion(configKey); err == nil {
currentVersion.Status = VersionStatusArchived
m.versionStorage.SaveVersion(currentVersion)
}
// 激活新版本
configVersion.Status = VersionStatusActive
if err := m.versionStorage.SaveVersion(configVersion); err != nil {
return err
}
// 更新配置存储
configItem := &ConfigItem{
Key: configKey,
Value: configVersion.Value,
Version: configVersion.Version,
Metadata: configVersion.Metadata,
UpdatedAt: time.Now(),
UpdatedBy: activatedBy,
}
if err := m.storage.Set(configKey, configItem); err != nil {
return err
}
m.logger.Printf("激活配置版本: %s v%d", configKey, version)
return nil
}
// 回滚到指定版本
func (m *ConfigVersionManager) RollbackToVersion(configKey string, targetVersion int64, rolledBackBy string) error {
// 获取目标版本
targetConfigVersion, err := m.versionStorage.GetVersion(configKey, targetVersion)
if err != nil {
return err
}
// 获取当前激活版本
currentVersion, err := m.getCurrentActiveVersion(configKey)
if err != nil {
return err
}
// 标记当前版本为回滚状态
currentVersion.Status = VersionStatusRolledBack
if err := m.versionStorage.SaveVersion(currentVersion); err != nil {
return err
}
// 激活目标版本
targetConfigVersion.Status = VersionStatusActive
if err := m.versionStorage.SaveVersion(targetConfigVersion); err != nil {
return err
}
// 更新配置存储
configItem := &ConfigItem{
Key: configKey,
Value: targetConfigVersion.Value,
Version: targetConfigVersion.Version,
Metadata: targetConfigVersion.Metadata,
UpdatedAt: time.Now(),
UpdatedBy: rolledBackBy,
}
if err := m.storage.Set(configKey, configItem); err != nil {
return err
}
m.logger.Printf("回滚配置版本: %s 从 v%d 到 v%d", configKey, currentVersion.Version, targetVersion)
return nil
}
// 获取当前激活版本
func (m *ConfigVersionManager) getCurrentActiveVersion(configKey string) (*ConfigVersion, error) {
history, err := m.versionStorage.GetVersionHistory(configKey, 100)
if err != nil {
return nil, err
}
for _, version := range history {
if version.Status == VersionStatusActive {
return version, nil
}
}
return nil, fmt.Errorf("未找到激活版本")
}
// 比较版本差异
func (m *ConfigVersionManager) CompareVersions(configKey string, version1, version2 int64) (*VersionDiff, error) {
v1, err := m.versionStorage.GetVersion(configKey, version1)
if err != nil {
return nil, err
}
v2, err := m.versionStorage.GetVersion(configKey, version2)
if err != nil {
return nil, err
}
diff := &VersionDiff{
ConfigKey: configKey,
Version1: v1,
Version2: v2,
Changes: m.calculateDiff(v1.Value, v2.Value),
}
return diff, nil
}
type VersionDiff struct {
ConfigKey string `json:"config_key"`
Version1 *ConfigVersion `json:"version1"`
Version2 *ConfigVersion `json:"version2"`
Changes []DiffChange `json:"changes"`
}
type DiffChange struct {
Type DiffType `json:"type"`
Path string `json:"path"`
OldValue interface{} `json:"old_value,omitempty"`
NewValue interface{} `json:"new_value,omitempty"`
}
type DiffType int
const (
DiffTypeAdd DiffType = iota
DiffTypeRemove
DiffTypeModify
)
发布策略实现 #
灰度发布 #
// 灰度发布管理器
type CanaryReleaseManager struct {
versionManager *ConfigVersionManager
storage Storage
logger *log.Logger
}
// 灰度发布配置
type CanaryConfig struct {
Percentage int `json:"percentage"` // 灰度流量百分比
Rules []CanaryRule `json:"rules"` // 灰度规则
Duration time.Duration `json:"duration"` // 灰度持续时间
AutoPromote bool `json:"auto_promote"` // 自动全量发布
RollbackRules []RollbackRule `json:"rollback_rules"` // 自动回滚规则
}
type CanaryRule struct {
Type CanaryRuleType `json:"type"`
Field string `json:"field"`
Operator string `json:"operator"`
Value interface{} `json:"value"`
}
type CanaryRuleType int
const (
CanaryRuleTypeHeader CanaryRuleType = iota
CanaryRuleTypeUserID
CanaryRuleTypeIP
CanaryRuleTypeRegion
)
type RollbackRule struct {
Metric string `json:"metric"`
Threshold float64 `json:"threshold"`
Duration time.Duration `json:"duration"`
}
func NewCanaryReleaseManager(versionManager *ConfigVersionManager, storage Storage) *CanaryReleaseManager {
return &CanaryReleaseManager{
versionManager: versionManager,
storage: storage,
logger: log.New(os.Stdout, "[CanaryRelease] ", log.LstdFlags),
}
}
// 开始灰度发布
func (m *CanaryReleaseManager) StartCanaryRelease(release *ConfigRelease, config *CanaryConfig) error {
release.Status = ReleaseStatusInProgress
release.Strategy = ReleaseStrategy{
Type: ReleaseTypeCanary,
Parameters: map[string]interface{}{
"config": config,
},
}
// 创建灰度发布记录
canaryRelease := &CanaryRelease{
ID: uuid.New().String(),
ReleaseID: release.ID,
Config: config,
Status: CanaryStatusActive,
StartedAt: time.Now(),
}
if err := m.saveCanaryRelease(canaryRelease); err != nil {
return err
}
// 启动灰度发布监控
go m.monitorCanaryRelease(canaryRelease)
m.logger.Printf("开始灰度发布: %s", release.ID)
return nil
}
type CanaryRelease struct {
ID string `json:"id"`
ReleaseID string `json:"release_id"`
Config *CanaryConfig `json:"config"`
Status CanaryStatus `json:"status"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at,omitempty"`
Metrics *CanaryMetrics `json:"metrics,omitempty"`
}
type CanaryStatus int
const (
CanaryStatusActive CanaryStatus = iota
CanaryStatusPromoted
CanaryStatusRolledBack
)
type CanaryMetrics struct {
TotalRequests int64 `json:"total_requests"`
CanaryRequests int64 `json:"canary_requests"`
ErrorRate float64 `json:"error_rate"`
ResponseTime float64 `json:"response_time"`
SuccessRate float64 `json:"success_rate"`
}
// 监控灰度发布
func (m *CanaryReleaseManager) monitorCanaryRelease(canary *CanaryRelease) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
timeout := time.After(canary.Config.Duration)
for {
select {
case <-ticker.C:
// 收集指标
metrics, err := m.collectCanaryMetrics(canary)
if err != nil {
m.logger.Printf("收集灰度指标失败: %v", err)
continue
}
canary.Metrics = metrics
// 检查回滚条件
if m.shouldRollback(canary) {
m.rollbackCanaryRelease(canary)
return
}
case <-timeout:
// 灰度时间到,检查是否自动全量发布
if canary.Config.AutoPromote && !m.shouldRollback(canary) {
m.promoteCanaryRelease(canary)
}
return
}
}
}
// 检查是否应该回滚
func (m *CanaryReleaseManager) shouldRollback(canary *CanaryRelease) bool {
if canary.Metrics == nil {
return false
}
for _, rule := range canary.Config.RollbackRules {
switch rule.Metric {
case "error_rate":
if canary.Metrics.ErrorRate > rule.Threshold {
return true
}
case "response_time":
if canary.Metrics.ResponseTime > rule.Threshold {
return true
}
case "success_rate":
if canary.Metrics.SuccessRate < rule.Threshold {
return true
}
}
}
return false
}
// 全量发布
func (m *CanaryReleaseManager) promoteCanaryRelease(canary *CanaryRelease) error {
canary.Status = CanaryStatusPromoted
now := time.Now()
canary.EndedAt = &now
// 更新所有配置到新版本
// 实现省略...
m.logger.Printf("灰度发布全量发布: %s", canary.ID)
return m.saveCanaryRelease(canary)
}
// 回滚灰度发布
func (m *CanaryReleaseManager) rollbackCanaryRelease(canary *CanaryRelease) error {
canary.Status = CanaryStatusRolledBack
now := time.Now()
canary.EndedAt = &now
// 回滚所有配置到之前版本
// 实现省略...
m.logger.Printf("灰度发布回滚: %s", canary.ID)
return m.saveCanaryRelease(canary)
}
A/B 测试 #
// A/B测试管理器
type ABTestManager struct {
versionManager *ConfigVersionManager
storage Storage
logger *log.Logger
}
// A/B测试配置
type ABTestConfig struct {
Name string `json:"name"`
Description string `json:"description"`
Variants []ABTestVariant `json:"variants"`
TrafficSplit map[string]int `json:"traffic_split"` // variant_id -> percentage
Rules []ABTestRule `json:"rules"`
Duration time.Duration `json:"duration"`
Metrics []string `json:"metrics"`
}
type ABTestVariant struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Configs map[string]int64 `json:"configs"` // config_key -> version
}
type ABTestRule struct {
Field string `json:"field"`
Operator string `json:"operator"`
Value interface{} `json:"value"`
VariantID string `json:"variant_id"`
}
func NewABTestManager(versionManager *ConfigVersionManager, storage Storage) *ABTestManager {
return &ABTestManager{
versionManager: versionManager,
storage: storage,
logger: log.New(os.Stdout, "[ABTest] ", log.LstdFlags),
}
}
// 创建A/B测试
func (m *ABTestManager) CreateABTest(config *ABTestConfig) (*ABTest, error) {
// 验证流量分配
totalPercentage := 0
for _, percentage := range config.TrafficSplit {
totalPercentage += percentage
}
if totalPercentage != 100 {
return nil, fmt.Errorf("流量分配总和必须为100%%")
}
abTest := &ABTest{
ID: uuid.New().String(),
Config: config,
Status: ABTestStatusActive,
StartedAt: time.Now(),
Results: make(map[string]*ABTestResult),
}
// 初始化每个变体的结果
for _, variant := range config.Variants {
abTest.Results[variant.ID] = &ABTestResult{
VariantID: variant.ID,
Metrics: make(map[string]float64),
}
}
if err := m.saveABTest(abTest); err != nil {
return nil, err
}
// 启动A/B测试监控
go m.monitorABTest(abTest)
m.logger.Printf("创建A/B测试: %s", abTest.ID)
return abTest, nil
}
type ABTest struct {
ID string `json:"id"`
Config *ABTestConfig `json:"config"`
Status ABTestStatus `json:"status"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at,omitempty"`
Results map[string]*ABTestResult `json:"results"`
}
type ABTestStatus int
const (
ABTestStatusActive ABTestStatus = iota
ABTestStatusCompleted
ABTestStatusStopped
)
type ABTestResult struct {
VariantID string `json:"variant_id"`
TotalRequests int64 `json:"total_requests"`
Conversions int64 `json:"conversions"`
ConversionRate float64 `json:"conversion_rate"`
Metrics map[string]float64 `json:"metrics"`
StatSignificant bool `json:"stat_significant"`
}
// 获取A/B测试变体
func (m *ABTestManager) GetVariantForUser(testID, userID string, context map[string]interface{}) (string, error) {
abTest, err := m.getABTest(testID)
if err != nil {
return "", err
}
if abTest.Status != ABTestStatusActive {
return "", fmt.Errorf("A/B测试未激活")
}
// 检查规则匹配
for _, rule := range abTest.Config.Rules {
if m.matchRule(rule, context) {
return rule.VariantID, nil
}
}
// 基于用户ID进行哈希分配
hash := m.hashUser(userID, testID)
return m.selectVariantByHash(abTest, hash), nil
}
// 用户哈希
func (m *ABTestManager) hashUser(userID, testID string) uint32 {
h := fnv.New32a()
h.Write([]byte(userID + testID))
return h.Sum32()
}
// 基于哈希选择变体
func (m *ABTestManager) selectVariantByHash(abTest *ABTest, hash uint32) string {
percentage := int(hash % 100)
cumulative := 0
for variantID, split := range abTest.Config.TrafficSplit {
cumulative += split
if percentage < cumulative {
return variantID
}
}
// 默认返回第一个变体
if len(abTest.Config.Variants) > 0 {
return abTest.Config.Variants[0].ID
}
return ""
}
// 记录A/B测试事件
func (m *ABTestManager) RecordEvent(testID, variantID, eventType string, value float64) error {
abTest, err := m.getABTest(testID)
if err != nil {
return err
}
result, exists := abTest.Results[variantID]
if !exists {
return fmt.Errorf("变体不存在: %s", variantID)
}
switch eventType {
case "request":
result.TotalRequests++
case "conversion":
result.Conversions++
result.ConversionRate = float64(result.Conversions) / float64(result.TotalRequests)
default:
result.Metrics[eventType] = value
}
return m.saveABTest(abTest)
}
// 分析A/B测试结果
func (m *ABTestManager) AnalyzeResults(testID string) (*ABTestAnalysis, error) {
abTest, err := m.getABTest(testID)
if err != nil {
return nil, err
}
analysis := &ABTestAnalysis{
TestID: testID,
Variants: make(map[string]*VariantAnalysis),
Winner: "",
Confidence: 0,
}
// 计算每个变体的统计显著性
for variantID, result := range abTest.Results {
variantAnalysis := &VariantAnalysis{
VariantID: variantID,
ConversionRate: result.ConversionRate,
SampleSize: result.TotalRequests,
Confidence: m.calculateConfidence(result),
}
analysis.Variants[variantID] = variantAnalysis
}
// 确定获胜变体
analysis.Winner = m.determineWinner(analysis.Variants)
return analysis, nil
}
type ABTestAnalysis struct {
TestID string `json:"test_id"`
Variants map[string]*VariantAnalysis `json:"variants"`
Winner string `json:"winner"`
Confidence float64 `json:"confidence"`
}
type VariantAnalysis struct {
VariantID string `json:"variant_id"`
ConversionRate float64 `json:"conversion_rate"`
SampleSize int64 `json:"sample_size"`
Confidence float64 `json:"confidence"`
Improvement float64 `json:"improvement"`
}
配置审计和合规 #
审计日志 #
// 配置审计管理器
type ConfigAuditManager struct {
storage AuditStorage
logger *log.Logger
}
// 审计记录
type AuditRecord struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
UserID string `json:"user_id"`
Action AuditAction `json:"action"`
Resource string `json:"resource"`
OldValue interface{} `json:"old_value,omitempty"`
NewValue interface{} `json:"new_value,omitempty"`
Result AuditResult `json:"result"`
Error string `json:"error,omitempty"`
Context map[string]interface{} `json:"context"`
IPAddress string `json:"ip_address"`
UserAgent string `json:"user_agent"`
}
type AuditAction int
const (
AuditActionCreate AuditAction = iota
AuditActionRead
AuditActionUpdate
AuditActionDelete
AuditActionActivate
AuditActionRollback
)
type AuditResult int
const (
AuditResultSuccess AuditResult = iota
AuditResultFailure
AuditResultDenied
)
// 记录审计日志
func (m *ConfigAuditManager) RecordAudit(record *AuditRecord) error {
record.ID = uuid.New().String()
record.Timestamp = time.Now()
if err := m.storage.SaveAuditRecord(record); err != nil {
m.logger.Printf("保存审计记录失败: %v", err)
return err
}
// 异步发送审计事件
go m.sendAuditEvent(record)
return nil
}
// 查询审计日志
func (m *ConfigAuditManager) QueryAuditLogs(query *AuditQuery) ([]*AuditRecord, error) {
return m.storage.QueryAuditRecords(query)
}
type AuditQuery struct {
UserID string `json:"user_id,omitempty"`
Action AuditAction `json:"action,omitempty"`
Resource string `json:"resource,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Limit int `json:"limit"`
Offset int `json:"offset"`
}
配置版本管理为分布式配置系统提供了完整的变更追踪、回滚能力和发布策略。通过版本控制、灰度发布、A/B 测试和审计日志等功能,可以确保配置变更的安全性和可控性。在下一节中,我们将探讨配置安全与加密的实现。