3.4.4 数据库迁移与事务

3.4.4 数据库迁移与事务 #

数据库迁移和事务处理是企业级应用开发的重要组成部分。本节将深入探讨 GORM 的自动迁移功能、手动迁移管理、事务处理机制以及数据一致性保证策略。

数据库迁移管理 #

GORM 自动迁移 #

package main

import (
    "fmt"
    "log"
    "gorm.io/gorm"
)

// 迁移管理器
type MigrationManager struct {
    db *gorm.DB
}

func NewMigrationManager(db *gorm.DB) *MigrationManager {
    return &MigrationManager{db: db}
}

// 自动迁移所有模型
func (mm *MigrationManager) AutoMigrate() error {
    log.Println("Starting database migration...")

    // 定义需要迁移的模型
    models := []interface{}{
        &User{},
        &UserProfile{},
        &Post{},
        &Comment{},
        &Tag{},
        &Category{},
        &PostTag{},
    }

    // 执行自动迁移
    if err := mm.db.AutoMigrate(models...); err != nil {
        return fmt.Errorf("failed to auto migrate: %w", err)
    }

    log.Println("Database migration completed successfully")
    return nil
}

// 检查表是否存在
func (mm *MigrationManager) HasTable(model interface{}) bool {
    return mm.db.Migrator().HasTable(model)
}

// 创建表
func (mm *MigrationManager) CreateTable(model interface{}) error {
    if mm.HasTable(model) {
        log.Printf("Table for %T already exists", model)
        return nil
    }

    if err := mm.db.Migrator().CreateTable(model); err != nil {
        return fmt.Errorf("failed to create table for %T: %w", model, err)
    }

    log.Printf("Created table for %T", model)
    return nil
}

// 删除表
func (mm *MigrationManager) DropTable(model interface{}) error {
    if !mm.HasTable(model) {
        log.Printf("Table for %T does not exist", model)
        return nil
    }

    if err := mm.db.Migrator().DropTable(model); err != nil {
        return fmt.Errorf("failed to drop table for %T: %w", model, err)
    }

    log.Printf("Dropped table for %T", model)
    return nil
}

// 重命名表
func (mm *MigrationManager) RenameTable(oldName, newName string) error {
    if err := mm.db.Migrator().RenameTable(oldName, newName); err != nil {
        return fmt.Errorf("failed to rename table from %s to %s: %w", oldName, newName, err)
    }

    log.Printf("Renamed table from %s to %s", oldName, newName)
    return nil
}

字段级迁移操作 #

// 字段迁移操作
func (mm *MigrationManager) FieldOperations() error {
    // 检查字段是否存在
    if mm.db.Migrator().HasColumn(&User{}, "middle_name") {
        log.Println("Column middle_name already exists")
    } else {
        // 添加字段
        if err := mm.db.Migrator().AddColumn(&User{}, "middle_name"); err != nil {
            return fmt.Errorf("failed to add column middle_name: %w", err)
        }
        log.Println("Added column middle_name")
    }

    // 修改字段类型
    if err := mm.db.Migrator().AlterColumn(&User{}, "bio"); err != nil {
        return fmt.Errorf("failed to alter column bio: %w", err)
    }

    // 重命名字段
    if err := mm.db.Migrator().RenameColumn(&User{}, "old_name", "new_name"); err != nil {
        return fmt.Errorf("failed to rename column: %w", err)
    }

    // 删除字段
    if err := mm.db.Migrator().DropColumn(&User{}, "unused_field"); err != nil {
        return fmt.Errorf("failed to drop column: %w", err)
    }

    return nil
}

// 索引操作
func (mm *MigrationManager) IndexOperations() error {
    // 创建索引
    if err := mm.db.Migrator().CreateIndex(&User{}, "Email"); err != nil {
        return fmt.Errorf("failed to create index on email: %w", err)
    }

    // 创建复合索引
    if err := mm.db.Migrator().CreateIndex(&User{}, "idx_name_status"); err != nil {
        return fmt.Errorf("failed to create composite index: %w", err)
    }

    // 检查索引是否存在
    if mm.db.Migrator().HasIndex(&User{}, "Email") {
        log.Println("Index on email exists")
    }

    // 删除索引
    if err := mm.db.Migrator().DropIndex(&User{}, "Email"); err != nil {
        return fmt.Errorf("failed to drop index: %w", err)
    }

    return nil
}

