3.4.4 数据库迁移与事务 #
数据库迁移和事务处理是企业级应用开发的重要组成部分。本节将深入探讨 GORM 的自动迁移功能、手动迁移管理、事务处理机制以及数据一致性保证策略。
数据库迁移管理 #
GORM 自动迁移 #
package main
import (
"fmt"
"log"
"gorm.io/gorm"
)
// MigrationManager bundles schema-migration helpers around one GORM handle.
type MigrationManager struct {
	db *gorm.DB
}

// NewMigrationManager returns a MigrationManager whose operations run
// against db.
func NewMigrationManager(db *gorm.DB) *MigrationManager {
	manager := new(MigrationManager)
	manager.db = db
	return manager
}
// AutoMigrate runs GORM's automatic migration for the application's models in
// a single call and logs the start and the successful end of the run.
//
// It returns the GORM error wrapped with context when the migration fails.
func (mm *MigrationManager) AutoMigrate() error {
	log.Println("Starting database migration...")

	// The models are handed to GORM in this fixed order.
	migrateErr := mm.db.AutoMigrate(
		&User{},
		&UserProfile{},
		&Post{},
		&Comment{},
		&Tag{},
		&Category{},
		&PostTag{},
	)
	if migrateErr != nil {
		return fmt.Errorf("failed to auto migrate: %w", migrateErr)
	}

	log.Println("Database migration completed successfully")
	return nil
}
// HasTable reports whether the GORM migrator finds a table for model.
func (mm *MigrationManager) HasTable(model interface{}) bool {
	migrator := mm.db.Migrator()
	return migrator.HasTable(model)
}
// CreateTable creates the table for model unless HasTable reports that it
// already exists, in which case it only logs that fact and returns nil.
//
// A failure from the migrator is returned wrapped with the model's type.
func (mm *MigrationManager) CreateTable(model interface{}) error {
	if exists := mm.HasTable(model); exists {
		log.Printf("Table for %T already exists", model)
		return nil
	}

	createErr := mm.db.Migrator().CreateTable(model)
	if createErr != nil {
		return fmt.Errorf("failed to create table for %T: %w", model, createErr)
	}

	log.Printf("Created table for %T", model)
	return nil
}
// DropTable drops the table for model. When HasTable reports that the table is
// absent it logs that fact and returns nil without touching the database.
//
// A failure from the migrator is returned wrapped with the model's type.
func (mm *MigrationManager) DropTable(model interface{}) error {
	if exists := mm.HasTable(model); !exists {
		log.Printf("Table for %T does not exist", model)
		return nil
	}

	dropErr := mm.db.Migrator().DropTable(model)
	if dropErr != nil {
		return fmt.Errorf("failed to drop table for %T: %w", model, dropErr)
	}

	log.Printf("Dropped table for %T", model)
	return nil
}
// RenameTable asks the GORM migrator to rename table oldName to newName and
// logs the rename on success. Errors are returned wrapped with both names.
func (mm *MigrationManager) RenameTable(oldName, newName string) error {
	renameErr := mm.db.Migrator().RenameTable(oldName, newName)
	if renameErr != nil {
		return fmt.Errorf("failed to rename table from %s to %s: %w", oldName, newName, renameErr)
	}

	log.Printf("Renamed table from %s to %s", oldName, newName)
	return nil
}
字段级迁移操作 #
// FieldOperations demonstrates column-level migrator calls on the User model:
// add middle_name if it is missing, alter bio, rename old_name to new_name,
// and drop unused_field. It stops at the first failing step and returns that
// error wrapped with context.
//
// NOTE(review): the alter, rename and drop steps run unconditionally with
// what look like placeholder column names — confirm before running against a
// real schema.
func (mm *MigrationManager) FieldOperations() error {
	// Only add the column when the migrator does not already see it.
	if !mm.db.Migrator().HasColumn(&User{}, "middle_name") {
		addErr := mm.db.Migrator().AddColumn(&User{}, "middle_name")
		if addErr != nil {
			return fmt.Errorf("failed to add column middle_name: %w", addErr)
		}
		log.Println("Added column middle_name")
	} else {
		log.Println("Column middle_name already exists")
	}

	// Change the column definition of bio.
	alterErr := mm.db.Migrator().AlterColumn(&User{}, "bio")
	if alterErr != nil {
		return fmt.Errorf("failed to alter column bio: %w", alterErr)
	}

	// Rename a column.
	renameErr := mm.db.Migrator().RenameColumn(&User{}, "old_name", "new_name")
	if renameErr != nil {
		return fmt.Errorf("failed to rename column: %w", renameErr)
	}

	// Drop a column.
	dropErr := mm.db.Migrator().DropColumn(&User{}, "unused_field")
	if dropErr != nil {
		return fmt.Errorf("failed to drop column: %w", dropErr)
	}

	return nil
}
// IndexOperations demonstrates index-level migrator calls on the User model:
// create the "Email" index, create the "idx_name_status" index, log whether
// the "Email" index exists, and finally drop the "Email" index again.
//
// The first failing step aborts the sequence and its error is returned
// wrapped with context.
func (mm *MigrationManager) IndexOperations() error {
	// Single-field index.
	emailErr := mm.db.Migrator().CreateIndex(&User{}, "Email")
	if emailErr != nil {
		return fmt.Errorf("failed to create index on email: %w", emailErr)
	}

	// Composite index, referenced by its index name.
	compositeErr := mm.db.Migrator().CreateIndex(&User{}, "idx_name_status")
	if compositeErr != nil {
		return fmt.Errorf("failed to create composite index: %w", compositeErr)
	}

	// Existence check; the result is only logged.
	if found := mm.db.Migrator().HasIndex(&User{}, "Email"); found {
		log.Println("Index on email exists")
	}

	// Remove the index created in the first step.
	dropErr := mm.db.Migrator().DropIndex(&User{}, "Email")
	if dropErr != nil {
		return fmt.Errorf("failed to drop index: %w", dropErr)
	}

	return nil
}
// ConstraintOperations demonstrates constraint-level migrator calls for the
// "Posts" constraint of the User model: create it, log whether it exists, and
// drop it again. The first failing step is returned wrapped with context.
func (mm *MigrationManager) ConstraintOperations() error {
	// Create the foreign-key constraint.
	createErr := mm.db.Migrator().CreateConstraint(&User{}, "Posts")
	if createErr != nil {
		return fmt.Errorf("failed to create foreign key constraint: %w", createErr)
	}

	// Existence check; the result is only logged.
	if found := mm.db.Migrator().HasConstraint(&User{}, "Posts"); found {
		log.Println("Foreign key constraint exists")
	}

	// Drop the constraint again.
	dropErr := mm.db.Migrator().DropConstraint(&User{}, "Posts")
	if dropErr != nil {
		return fmt.Errorf("failed to drop constraint: %w", dropErr)
	}

	return nil
}
版本化迁移系统 #
import (
	"context"
	"database/sql"
	"errors"
	"sort"
	"strconv"
	"strings"
	"time"
)
// Migration describes one versioned schema or data change together with the
// function that undoes it.
type Migration struct {
	// Version identifies the migration. Migrate orders migrations by comparing
	// this field as a string.
	Version string
	// Name is a human-readable label used in log output and stored records.
	Name string
	// Up applies the change; Migrate calls it with the transaction handle.
	Up func(*gorm.DB) error
	// Down reverts the change; Rollback calls it with the transaction handle.
	Down func(*gorm.DB) error
	CreatedAt time.Time
}
// MigrationRecord is the persisted row that marks a migration version as
// applied. Migrate inserts one after a migration's Up succeeds, and Rollback
// deletes it after the matching Down succeeds.
type MigrationRecord struct {
	ID uint `gorm:"primaryKey"`
	// Version is the migration version; the tag declares a unique index on it.
	Version string `gorm:"uniqueIndex;size:50"`
	Name string `gorm:"size:255"`
	// AppliedAt is set to time.Now() by Migrate when the record is created.
	AppliedAt time.Time
}
// VersionedMigrationManager applies and rolls back registered Migration values
// and tracks which versions are applied through MigrationRecord rows.
type VersionedMigrationManager struct {
	db         *gorm.DB
	migrations []Migration
}