// 约束操作
func (mm *MigrationManager) ConstraintOperations() error {
    // 创建外键约束
    if err := mm.db.Migrator().CreateConstraint(&User{}, "Posts"); err != nil {
        return fmt.Errorf("failed to create foreign key constraint: %w", err)
    }

    // 检查约束是否存在
    if mm.db.Migrator().HasConstraint(&User{}, "Posts") {
        log.Println("Foreign key constraint exists")
    }

    // 删除约束
    if err := mm.db.Migrator().DropConstraint(&User{}, "Posts"); err != nil {
        return fmt.Errorf("failed to drop constraint: %w", err)
    }

    return nil
}

版本化迁移系统 #

import (
    "sort"
    "strconv"
    "strings"
    "time"
)

// 迁移版本
type Migration struct {
    Version   string
    Name      string
    Up        func(*gorm.DB) error
    Down      func(*gorm.DB) error
    CreatedAt time.Time
}

// 迁移记录表
type MigrationRecord struct {
    ID        uint      `gorm:"primaryKey"`
    Version   string    `gorm:"uniqueIndex;size:50"`
    Name      string    `gorm:"size:255"`
    AppliedAt time.Time
}

// 版本化迁移管理器
type VersionedMigrationManager struct {
    db         *gorm.DB
    migrations []Migration
}

func NewVersionedMigrationManager(db *gorm.DB) *VersionedMigrationManager {
    vmm := &VersionedMigrationManager{
        db:         db,
        migrations: make([]Migration, 0),
    }

    // 确保迁移记录表存在
    db.AutoMigrate(&MigrationRecord{})

    return vmm
}

// 注册迁移
func (vmm *VersionedMigrationManager) RegisterMigration(migration Migration) {
    vmm.migrations = append(vmm.migrations, migration)
}

// 执行迁移
func (vmm *VersionedMigrationManager) Migrate() error {
    // 按版本号排序
    sort.Slice(vmm.migrations, func(i, j int) bool {
        return vmm.migrations[i].Version < vmm.migrations[j].Version
    })

    for _, migration := range vmm.migrations {
        // 检查是否已经应用
        var record MigrationRecord
        result := vmm.db.Where("version = ?", migration.Version).First(&record)

        if result.Error == nil {
            log.Printf("Migration %s already applied, skipping", migration.Version)
            continue
        }

        if !errors.Is(result.Error, gorm.ErrRecordNotFound) {
            return fmt.Errorf("failed to check migration status: %w", result.Error)
        }

        log.Printf("Applying migration %s: %s", migration.Version, migration.Name)

        // 在事务中执行迁移
        err := vmm.db.Transaction(func(tx *gorm.DB) error {
            // 执行迁移
            if err := migration.Up(tx); err != nil {
                return fmt.Errorf("migration up failed: %w", err)
            }

            // 记录迁移
            record := MigrationRecord{
                Version:   migration.Version,
                Name:      migration.Name,
                AppliedAt: time.Now(),
            }

            if err := tx.Create(&record).Error; err != nil {
                return fmt.Errorf("failed to record migration: %w", err)
            }

            return nil
        })

        if err != nil {
            return fmt.Errorf("failed to apply migration %s: %w", migration.Version, err)
        }

        log.Printf("Successfully applied migration %s", migration.Version)
    }

    return nil
}

// 回滚迁移
func (vmm *VersionedMigrationManager) Rollback(targetVersion string) error {
    // 获取已应用的迁移
    var records []MigrationRecord
    vmm.db.Where("version > ?", targetVersion).Order("version DESC").Find(&records)

    for _, record := range records {
        // 找到对应的迁移
        var migration *Migration
        for _, m := range vmm.migrations {
            if m.Version == record.Version {
                migration = &m
                break
            }
        }

        if migration == nil {
            return fmt.Errorf("migration %s not found", record.Version)
        }

        log.Printf("Rolling back migration %s: %s", migration.Version, migration.Name)

        // 在事务中执行回滚
        err := vmm.db.Transaction(func(tx *gorm.DB) error {
            // 执行回滚
            if err := migration.Down(tx); err != nil {
                return fmt.Errorf("migration down failed: %w", err)
            }

            // 删除迁移记录
            if err := tx.Delete(&record).Error; err != nil {
                return fmt.Errorf("failed to delete migration record: %w", err)
            }

            return nil
        })

        if err != nil {
            return fmt.Errorf("failed to rollback migration %s: %w", migration.Version, err)
        }

        log.Printf("Successfully rolled back migration %s", migration.Version)
    }

    return nil
}

// 获取迁移状态
func (vmm *VersionedMigrationManager) GetStatus() ([]MigrationStatus, error) {
    var records []MigrationRecord
    vmm.db.Order("version").Find(&records)

    recordMap := make(map[string]MigrationRecord)
    for _, record := range records {
        recordMap[record.Version] = record
    }

    var status []MigrationStatus
    for _, migration := range vmm.migrations {
        ms := MigrationStatus{
            Version: migration.Version,
            Name:    migration.Name,
            Applied: false,
        }

        if record, exists := recordMap[migration.Version]; exists {
            ms.Applied = true
            ms.AppliedAt = &record.AppliedAt
        }

        status = append(status, ms)
    }

    return status, nil
}

type MigrationStatus struct {
    Version   string     `json:"version"`
    Name      string     `json:"name"`
    Applied   bool       `json:"applied"`
    AppliedAt *time.Time `json:"applied_at,omitempty"`
}

迁移示例 #

// 定义具体的迁移
func RegisterMigrations(vmm *VersionedMigrationManager) {
    // 迁移 001: 创建用户表
    vmm.RegisterMigration(Migration{
        Version: "001",
        Name:    "create_users_table",
        Up: func(db *gorm.DB) error {
            return db.AutoMigrate(&User{})
        },
        Down: func(db *gorm.DB) error {
            return db.Migrator().DropTable(&User{})
        },
    })

    // 迁移 002: 添加用户资料表
    vmm.RegisterMigration(Migration{
        Version: "002",
        Name:    "create_user_profiles_table",
        Up: func(db *gorm.DB) error {
            return db.AutoMigrate(&UserProfile{})
        },
        Down: func(db *gorm.DB) error {
            return db.Migrator().DropTable(&UserProfile{})
        },
    })

    // 迁移 003: 添加中间名字段
    vmm.RegisterMigration(Migration{
        Version: "003",
        Name:    "add_middle_name_to_users",
        Up: func(db *gorm.DB) error {
            return db.Migrator().AddColumn(&User{}, "middle_name")
        },
        Down: func(db *gorm.DB) error {
            return db.Migrator().DropColumn(&User{}, "middle_name")
        },
    })

    // 迁移 004: 创建索引
    vmm.RegisterMigration(Migration{
        Version: "004",
        Name:    "create_user_email_index",
        Up: func(db *gorm.DB) error {
            return db.Exec("CREATE INDEX idx_users_email ON users(email)").Error
        },
        Down: func(db *gorm.DB) error {
            return db.Exec("DROP INDEX idx_users_email ON users").Error
        },
    })

    // 迁移 005: 数据迁移示例
    vmm.RegisterMigration(Migration{
        Version: "005",
        Name:    "migrate_user_data",
        Up: func(db *gorm.DB) error {
            // 数据迁移逻辑
            return db.Exec(`
                UPDATE users
                SET full_name = CONCAT(first_name, ' ', last_name)
                WHERE full_name IS NULL OR full_name = ''
            `).Error
        },
        Down: func(db *gorm.DB) error {
            // 回滚数据迁移
            return db.Exec("UPDATE users SET full_name = NULL").Error
        },
    })
}

事务处理机制 #

基础事务操作 #

// 事务管理器
type TransactionManager struct {
    db *gorm.DB
}

func NewTransactionManager(db *gorm.DB) *TransactionManager {
    return &TransactionManager{db: db}
}