// NewVersionedMigrationManager returns a manager bound to db with no
// migrations registered yet.
//
// It auto-migrates the MigrationRecord table up front. The constructor has no
// error return, so a failure to create that table is logged rather than
// dropped silently; later calls that query the table will surface the problem
// as an error.
func NewVersionedMigrationManager(db *gorm.DB) *VersionedMigrationManager {
	vmm := &VersionedMigrationManager{
		db:         db,
		migrations: make([]Migration, 0),
	}

	// Make sure the bookkeeping table exists before any migration runs.
	if err := db.AutoMigrate(&MigrationRecord{}); err != nil {
		log.Printf("failed to create migration record table: %v", err)
	}

	return vmm
}
// RegisterMigration appends migration to the manager's list of known
// migrations. It performs no validation and does not touch the database.
func (vmm *VersionedMigrationManager) RegisterMigration(migration Migration) {
	known := append(vmm.migrations, migration)
	vmm.migrations = known
}
// Migrate applies every registered migration that has no MigrationRecord yet,
// in ascending Version order.
//
// Versions are ordered with a plain string comparison, so they need a fixed
// width (for example "001", "002") to sort as intended. The registered slice
// is sorted in place. For each pending migration, Up and the insert of its
// MigrationRecord run inside one GORM transaction. Processing stops at the
// first error, which is returned wrapped with the offending version.
//
// A migration registered without an Up function is reported as an error
// instead of panicking on the nil call.
func (vmm *VersionedMigrationManager) Migrate() error {
	sort.Slice(vmm.migrations, func(i, j int) bool {
		return vmm.migrations[i].Version < vmm.migrations[j].Version
	})

	for _, migration := range vmm.migrations {
		// Skip versions that already have a record.
		var existing MigrationRecord
		lookupErr := vmm.db.Where("version = ?", migration.Version).First(&existing).Error
		if lookupErr == nil {
			log.Printf("Migration %s already applied, skipping", migration.Version)
			continue
		}
		// Anything other than "not found" means the status could not be read.
		if !errors.Is(lookupErr, gorm.ErrRecordNotFound) {
			return fmt.Errorf("failed to check migration status: %w", lookupErr)
		}

		if migration.Up == nil {
			return fmt.Errorf("failed to apply migration %s: migration has no Up function", migration.Version)
		}

		log.Printf("Applying migration %s: %s", migration.Version, migration.Name)

		// Run the change and its bookkeeping insert in the same transaction.
		err := vmm.db.Transaction(func(tx *gorm.DB) error {
			if err := migration.Up(tx); err != nil {
				return fmt.Errorf("migration up failed: %w", err)
			}

			applied := MigrationRecord{
				Version:   migration.Version,
				Name:      migration.Name,
				AppliedAt: time.Now(),
			}
			if err := tx.Create(&applied).Error; err != nil {
				return fmt.Errorf("failed to record migration: %w", err)
			}
			return nil
		})
		if err != nil {
			return fmt.Errorf("failed to apply migration %s: %w", migration.Version, err)
		}

		log.Printf("Successfully applied migration %s", migration.Version)
	}

	return nil
}
// Rollback reverts every applied migration whose version is greater than
// targetVersion, newest first.
//
// The applied versions are read from the MigrationRecord table using the
// database's own comparison and ordering on the version column. For each
// record the matching registered migration's Down function and the deletion of
// the record run inside one GORM transaction. Processing stops at the first
// error.
//
// Errors are returned when the records cannot be loaded, when a recorded
// version has no registered migration, when that migration has no Down
// function, or when the transaction fails.
func (vmm *VersionedMigrationManager) Rollback(targetVersion string) error {
	var records []MigrationRecord
	loadErr := vmm.db.Where("version > ?", targetVersion).Order("version DESC").Find(&records).Error
	if loadErr != nil {
		return fmt.Errorf("failed to load applied migrations: %w", loadErr)
	}

	for i := range records {
		record := &records[i]

		// Look up the registered migration by index so the pointer refers to
		// the stored element rather than to a loop variable.
		var migration *Migration
		for j := range vmm.migrations {
			if vmm.migrations[j].Version == record.Version {
				migration = &vmm.migrations[j]
				break
			}
		}
		if migration == nil {
			return fmt.Errorf("migration %s not found", record.Version)
		}
		if migration.Down == nil {
			return fmt.Errorf("failed to rollback migration %s: migration has no Down function", migration.Version)
		}

		log.Printf("Rolling back migration %s: %s", migration.Version, migration.Name)

		// Revert the change and remove its record in the same transaction.
		err := vmm.db.Transaction(func(tx *gorm.DB) error {
			if err := migration.Down(tx); err != nil {
				return fmt.Errorf("migration down failed: %w", err)
			}
			if err := tx.Delete(record).Error; err != nil {
				return fmt.Errorf("failed to delete migration record: %w", err)
			}
			return nil
		})
		if err != nil {
			return fmt.Errorf("failed to rollback migration %s: %w", migration.Version, err)
		}

		log.Printf("Successfully rolled back migration %s", migration.Version)
	}

	return nil
}
// GetStatus returns one MigrationStatus per registered migration, in
// registration-slice order, marking those that have a MigrationRecord as
// applied and attaching the recorded AppliedAt time.
//
// It returns a nil slice and a wrapped error when the records cannot be
// loaded, so callers never mistake a failed query for "nothing applied".
func (vmm *VersionedMigrationManager) GetStatus() ([]MigrationStatus, error) {
	var records []MigrationRecord
	if err := vmm.db.Order("version").Find(&records).Error; err != nil {
		return nil, fmt.Errorf("failed to load migration records: %w", err)
	}

	// Index the applied records by version for constant-time lookup.
	applied := make(map[string]MigrationRecord, len(records))
	for _, record := range records {
		applied[record.Version] = record
	}

	var status []MigrationStatus
	for _, migration := range vmm.migrations {
		ms := MigrationStatus{
			Version: migration.Version,
			Name:    migration.Name,
			Applied: false,
		}
		if record, ok := applied[migration.Version]; ok {
			// Copy the timestamp so each status owns its own value.
			appliedAt := record.AppliedAt
			ms.Applied = true
			ms.AppliedAt = &appliedAt
		}
		status = append(status, ms)
	}

	return status, nil
}
// MigrationStatus is the per-migration entry returned by GetStatus.
type MigrationStatus struct {
	Version string `json:"version"`
	Name string `json:"name"`
	// Applied is true when a MigrationRecord exists for Version.
	Applied bool `json:"applied"`
	// AppliedAt is only set for applied migrations; the tag omits it from JSON
	// when nil.
	AppliedAt *time.Time `json:"applied_at,omitempty"`
}
迁移示例 #
// RegisterMigrations registers the example migrations 001 through 005 on vmm,
// in ascending version order.
//
// NOTE(review): migrations 004 and 005 use raw SQL ("DROP INDEX ... ON users",
// CONCAT) that may not be accepted by every SQL dialect — confirm against the
// target database. The Down step of 005 sets full_name to NULL on every row of
// users, not only on the rows its Up step filled in.
func RegisterMigrations(vmm *VersionedMigrationManager) {
	steps := []Migration{
		// 001: create the users table.
		{
			Version: "001",
			Name:    "create_users_table",
			Up:      func(db *gorm.DB) error { return db.AutoMigrate(&User{}) },
			Down:    func(db *gorm.DB) error { return db.Migrator().DropTable(&User{}) },
		},
		// 002: create the user profiles table.
		{
			Version: "002",
			Name:    "create_user_profiles_table",
			Up:      func(db *gorm.DB) error { return db.AutoMigrate(&UserProfile{}) },
			Down:    func(db *gorm.DB) error { return db.Migrator().DropTable(&UserProfile{}) },
		},
		// 003: add the middle_name column to users.
		{
			Version: "003",
			Name:    "add_middle_name_to_users",
			Up:      func(db *gorm.DB) error { return db.Migrator().AddColumn(&User{}, "middle_name") },
			Down:    func(db *gorm.DB) error { return db.Migrator().DropColumn(&User{}, "middle_name") },
		},
		// 004: create an index on users.email with raw SQL.
		{
			Version: "004",
			Name:    "create_user_email_index",
			Up: func(db *gorm.DB) error {
				created := db.Exec("CREATE INDEX idx_users_email ON users(email)")
				return created.Error
			},
			Down: func(db *gorm.DB) error {
				dropped := db.Exec("DROP INDEX idx_users_email ON users")
				return dropped.Error
			},
		},
		// 005: data migration example — backfill full_name.
		{
			Version: "005",
			Name:    "migrate_user_data",
			Up: func(db *gorm.DB) error {
				backfilled := db.Exec(`
UPDATE users
SET full_name = CONCAT(first_name, ' ', last_name)
WHERE full_name IS NULL OR full_name = ''
`)
				return backfilled.Error
			},
			Down: func(db *gorm.DB) error {
				cleared := db.Exec("UPDATE users SET full_name = NULL")
				return cleared.Error
			},
		},
	}

	for _, step := range steps {
		vmm.RegisterMigration(step)
	}
}
事务处理机制 #
基础事务操作 #
// TransactionManager holds the GORM handle used by the transaction examples.
type TransactionManager struct {
	db *gorm.DB
}