// 基础事务示例
func (tm *TransactionManager) BasicTransaction() error {
    // 开始事务
    tx := tm.db.Begin()

    // 检查事务是否成功开始
    if tx.Error != nil {
        return fmt.Errorf("failed to begin transaction: %w", tx.Error)
    }

    // 使用 defer 确保事务被正确处理
    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
            panic(r) // 重新抛出 panic
        }
    }()

    // 执行数据库操作
    user := User{
        Username: "testuser",
        Email:    "[email protected]",
        Status:   "active",
    }

    if err := tx.Create(&user).Error; err != nil {
        tx.Rollback()
        return fmt.Errorf("failed to create user: %w", err)
    }

    // 创建用户资料
    profile := UserProfile{
        UserID: user.ID,
        Bio:    "Test user bio",
    }

    if err := tx.Create(&profile).Error; err != nil {
        tx.Rollback()
        return fmt.Errorf("failed to create profile: %w", err)
    }

    // 提交事务
    if err := tx.Commit().Error; err != nil {
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    return nil
}

// 使用 Transaction 方法(推荐)
func (tm *TransactionManager) RecommendedTransaction() error {
    return tm.db.Transaction(func(tx *gorm.DB) error {
        // 创建用户
        user := User{
            Username: "testuser2",
            Email:    "[email protected]",
            Status:   "active",
        }

        if err := tx.Create(&user).Error; err != nil {
            return err // 自动回滚
        }

        // 创建用户资料
        profile := UserProfile{
            UserID: user.ID,
            Bio:    "Test user 2 bio",
        }

        if err := tx.Create(&profile).Error; err != nil {
            return err // 自动回滚
        }

        return nil // 自动提交
    })
}

嵌套事务 #

// 嵌套事务示例
func (tm *TransactionManager) NestedTransaction() error {
    return tm.db.Transaction(func(tx *gorm.DB) error {
        // 外层事务:创建用户
        user := User{
            Username: "parentuser",
            Email:    "[email protected]",
            Status:   "active",
        }

        if err := tx.Create(&user).Error; err != nil {
            return err
        }

        // 内层事务:创建相关数据
        return tx.Transaction(func(tx2 *gorm.DB) error {
            // 创建用户资料
            profile := UserProfile{
                UserID: user.ID,
                Bio:    "Parent user bio",
            }

            if err := tx2.Create(&profile).Error; err != nil {
                return err
            }

            // 创建初始文章
            post := Post{
                Title:    "Welcome Post",
                Content:  "Welcome to our platform!",
                AuthorID: user.ID,
                Status:   "published",
            }

            if err := tx2.Create(&post).Error; err != nil {
                return err
            }

            return nil
        })
    })
}

// 保存点事务
func (tm *TransactionManager) SavepointTransaction() error {
    tx := tm.db.Begin()
    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
            panic(r)
        }
    }()

    // 创建用户
    user := User{
        Username: "savepointuser",
        Email:    "[email protected]",
        Status:   "active",
    }

    if err := tx.Create(&user).Error; err != nil {
        tx.Rollback()
        return err
    }

    // 创建保存点
    if err := tx.SavePoint("sp1").Error; err != nil {
        tx.Rollback()
        return err
    }

    // 尝试创建可能失败的操作
    profile := UserProfile{
        UserID: user.ID,
        Bio:    "Savepoint user bio",
    }

    if err := tx.Create(&profile).Error; err != nil {
        // 回滚到保存点
        if rollbackErr := tx.RollbackTo("sp1").Error; rollbackErr != nil {
            tx.Rollback()
            return rollbackErr
        }

        // 继续其他操作...
        log.Println("Profile creation failed, but user creation preserved")
    }

    // 提交事务
    return tx.Commit().Error
}

事务隔离级别 #

// 设置事务隔离级别
func (tm *TransactionManager) TransactionWithIsolationLevel() error {
    // 设置读已提交隔离级别
    tx := tm.db.Begin(&sql.TxOptions{
        Isolation: sql.LevelReadCommitted,
        ReadOnly:  false,
    })

    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
            panic(r)
        }
    }()

    // 执行需要特定隔离级别的操作
    var count int64
    if err := tx.Model(&User{}).Where("status = ?", "active").Count(&count).Error; err != nil {
        tx.Rollback()
        return err
    }

    // 基于计数结果执行操作
    if count > 100 {
        // 执行某些操作
        log.Printf("Active users count: %d", count)
    }

    return tx.Commit().Error
}