// NewTransactionManager returns a TransactionManager bound to db.
func NewTransactionManager(db *gorm.DB) *TransactionManager {
	manager := new(TransactionManager)
	manager.db = db
	return manager
}
// BasicTransaction shows manual transaction control: begin, create a User and
// its UserProfile, then commit. Any failing step rolls the transaction back
// and returns the error wrapped with context. A panic also triggers a rollback
// before the panic is re-raised.
//
// NOTE(review): the Email literal looks like an e-mail-obfuscation placeholder
// rather than a real address — confirm the intended value.
func (tm *TransactionManager) BasicTransaction() error {
	tx := tm.db.Begin()
	if beginErr := tx.Error; beginErr != nil {
		return fmt.Errorf("failed to begin transaction: %w", beginErr)
	}

	// Roll back on panic, then let the panic continue.
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		tx.Rollback()
		panic(recovered)
	}()

	user := User{
		Username: "testuser",
		Email:    "[email protected]",
		Status:   "active",
	}
	createUserErr := tx.Create(&user).Error
	if createUserErr != nil {
		tx.Rollback()
		return fmt.Errorf("failed to create user: %w", createUserErr)
	}

	// The profile references the ID that Create populated on user.
	profile := UserProfile{
		UserID: user.ID,
		Bio:    "Test user bio",
	}
	createProfileErr := tx.Create(&profile).Error
	if createProfileErr != nil {
		tx.Rollback()
		return fmt.Errorf("failed to create profile: %w", createProfileErr)
	}

	commitErr := tx.Commit().Error
	if commitErr != nil {
		return fmt.Errorf("failed to commit transaction: %w", commitErr)
	}
	return nil
}
// RecommendedTransaction shows the closure-based Transaction helper
// (recommended): the callback creates a User and its UserProfile, and its
// return value decides the outcome — a non-nil error is returned to the caller
// unchanged, nil lets the helper finish the transaction.
//
// NOTE(review): the Email literal looks like an e-mail-obfuscation placeholder
// rather than a real address — confirm the intended value.
func (tm *TransactionManager) RecommendedTransaction() error {
	createUserWithProfile := func(tx *gorm.DB) error {
		user := User{
			Username: "testuser2",
			Email:    "[email protected]",
			Status:   "active",
		}
		if userErr := tx.Create(&user).Error; userErr != nil {
			return userErr // returning an error makes the helper roll back
		}

		profile := UserProfile{
			UserID: user.ID,
			Bio:    "Test user 2 bio",
		}
		// The profile insert's result is the callback's result: an error rolls
		// back, nil commits.
		return tx.Create(&profile).Error
	}

	return tm.db.Transaction(createUserWithProfile)
}
嵌套事务 #
// NestedTransaction shows a Transaction call nested inside another one. The
// outer callback creates a User; the inner callback creates that user's
// UserProfile and a first Post. The inner result is returned from the outer
// callback unchanged.
//
// NOTE(review): the Email literal looks like an e-mail-obfuscation placeholder
// rather than a real address — confirm the intended value.
func (tm *TransactionManager) NestedTransaction() error {
	return tm.db.Transaction(func(outer *gorm.DB) error {
		// Outer transaction: the user itself.
		user := User{
			Username: "parentuser",
			Email:    "[email protected]",
			Status:   "active",
		}
		if userErr := outer.Create(&user).Error; userErr != nil {
			return userErr
		}

		// Inner transaction: data that depends on the new user's ID.
		createRelated := func(inner *gorm.DB) error {
			profile := UserProfile{
				UserID: user.ID,
				Bio:    "Parent user bio",
			}
			if profileErr := inner.Create(&profile).Error; profileErr != nil {
				return profileErr
			}

			post := Post{
				Title:    "Welcome Post",
				Content:  "Welcome to our platform!",
				AuthorID: user.ID,
				Status:   "published",
			}
			return inner.Create(&post).Error
		}

		return outer.Transaction(createRelated)
	})
}
// SavepointTransaction shows partial rollback with a savepoint. It creates a
// User, sets savepoint "sp1", and then tries to create a UserProfile. If the
// profile insert fails the transaction is rolled back to "sp1" only, so the
// user insert is kept, and the transaction is still committed.
//
// Errors from beginning the transaction, creating the user, creating the
// savepoint, or rolling back to it abort the whole transaction. A panic also
// rolls back before it is re-raised. The commit result is returned as is.
//
// NOTE(review): the Email literal looks like an e-mail-obfuscation placeholder
// rather than a real address — confirm the intended value.
func (tm *TransactionManager) SavepointTransaction() error {
	tx := tm.db.Begin()
	// Check Begin the same way BasicTransaction does, instead of discovering
	// the failure on the first statement.
	if tx.Error != nil {
		return fmt.Errorf("failed to begin transaction: %w", tx.Error)
	}

	defer func() {
		if r := recover(); r != nil {
			tx.Rollback()
			panic(r)
		}
	}()

	user := User{
		Username: "savepointuser",
		Email:    "[email protected]",
		Status:   "active",
	}
	if err := tx.Create(&user).Error; err != nil {
		tx.Rollback()
		return err
	}

	// Everything after this point can be undone without losing the user.
	if err := tx.SavePoint("sp1").Error; err != nil {
		tx.Rollback()
		return err
	}

	profile := UserProfile{
		UserID: user.ID,
		Bio:    "Savepoint user bio",
	}
	if err := tx.Create(&profile).Error; err != nil {
		// Undo only the work done since the savepoint.
		if rollbackErr := tx.RollbackTo("sp1").Error; rollbackErr != nil {
			tx.Rollback()
			return rollbackErr
		}
		// The profile failure is deliberately tolerated; continue to commit.
		log.Println("Profile creation failed, but user creation preserved")
	}

	return tx.Commit().Error
}
事务隔离级别 #
// TransactionWithIsolationLevel begins a read-write transaction with the
// sql.LevelReadCommitted isolation level, counts the users whose status is
// "active", logs the count when it exceeds 100, and commits.
//
// It returns an error when the transaction cannot be started, when the count
// query fails (after rolling back), or when the commit fails. A panic rolls
// the transaction back before it is re-raised.
func (tm *TransactionManager) TransactionWithIsolationLevel() error {
	tx := tm.db.Begin(&sql.TxOptions{
		Isolation: sql.LevelReadCommitted,
		ReadOnly:  false,
	})
	// Check Begin the same way BasicTransaction does; the driver may reject
	// the requested options.
	if tx.Error != nil {
		return fmt.Errorf("failed to begin transaction: %w", tx.Error)
	}

	defer func() {
		if r := recover(); r != nil {
			tx.Rollback()
			panic(r)
		}
	}()

	var count int64
	if err := tx.Model(&User{}).Where("status = ?", "active").Count(&count).Error; err != nil {
		tx.Rollback()
		return err
	}

	// Placeholder for work that depends on the count.
	if count > 100 {
		log.Printf("Active users count: %d", count)
	}

	return tx.Commit().Error
}
// ReadOnlyTransaction begins a transaction with the ReadOnly option, loads
// the users whose status is "active", and logs how many were found.
//
// The transaction is always ended with Rollback because nothing is written.
// It returns an error when the transaction cannot be started or when the
// query fails.
func (tm *TransactionManager) ReadOnlyTransaction() error {
	tx := tm.db.Begin(&sql.TxOptions{
		ReadOnly: true,
	})
	// Check Begin the same way BasicTransaction does.
	if tx.Error != nil {
		return fmt.Errorf("failed to begin transaction: %w", tx.Error)
	}
	defer tx.Rollback() // read-only work: always roll back

	var users []User
	if err := tx.Where("status = ?", "active").Find(&users).Error; err != nil {
		return err
	}

	log.Printf("Found %d active users", len(users))
	return nil
}
分布式事务 #
// DistributedTransactionManager coordinates work across two GORM handles, a
// primary and a secondary database.
type DistributedTransactionManager struct {
	primaryDB   *gorm.DB
	secondaryDB *gorm.DB
}

// NewDistributedTransactionManager returns a manager that uses primaryDB and
// secondaryDB for its two transactions.
func NewDistributedTransactionManager(primaryDB, secondaryDB *gorm.DB) *DistributedTransactionManager {
	manager := new(DistributedTransactionManager)
	manager.primaryDB = primaryDB
	manager.secondaryDB = secondaryDB
	return manager
}
// TwoPhaseCommit simulates a two-phase commit across the primary and the
// secondary database: it opens a transaction on each, creates a User in the
// primary and a UserLog in the secondary, then commits the primary followed by
// the secondary.
//
// This is not atomic. If the secondary commit fails after the primary commit
// succeeded, the user stays committed; the function only logs that
// compensation is needed and returns the error.
//
// Errors from starting either transaction are returned immediately, and a
// primary transaction that was already opened is rolled back so it does not
// leak. Any failing insert rolls back both transactions. A panic also rolls
// both back before it is re-raised.
//
// NOTE(review): the Email literal looks like an e-mail-obfuscation placeholder
// rather than a real address — confirm the intended value.
func (dtm *DistributedTransactionManager) TwoPhaseCommit() error {
	// Phase 1: prepare — open both transactions and do the work.
	tx1 := dtm.primaryDB.Begin()
	if tx1.Error != nil {
		return fmt.Errorf("failed to begin primary transaction: %w", tx1.Error)
	}
	tx2 := dtm.secondaryDB.Begin()
	if tx2.Error != nil {
		tx1.Rollback()
		return fmt.Errorf("failed to begin secondary transaction: %w", tx2.Error)
	}

	defer func() {
		if r := recover(); r != nil {
			tx1.Rollback()
			tx2.Rollback()
			panic(r)
		}
	}()

	user := User{
		Username: "distuser",
		Email:    "[email protected]",
		Status:   "active",
	}
	if err := tx1.Create(&user).Error; err != nil {
		tx1.Rollback()
		tx2.Rollback()
		return fmt.Errorf("failed to create user in primary DB: %w", err)
	}

	userLog := UserLog{
		UserID:    user.ID,
		Action:    "created",
		Timestamp: time.Now(),
	}
	if err := tx2.Create(&userLog).Error; err != nil {
		tx1.Rollback()
		tx2.Rollback()
		return fmt.Errorf("failed to create user log in secondary DB: %w", err)
	}

	// Phase 2: commit — primary first, then secondary.
	if err := tx1.Commit().Error; err != nil {
		tx2.Rollback()
		return fmt.Errorf("failed to commit primary transaction: %w", err)
	}
	if err := tx2.Commit().Error; err != nil {
		// The primary is already committed and cannot be rolled back here.
		log.Printf("Secondary transaction failed, need compensation for user %d", user.ID)
		return fmt.Errorf("failed to commit secondary transaction: %w", err)
	}

	return nil
}
// UserLog is the audit row that TwoPhaseCommit writes to the secondary
// database for a newly created user.
type UserLog struct {
	ID uint `gorm:"primaryKey"`
	// UserID is the ID of the user the entry refers to.
	UserID uint `gorm:"not null"`
	// Action names what happened; TwoPhaseCommit writes "created".
	Action string `gorm:"size:50;not null"`
	Timestamp time.Time `gorm:"not null"`
}
数据一致性保证 #
乐观锁实现 #
// VersionedUser is a user model that carries a version number for optimistic
// locking. It embeds BaseModel, which is declared elsewhere.
type VersionedUser struct {
	BaseModel
	Username string `gorm:"uniqueIndex;size:50;not null"`
	Email string `gorm:"uniqueIndex;size:100;not null"`
	Status string `gorm:"size:20;default:active"`
	// Version is read, matched in the WHERE clause, and incremented by
	// UpdateWithOptimisticLock.
	Version int `gorm:"not null;default:0"` // version number field
}
// OptimisticLockService performs version-checked updates through a GORM
// handle.
type OptimisticLockService struct {
	db *gorm.DB
}