// 只读事务
func (tm *TransactionManager) ReadOnlyTransaction() error {
    tx := tm.db.Begin(&sql.TxOptions{
        ReadOnly: true,
    })

    defer tx.Rollback() // 只读事务总是回滚

    // 执行只读操作
    var users []User
    if err := tx.Where("status = ?", "active").Find(&users).Error; err != nil {
        return err
    }

    // 处理查询结果
    log.Printf("Found %d active users", len(users))

    return nil
}

分布式事务 #

// 分布式事务管理器
type DistributedTransactionManager struct {
    primaryDB   *gorm.DB
    secondaryDB *gorm.DB
}

func NewDistributedTransactionManager(primaryDB, secondaryDB *gorm.DB) *DistributedTransactionManager {
    return &DistributedTransactionManager{
        primaryDB:   primaryDB,
        secondaryDB: secondaryDB,
    }
}

// 两阶段提交模拟
func (dtm *DistributedTransactionManager) TwoPhaseCommit() error {
    // 阶段1:准备阶段
    tx1 := dtm.primaryDB.Begin()
    tx2 := dtm.secondaryDB.Begin()

    // 确保在出错时回滚所有事务
    defer func() {
        if r := recover(); r != nil {
            tx1.Rollback()
            tx2.Rollback()
            panic(r)
        }
    }()

    // 在主数据库中执行操作
    user := User{
        Username: "distuser",
        Email:    "[email protected]",
        Status:   "active",
    }

    if err := tx1.Create(&user).Error; err != nil {
        tx1.Rollback()
        tx2.Rollback()
        return fmt.Errorf("failed to create user in primary DB: %w", err)
    }

    // 在辅助数据库中执行操作
    userLog := UserLog{
        UserID:    user.ID,
        Action:    "created",
        Timestamp: time.Now(),
    }

    if err := tx2.Create(&userLog).Error; err != nil {
        tx1.Rollback()
        tx2.Rollback()
        return fmt.Errorf("failed to create user log in secondary DB: %w", err)
    }

    // 阶段2:提交阶段
    if err := tx1.Commit().Error; err != nil {
        tx2.Rollback()
        return fmt.Errorf("failed to commit primary transaction: %w", err)
    }

    if err := tx2.Commit().Error; err != nil {
        // 主事务已提交,需要补偿操作
        log.Printf("Secondary transaction failed, need compensation for user %d", user.ID)
        return fmt.Errorf("failed to commit secondary transaction: %w", err)
    }

    return nil
}

type UserLog struct {
    ID        uint      `gorm:"primaryKey"`
    UserID    uint      `gorm:"not null"`
    Action    string    `gorm:"size:50;not null"`
    Timestamp time.Time `gorm:"not null"`
}

数据一致性保证 #

乐观锁实现 #

// 带版本号的模型
type VersionedUser struct {
    BaseModel
    Username string `gorm:"uniqueIndex;size:50;not null"`
    Email    string `gorm:"uniqueIndex;size:100;not null"`
    Status   string `gorm:"size:20;default:active"`
    Version  int    `gorm:"not null;default:0"` // 版本号字段
}

// 乐观锁服务
type OptimisticLockService struct {
    db *gorm.DB
}

func NewOptimisticLockService(db *gorm.DB) *OptimisticLockService {
    return &OptimisticLockService{db: db}
}

// 乐观锁更新
func (ols *OptimisticLockService) UpdateWithOptimisticLock(userID uint, updates map[string]interface{}) error {
    return ols.db.Transaction(func(tx *gorm.DB) error {
        // 查询当前记录
        var user VersionedUser
        if err := tx.First(&user, userID).Error; err != nil {
            return err
        }

        currentVersion := user.Version

        // 更新记录,同时检查版本号
        updates["version"] = currentVersion + 1
        result := tx.Model(&user).
            Where("id = ? AND version = ?", userID, currentVersion).
            Updates(updates)

        if result.Error != nil {
            return result.Error
        }

        // 检查是否有记录被更新
        if result.RowsAffected == 0 {
            return fmt.Errorf("optimistic lock failed: record was modified by another transaction")
        }

        return nil
    })
}