// NewOptimisticLockService returns an OptimisticLockService bound to db.
func NewOptimisticLockService(db *gorm.DB) *OptimisticLockService {
	service := new(OptimisticLockService)
	service.db = db
	return service
}
// UpdateWithOptimisticLock applies updates to the VersionedUser with the given
// ID, but only if its version is still the one that was just read.
//
// Inside one transaction it loads the record, then issues an update whose
// WHERE clause matches both the ID and the version that was read, setting
// version to that value plus one. If no row is affected, the record changed in
// the meantime and an "optimistic lock failed" error is returned.
//
// The caller's updates map is never modified: the version column is added to
// a private copy. This also makes a nil map safe to pass. A "version" key
// supplied by the caller is overridden.
func (ols *OptimisticLockService) UpdateWithOptimisticLock(userID uint, updates map[string]interface{}) error {
	return ols.db.Transaction(func(tx *gorm.DB) error {
		var user VersionedUser
		if err := tx.First(&user, userID).Error; err != nil {
			return err
		}

		currentVersion := user.Version

		// Copy before adding the version so the caller's map stays untouched
		// (and so a nil map does not panic on assignment).
		changes := make(map[string]interface{}, len(updates)+1)
		for column, value := range updates {
			changes[column] = value
		}
		changes["version"] = currentVersion + 1

		result := tx.Model(&user).
			Where("id = ? AND version = ?", userID, currentVersion).
			Updates(changes)
		if result.Error != nil {
			return result.Error
		}

		// Zero affected rows means the version no longer matched.
		if result.RowsAffected == 0 {
			return fmt.Errorf("optimistic lock failed: record was modified by another transaction")
		}

		return nil
	})
}
// UpdateWithRetry calls UpdateWithOptimisticLock up to maxRetries times,
// retrying only when the error text contains "optimistic lock failed". Any
// other error is returned immediately.
//
// Between attempts it waits (attempt number) x 100ms. It does not wait after
// the final attempt, since no retry follows. When every attempt hits a
// conflict, the returned error wraps the last conflict error. When maxRetries
// is zero or negative no attempt is made and an error is returned.
func (ols *OptimisticLockService) UpdateWithRetry(userID uint, updates map[string]interface{}, maxRetries int) error {
	var lastErr error

	for attempt := 0; attempt < maxRetries; attempt++ {
		err := ols.UpdateWithOptimisticLock(userID, updates)
		if err == nil {
			return nil
		}

		// Only version conflicts are worth retrying.
		if !strings.Contains(err.Error(), "optimistic lock failed") {
			return err
		}
		lastErr = err

		// Linear back-off, skipped when there is no attempt left.
		if attempt < maxRetries-1 {
			time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
		}
	}

	if lastErr != nil {
		return fmt.Errorf("failed to update after %d retries: %w", maxRetries, lastErr)
	}
	return fmt.Errorf("failed to update after %d retries", maxRetries)
}
悲观锁实现 #
// PessimisticLockService performs lock-then-update operations through a GORM
// handle.
type PessimisticLockService struct {
	db *gorm.DB
}

// NewPessimisticLockService returns a PessimisticLockService bound to db.
func NewPessimisticLockService(db *gorm.DB) *PessimisticLockService {
	service := new(PessimisticLockService)
	service.db = db
	return service
}
// UpdateWithPessimisticLock loads the User with the given ID inside a
// transaction, requesting a FOR UPDATE query option, and then applies updates
// to it. Any error is returned unchanged.
//
// NOTE(review): "gorm:query_option" is the GORM v1 way of appending FOR
// UPDATE. With gorm.io/gorm (v2) the documented form is
// Clauses(clause.Locking{Strength: "UPDATE"}). Confirm that this Set call
// still produces a locking read; if not, no row lock is taken here.
func (pls *PessimisticLockService) UpdateWithPessimisticLock(userID uint, updates map[string]interface{}) error {
	lockAndUpdate := func(tx *gorm.DB) error {
		var user User
		lockErr := tx.Set("gorm:query_option", "FOR UPDATE").First(&user, userID).Error
		if lockErr != nil {
			return lockErr
		}

		// The update's result is the transaction's result.
		return tx.Model(&user).Updates(updates).Error
	}

	return pls.db.Transaction(lockAndUpdate)
}
// UpdateWithTimeout does the same lock-then-update as
// UpdateWithPessimisticLock, but runs the transaction with a context that is
// cancelled after timeout. Any error is returned unchanged.
//
// NOTE(review): "gorm:query_option" is the GORM v1 way of appending FOR
// UPDATE. With gorm.io/gorm (v2) the documented form is
// Clauses(clause.Locking{Strength: "UPDATE"}). Confirm that this Set call
// still produces a locking read; if not, no row lock is taken here.
func (pls *PessimisticLockService) UpdateWithTimeout(userID uint, updates map[string]interface{}, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	lockAndUpdate := func(tx *gorm.DB) error {
		var user User
		lockErr := tx.Set("gorm:query_option", "FOR UPDATE").First(&user, userID).Error
		if lockErr != nil {
			return lockErr
		}

		// The update's result is the transaction's result.
		return tx.Model(&user).Updates(updates).Error
	}

	return pls.db.WithContext(ctx).Transaction(lockAndUpdate)
}
数据一致性检查 #
// ConsistencyChecker runs data-consistency queries and clean-ups through a
// GORM handle.
type ConsistencyChecker struct {
	db *gorm.DB
}