// 重试机制的乐观锁更新
func (ols *OptimisticLockService) UpdateWithRetry(userID uint, updates map[string]interface{}, maxRetries int) error {
    for i := 0; i < maxRetries; i++ {
        err := ols.UpdateWithOptimisticLock(userID, updates)
        if err == nil {
            return nil
        }

        // 如果是乐观锁冲突,等待一段时间后重试
        if strings.Contains(err.Error(), "optimistic lock failed") {
            time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
            continue
        }

        // 其他错误直接返回
        return err
    }

    return fmt.Errorf("failed to update after %d retries", maxRetries)
}

悲观锁实现 #

// 悲观锁服务
type PessimisticLockService struct {
    db *gorm.DB
}

func NewPessimisticLockService(db *gorm.DB) *PessimisticLockService {
    return &PessimisticLockService{db: db}
}

// 悲观锁查询和更新
func (pls *PessimisticLockService) UpdateWithPessimisticLock(userID uint, updates map[string]interface{}) error {
    return pls.db.Transaction(func(tx *gorm.DB) error {
        // 使用 FOR UPDATE 锁定记录
        var user User
        if err := tx.Set("gorm:query_option", "FOR UPDATE").First(&user, userID).Error; err != nil {
            return err
        }

        // 执行更新操作
        if err := tx.Model(&user).Updates(updates).Error; err != nil {
            return err
        }

        return nil
    })
}

// 带超时的悲观锁
func (pls *PessimisticLockService) UpdateWithTimeout(userID uint, updates map[string]interface{}, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    return pls.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        var user User
        if err := tx.Set("gorm:query_option", "FOR UPDATE").First(&user, userID).Error; err != nil {
            return err
        }

        if err := tx.Model(&user).Updates(updates).Error; err != nil {
            return err
        }

        return nil
    })
}

数据一致性检查 #

// 数据一致性检查器
type ConsistencyChecker struct {
    db *gorm.DB
}

func NewConsistencyChecker(db *gorm.DB) *ConsistencyChecker {
    return &ConsistencyChecker{db: db}
}

// 检查外键一致性
func (cc *ConsistencyChecker) CheckForeignKeyConsistency() error {
    // 检查用户资料的用户ID是否存在
    var orphanedProfiles []UserProfile
    result := cc.db.Raw(`
        SELECT up.* FROM user_profiles up
        LEFT JOIN users u ON up.user_id = u.id
        WHERE u.id IS NULL
    `).Scan(&orphanedProfiles)

    if result.Error != nil {
        return result.Error
    }

    if len(orphanedProfiles) > 0 {
        log.Printf("Found %d orphaned user profiles", len(orphanedProfiles))
        // 可以选择清理或报告
    }

    // 检查文章的作者ID是否存在
    var orphanedPosts []Post
    result = cc.db.Raw(`
        SELECT p.* FROM posts p
        LEFT JOIN users u ON p.author_id = u.id
        WHERE u.id IS NULL
    `).Scan(&orphanedPosts)

    if result.Error != nil {
        return result.Error
    }

    if len(orphanedPosts) > 0 {
        log.Printf("Found %d orphaned posts", len(orphanedPosts))
    }

    return nil
}

// 检查数据完整性
func (cc *ConsistencyChecker) CheckDataIntegrity() (*IntegrityReport, error) {
    report := &IntegrityReport{}

    // 检查用户数据完整性
    var usersWithoutEmail int64
    cc.db.Model(&User{}).Where("email IS NULL OR email = ''").Count(&usersWithoutEmail)
    report.UsersWithoutEmail = usersWithoutEmail

    // 检查重复邮箱
    var duplicateEmails []DuplicateEmail
    cc.db.Raw(`
        SELECT email, COUNT(*) as count
        FROM users
        WHERE email IS NOT NULL AND email != ''
        GROUP BY email
        HAVING COUNT(*) > 1
    `).Scan(&duplicateEmails)
    report.DuplicateEmails = duplicateEmails

    // 检查文章状态一致性
    var invalidStatusPosts int64
    cc.db.Model(&Post{}).Where("status NOT IN ?", []string{"draft", "published", "archived"}).Count(&invalidStatusPosts)
    report.InvalidStatusPosts = invalidStatusPosts

    return report, nil
}

type IntegrityReport struct {
    UsersWithoutEmail   int64            `json:"users_without_email"`
    DuplicateEmails     []DuplicateEmail `json:"duplicate_emails"`
    InvalidStatusPosts  int64            `json:"invalid_status_posts"`
}

type DuplicateEmail struct {
    Email string `json:"email"`
    Count int    `json:"count"`
}

// 修复数据不一致问题
func (cc *ConsistencyChecker) FixInconsistencies() error {
    return cc.db.Transaction(func(tx *gorm.DB) error {
        // 清理孤立的用户资料
        result := tx.Exec(`
            DELETE up FROM user_profiles up
            LEFT JOIN users u ON up.user_id = u.id
            WHERE u.id IS NULL
        `)

        if result.Error != nil {
            return result.Error
        }

        log.Printf("Cleaned up %d orphaned user profiles", result.RowsAffected)

        // 清理孤立的文章
        result = tx.Exec(`
            DELETE p FROM posts p
            LEFT JOIN users u ON p.author_id = u.id
            WHERE u.id IS NULL
        `)

        if result.Error != nil {
            return result.Error
        }

        log.Printf("Cleaned up %d orphaned posts", result.RowsAffected)

        return nil
    })
}

完整的数据库管理服务 #

// 数据库管理服务
type DatabaseManagementService struct {
    migrationManager *VersionedMigrationManager
    transactionManager *TransactionManager
    consistencyChecker *ConsistencyChecker
}

func NewDatabaseManagementService(db *gorm.DB) *DatabaseManagementService {
    return &DatabaseManagementService{
        migrationManager:   NewVersionedMigrationManager(db),
        transactionManager: NewTransactionManager(db),
        consistencyChecker: NewConsistencyChecker(db),
    }
}

// 初始化数据库
func (dms *DatabaseManagementService) Initialize() error {
    log.Println("Initializing database...")

    // 注册迁移
    RegisterMigrations(dms.migrationManager)

    // 执行迁移
    if err := dms.migrationManager.Migrate(); err != nil {
        return fmt.Errorf("migration failed: %w", err)
    }

    // 检查数据一致性
    if err := dms.consistencyChecker.CheckForeignKeyConsistency(); err != nil {
        return fmt.Errorf("consistency check failed: %w", err)
    }

    log.Println("Database initialized successfully")
    return nil
}

// 健康检查
func (dms *DatabaseManagementService) HealthCheck() error {
    // 检查迁移状态
    status, err := dms.migrationManager.GetStatus()
    if err != nil {
        return fmt.Errorf("failed to get migration status: %w", err)
    }

    // 检查是否有未应用的迁移
    for _, s := range status {
        if !s.Applied {
            return fmt.Errorf("migration %s is not applied", s.Version)
        }
    }

    // 检查数据完整性
    report, err := dms.consistencyChecker.CheckDataIntegrity()
    if err != nil {
        return fmt.Errorf("integrity check failed: %w", err)
    }

    if report.UsersWithoutEmail > 0 || len(report.DuplicateEmails) > 0 || report.InvalidStatusPosts > 0 {
        log.Printf("Data integrity issues found: %+v", report)
    }

    return nil
}

// 维护任务
func (dms *DatabaseManagementService) MaintenanceTasks() error {
    log.Println("Running database maintenance tasks...")

    // 修复数据不一致问题
    if err := dms.consistencyChecker.FixInconsistencies(); err != nil {
        return fmt.Errorf("failed to fix inconsistencies: %w", err)
    }

    log.Println("Database maintenance completed")
    return nil
}

通过本节的学习,你已经全面掌握了数据库迁移管理、事务处理机制以及数据一致性保证策略。这些知识将帮助你构建健壮、可靠的数据持久化层,确保应用数据的完整性和一致性。

至此,我们已经完成了数据库操作章节的全部内容,涵盖了从基础连接配置到高级查询优化,从 GORM 基础操作到复杂的事务处理,为你的 Web 应用开发提供了完整的数据库解决方案。