// NewConsistencyChecker returns a ConsistencyChecker bound to db.
func NewConsistencyChecker(db *gorm.DB) *ConsistencyChecker {
	checker := new(ConsistencyChecker)
	checker.db = db
	return checker
}
// CheckForeignKeyConsistency looks for rows whose referenced user no longer
// exists: user_profiles rows without a matching users.id, and posts rows
// whose author_id has no matching users.id.
//
// Findings are only logged, never returned or repaired. The function returns
// an error only when one of the two queries fails.
func (cc *ConsistencyChecker) CheckForeignKeyConsistency() error {
	// Profiles pointing at a missing user.
	var orphanedProfiles []UserProfile
	profileErr := cc.db.Raw(`
SELECT up.* FROM user_profiles up
LEFT JOIN users u ON up.user_id = u.id
WHERE u.id IS NULL
`).Scan(&orphanedProfiles).Error
	if profileErr != nil {
		return profileErr
	}
	if n := len(orphanedProfiles); n > 0 {
		// Reporting only; clean-up is left to the caller.
		log.Printf("Found %d orphaned user profiles", n)
	}

	// Posts pointing at a missing author.
	var orphanedPosts []Post
	postErr := cc.db.Raw(`
SELECT p.* FROM posts p
LEFT JOIN users u ON p.author_id = u.id
WHERE u.id IS NULL
`).Scan(&orphanedPosts).Error
	if postErr != nil {
		return postErr
	}
	if n := len(orphanedPosts); n > 0 {
		log.Printf("Found %d orphaned posts", n)
	}

	return nil
}
// CheckDataIntegrity gathers three integrity figures into an IntegrityReport:
// the number of users with a NULL or empty email, the email values that occur
// more than once together with their counts, and the number of posts whose
// status is not "draft", "published" or "archived".
//
// If any of the three queries fails, it returns a nil report and the error
// wrapped with context, so a failed query is never reported as "no issues".
func (cc *ConsistencyChecker) CheckDataIntegrity() (*IntegrityReport, error) {
	report := &IntegrityReport{}

	// Users without an email address.
	if err := cc.db.Model(&User{}).
		Where("email IS NULL OR email = ''").
		Count(&report.UsersWithoutEmail).Error; err != nil {
		return nil, fmt.Errorf("failed to count users without email: %w", err)
	}

	// Email values shared by more than one user.
	if err := cc.db.Raw(`
SELECT email, COUNT(*) as count
FROM users
WHERE email IS NOT NULL AND email != ''
GROUP BY email
HAVING COUNT(*) > 1
`).Scan(&report.DuplicateEmails).Error; err != nil {
		return nil, fmt.Errorf("failed to find duplicate emails: %w", err)
	}

	// Posts with a status outside the known set.
	if err := cc.db.Model(&Post{}).
		Where("status NOT IN ?", []string{"draft", "published", "archived"}).
		Count(&report.InvalidStatusPosts).Error; err != nil {
		return nil, fmt.Errorf("failed to count posts with invalid status: %w", err)
	}

	return report, nil
}
// IntegrityReport is the result of CheckDataIntegrity.
type IntegrityReport struct {
	// UsersWithoutEmail counts users whose email is NULL or empty.
	UsersWithoutEmail int64 `json:"users_without_email"`
	// DuplicateEmails lists email values that occur more than once.
	DuplicateEmails []DuplicateEmail `json:"duplicate_emails"`
	// InvalidStatusPosts counts posts whose status is not draft, published or
	// archived.
	InvalidStatusPosts int64 `json:"invalid_status_posts"`
}
// DuplicateEmail is one row of the duplicate-email query in
// CheckDataIntegrity: an email value and how many users share it.
type DuplicateEmail struct {
	Email string `json:"email"`
	Count int `json:"count"`
}
// FixInconsistencies deletes, inside one transaction, the user_profiles rows
// and then the posts rows that reference a user that does not exist. After
// each delete it logs the number of affected rows. The first failing
// statement aborts the transaction and its error is returned unchanged.
//
// NOTE(review): the "DELETE alias FROM ... LEFT JOIN" form may not be accepted
// by every SQL dialect — confirm against the target database.
func (cc *ConsistencyChecker) FixInconsistencies() error {
	// Statements run in slice order; each has its own log message.
	cleanups := []struct {
		statement string
		logFormat string
	}{
		{
			statement: `
DELETE up FROM user_profiles up
LEFT JOIN users u ON up.user_id = u.id
WHERE u.id IS NULL
`,
			logFormat: "Cleaned up %d orphaned user profiles",
		},
		{
			statement: `
DELETE p FROM posts p
LEFT JOIN users u ON p.author_id = u.id
WHERE u.id IS NULL
`,
			logFormat: "Cleaned up %d orphaned posts",
		},
	}

	return cc.db.Transaction(func(tx *gorm.DB) error {
		for _, cleanup := range cleanups {
			outcome := tx.Exec(cleanup.statement)
			if outcome.Error != nil {
				return outcome.Error
			}
			log.Printf(cleanup.logFormat, outcome.RowsAffected)
		}
		return nil
	})
}
完整的数据库管理服务 #
// DatabaseManagementService ties together versioned migrations, transaction
// helpers and consistency checks for one database.
type DatabaseManagementService struct {
	migrationManager   *VersionedMigrationManager
	transactionManager *TransactionManager
	consistencyChecker *ConsistencyChecker
}

// NewDatabaseManagementService builds the three managers on top of db, in the
// order migration manager, transaction manager, consistency checker.
func NewDatabaseManagementService(db *gorm.DB) *DatabaseManagementService {
	service := new(DatabaseManagementService)
	service.migrationManager = NewVersionedMigrationManager(db)
	service.transactionManager = NewTransactionManager(db)
	service.consistencyChecker = NewConsistencyChecker(db)
	return service
}
// Initialize prepares the database: it registers the migrations, applies the
// pending ones, and then runs the foreign-key consistency check. The first
// failing step is returned wrapped with context.
func (dms *DatabaseManagementService) Initialize() error {
	log.Println("Initializing database...")

	// Register first, then apply.
	RegisterMigrations(dms.migrationManager)
	migrateErr := dms.migrationManager.Migrate()
	if migrateErr != nil {
		return fmt.Errorf("migration failed: %w", migrateErr)
	}

	// Verify references once the schema is in place.
	checkErr := dms.consistencyChecker.CheckForeignKeyConsistency()
	if checkErr != nil {
		return fmt.Errorf("consistency check failed: %w", checkErr)
	}

	log.Println("Database initialized successfully")
	return nil
}
// HealthCheck verifies that every registered migration is applied and then
// runs the data-integrity check.
//
// An unapplied migration, or a failure to obtain the migration status or the
// integrity report, is returned as an error. Integrity findings themselves
// are only logged and do not make the check fail.
func (dms *DatabaseManagementService) HealthCheck() error {
	statuses, statusErr := dms.migrationManager.GetStatus()
	if statusErr != nil {
		return fmt.Errorf("failed to get migration status: %w", statusErr)
	}

	// The first pending migration fails the check.
	for i := range statuses {
		if statuses[i].Applied {
			continue
		}
		return fmt.Errorf("migration %s is not applied", statuses[i].Version)
	}

	report, reportErr := dms.consistencyChecker.CheckDataIntegrity()
	if reportErr != nil {
		return fmt.Errorf("integrity check failed: %w", reportErr)
	}

	clean := report.UsersWithoutEmail <= 0 &&
		len(report.DuplicateEmails) <= 0 &&
		report.InvalidStatusPosts <= 0
	if !clean {
		log.Printf("Data integrity issues found: %+v", report)
	}

	return nil
}
// MaintenanceTasks runs the periodic maintenance work, which currently
// consists of FixInconsistencies, and logs the start and the end of the run.
// A failure is returned wrapped with context.
func (dms *DatabaseManagementService) MaintenanceTasks() error {
	log.Println("Running database maintenance tasks...")

	fixErr := dms.consistencyChecker.FixInconsistencies()
	if fixErr != nil {
		return fmt.Errorf("failed to fix inconsistencies: %w", fixErr)
	}

	log.Println("Database maintenance completed")
	return nil
}
通过本节的学习,你已经全面掌握了数据库迁移管理、事务处理机制以及数据一致性保证策略。这些知识将帮助你构建健壮、可靠的数据持久化层,确保应用数据的完整性和一致性。
至此,我们已经完成了数据库操作章节的全部内容,涵盖了从基础连接配置到高级查询优化,从 GORM 基础操作到复杂的事务处理,为你的 Web 应用开发提供了完整的数据库解决方案。