3.4.1 数据库连接与配置

3.4.1 数据库连接与配置 #

数据库连接是 Web 应用的基础设施,正确的连接配置和管理对应用的性能和稳定性至关重要。本节将详细介绍 Go 语言中各种数据库的连接方法、连接池配置以及最佳实践。

数据库驱动安装与配置 #

常用数据库驱动 #

Go 语言支持多种数据库,每种数据库都有相应的驱动程序:

// MySQL 驱动
import _ "github.com/go-sql-driver/mysql"

// PostgreSQL 驱动
import _ "github.com/lib/pq"

// SQLite 驱动
import _ "github.com/mattn/go-sqlite3"

// SQL Server 驱动
import _ "github.com/denisenkom/go-mssqldb"

// Oracle 驱动
import _ "github.com/godror/godror"

MySQL 连接配置 #

package main

import (
    "database/sql"
    "fmt"
    "log"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

// MySQL 连接配置结构
type MySQLConfig struct {
    Host     string
    Port     int
    Username string
    Password string
    Database string
    Charset  string
    Timeout  time.Duration

    // 连接池配置
    MaxOpenConns    int
    MaxIdleConns    int
    ConnMaxLifetime time.Duration
    ConnMaxIdleTime time.Duration
}

// 默认 MySQL 配置
func DefaultMySQLConfig() *MySQLConfig {
    return &MySQLConfig{
        Host:            "localhost",
        Port:            3306,
        Username:        "root",
        Password:        "",
        Database:        "test",
        Charset:         "utf8mb4",
        Timeout:         10 * time.Second,
        MaxOpenConns:    100,
        MaxIdleConns:    10,
        ConnMaxLifetime: time.Hour,
        ConnMaxIdleTime: 10 * time.Minute,
    }
}

// 构建 MySQL DSN
func (c *MySQLConfig) DSN() string {
    return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=%s&parseTime=true&loc=Local&timeout=%s",
        c.Username,
        c.Password,
        c.Host,
        c.Port,
        c.Database,
        c.Charset,
        c.Timeout,
    )
}

// 创建 MySQL 连接
func NewMySQLConnection(config *MySQLConfig) (*sql.DB, error) {
    db, err := sql.Open("mysql", config.DSN())
    if err != nil {
        return nil, fmt.Errorf("failed to open database: %w", err)
    }

    // 配置连接池
    db.SetMaxOpenConns(config.MaxOpenConns)
    db.SetMaxIdleConns(config.MaxIdleConns)
    db.SetConnMaxLifetime(config.ConnMaxLifetime)
    db.SetConnMaxIdleTime(config.ConnMaxIdleTime)

    // 测试连接
    if err := db.Ping(); err != nil {
        db.Close()
        return nil, fmt.Errorf("failed to ping database: %w", err)
    }

    log.Printf("MySQL connected successfully to %s:%d/%s",
        config.Host, config.Port, config.Database)

    return db, nil
}

PostgreSQL 连接配置 #

import (
    "database/sql"
    "fmt"

    _ "github.com/lib/pq"
)

// PostgreSQL 连接配置
type PostgreSQLConfig struct {
    Host     string
    Port     int
    Username string
    Password string
    Database string
    SSLMode  string
    Timezone string

    // 连接池配置
    MaxOpenConns    int
    MaxIdleConns    int
    ConnMaxLifetime time.Duration
}

func DefaultPostgreSQLConfig() *PostgreSQLConfig {
    return &PostgreSQLConfig{
        Host:            "localhost",
        Port:            5432,
        Username:        "postgres",
        Password:        "",
        Database:        "postgres",
        SSLMode:         "disable",
        Timezone:        "UTC",
        MaxOpenConns:    100,
        MaxIdleConns:    10,
        ConnMaxLifetime: time.Hour,
    }
}

func (c *PostgreSQLConfig) DSN() string {
    return fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=%s",
        c.Host,
        c.Port,
        c.Username,
        c.Password,
        c.Database,
        c.SSLMode,
        c.Timezone,
    )
}

func NewPostgreSQLConnection(config *PostgreSQLConfig) (*sql.DB, error) {
    db, err := sql.Open("postgres", config.DSN())
    if err != nil {
        return nil, fmt.Errorf("failed to open PostgreSQL database: %w", err)
    }

    // 配置连接池
    db.SetMaxOpenConns(config.MaxOpenConns)
    db.SetMaxIdleConns(config.MaxIdleConns)
    db.SetConnMaxLifetime(config.ConnMaxLifetime)

    // 测试连接
    if err := db.Ping(); err != nil {
        db.Close()
        return nil, fmt.Errorf("failed to ping PostgreSQL database: %w", err)
    }

    return db, nil
}

SQLite 连接配置 #

import (
    "database/sql"
    "fmt"
    "os"
    "path/filepath"

    _ "github.com/mattn/go-sqlite3"
)

type SQLiteConfig struct {
    Path            string
    Mode            string // ro, rw, rwc, memory
    Cache           string // shared, private
    MaxOpenConns    int
    MaxIdleConns    int
    ConnMaxLifetime time.Duration
}

func DefaultSQLiteConfig() *SQLiteConfig {
    return &SQLiteConfig{
        Path:            "./app.db",
        Mode:            "rwc",
        Cache:           "shared",
        MaxOpenConns:    1, // SQLite 建议单连接
        MaxIdleConns:    1,
        ConnMaxLifetime: time.Hour,
    }
}

func (c *SQLiteConfig) DSN() string {
    return fmt.Sprintf("file:%s?mode=%s&cache=%s&_fk=1",
        c.Path,
        c.Mode,
        c.Cache,
    )
}

func NewSQLiteConnection(config *SQLiteConfig) (*sql.DB, error) {
    // 确保目录存在
    dir := filepath.Dir(config.Path)
    if err := os.MkdirAll(dir, 0755); err != nil {
        return nil, fmt.Errorf("failed to create directory: %w", err)
    }

    db, err := sql.Open("sqlite3", config.DSN())
    if err != nil {
        return nil, fmt.Errorf("failed to open SQLite database: %w", err)
    }

    // SQLite 特殊配置
    db.SetMaxOpenConns(config.MaxOpenConns)
    db.SetMaxIdleConns(config.MaxIdleConns)
    db.SetConnMaxLifetime(config.ConnMaxLifetime)

    // 启用 WAL 模式以提高并发性能
    if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil {
        db.Close()
        return nil, fmt.Errorf("failed to enable WAL mode: %w", err)
    }

    return db, nil
}

连接池管理 #

连接池配置详解 #

// 连接池管理器
type ConnectionPoolManager struct {
    db     *sql.DB
    config *PoolConfig
    stats  *PoolStats
}

type PoolConfig struct {
    MaxOpenConns    int           // 最大打开连接数
    MaxIdleConns    int           // 最大空闲连接数
    ConnMaxLifetime time.Duration // 连接最大生存时间
    ConnMaxIdleTime time.Duration // 连接最大空闲时间
}

type PoolStats struct {
    OpenConnections int
    InUse          int
    Idle           int
    WaitCount      int64
    WaitDuration   time.Duration
    MaxIdleClosed  int64
    MaxLifetimeClosed int64
}

func NewConnectionPoolManager(db *sql.DB, config *PoolConfig) *ConnectionPoolManager {
    return &ConnectionPoolManager{
        db:     db,
        config: config,
        stats:  &PoolStats{},
    }
}

// 配置连接池
func (cpm *ConnectionPoolManager) ConfigurePool() {
    cpm.db.SetMaxOpenConns(cpm.config.MaxOpenConns)
    cpm.db.SetMaxIdleConns(cpm.config.MaxIdleConns)
    cpm.db.SetConnMaxLifetime(cpm.config.ConnMaxLifetime)
    cpm.db.SetConnMaxIdleTime(cpm.config.ConnMaxIdleTime)
}

// 获取连接池统计信息
func (cpm *ConnectionPoolManager) GetStats() *PoolStats {
    dbStats := cpm.db.Stats()

    return &PoolStats{
        OpenConnections:   dbStats.OpenConnections,
        InUse:            dbStats.InUse,
        Idle:             dbStats.Idle,
        WaitCount:        dbStats.WaitCount,
        WaitDuration:     dbStats.WaitDuration,
        MaxIdleClosed:    dbStats.MaxIdleClosed,
        MaxLifetimeClosed: dbStats.MaxLifetimeClosed,
    }
}

// 监控连接池状态
func (cpm *ConnectionPoolManager) MonitorPool(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for range ticker.C {
        stats := cpm.GetStats()

        log.Printf("Connection Pool Stats - Open: %d, InUse: %d, Idle: %d, Wait: %d",
            stats.OpenConnections,
            stats.InUse,
            stats.Idle,
            stats.WaitCount,
        )

        // 检查连接池健康状态
        if stats.WaitCount > 100 {
            log.Printf("Warning: High connection wait count: %d", stats.WaitCount)
        }

        if stats.OpenConnections >= cpm.config.MaxOpenConns {
            log.Printf("Warning: Connection pool at maximum capacity")
        }
    }
}

连接池优化策略 #

// 连接池优化器
type PoolOptimizer struct {
    manager *ConnectionPoolManager
    metrics *PoolMetrics
}

type PoolMetrics struct {
    avgResponseTime time.Duration
    errorRate      float64
    throughput     int64
}

func NewPoolOptimizer(manager *ConnectionPoolManager) *PoolOptimizer {
    return &PoolOptimizer{
        manager: manager,
        metrics: &PoolMetrics{},
    }
}

// 动态调整连接池大小
func (po *PoolOptimizer) OptimizePool() {
    stats := po.manager.GetStats()

    // 根据使用情况调整最大连接数
    utilizationRate := float64(stats.InUse) / float64(stats.OpenConnections)

    if utilizationRate > 0.8 && stats.WaitCount > 0 {
        // 高使用率且有等待,增加连接数
        newMaxOpen := int(float64(po.manager.config.MaxOpenConns) * 1.2)
        if newMaxOpen <= 200 { // 设置上限
            po.manager.config.MaxOpenConns = newMaxOpen
            po.manager.ConfigurePool()
            log.Printf("Increased max open connections to %d", newMaxOpen)
        }
    } else if utilizationRate < 0.3 && stats.OpenConnections > 10 {
        // 低使用率,减少连接数
        newMaxOpen := int(float64(po.manager.config.MaxOpenConns) * 0.9)
        if newMaxOpen >= 10 { // 设置下限
            po.manager.config.MaxOpenConns = newMaxOpen
            po.manager.ConfigurePool()
            log.Printf("Decreased max open connections to %d", newMaxOpen)
        }
    }
}

多数据库支持 #

数据库管理器 #

// 数据库类型枚举
type DatabaseType string

const (
    MySQL      DatabaseType = "mysql"
    PostgreSQL DatabaseType = "postgresql"
    SQLite     DatabaseType = "sqlite"
    SQLServer  DatabaseType = "sqlserver"
)

// 数据库连接信息
type DatabaseConnection struct {
    Name     string
    Type     DatabaseType
    DB       *sql.DB
    Config   interface{}
    IsActive bool
}

// 多数据库管理器
type DatabaseManager struct {
    connections map[string]*DatabaseConnection
    primary     string
    mu          sync.RWMutex
}

func NewDatabaseManager() *DatabaseManager {
    return &DatabaseManager{
        connections: make(map[string]*DatabaseConnection),
    }
}

// 添加数据库连接
func (dm *DatabaseManager) AddConnection(name string, dbType DatabaseType, config interface{}) error {
    dm.mu.Lock()
    defer dm.mu.Unlock()

    var db *sql.DB
    var err error

    switch dbType {
    case MySQL:
        if mysqlConfig, ok := config.(*MySQLConfig); ok {
            db, err = NewMySQLConnection(mysqlConfig)
        } else {
            return fmt.Errorf("invalid MySQL config type")
        }
    case PostgreSQL:
        if pgConfig, ok := config.(*PostgreSQLConfig); ok {
            db, err = NewPostgreSQLConnection(pgConfig)
        } else {
            return fmt.Errorf("invalid PostgreSQL config type")
        }
    case SQLite:
        if sqliteConfig, ok := config.(*SQLiteConfig); ok {
            db, err = NewSQLiteConnection(sqliteConfig)
        } else {
            return fmt.Errorf("invalid SQLite config type")
        }
    default:
        return fmt.Errorf("unsupported database type: %s", dbType)
    }

    if err != nil {
        return fmt.Errorf("failed to create connection: %w", err)
    }

    dm.connections[name] = &DatabaseConnection{
        Name:     name,
        Type:     dbType,
        DB:       db,
        Config:   config,
        IsActive: true,
    }

    // 如果是第一个连接,设为主连接
    if dm.primary == "" {
        dm.primary = name
    }

    return nil
}

// 获取数据库连接
func (dm *DatabaseManager) GetConnection(name string) (*sql.DB, error) {
    dm.mu.RLock()
    defer dm.mu.RUnlock()

    conn, exists := dm.connections[name]
    if !exists {
        return nil, fmt.Errorf("connection %s not found", name)
    }

    if !conn.IsActive {
        return nil, fmt.Errorf("connection %s is not active", name)
    }

    return conn.DB, nil
}

// 获取主数据库连接
func (dm *DatabaseManager) GetPrimaryConnection() (*sql.DB, error) {
    if dm.primary == "" {
        return nil, fmt.Errorf("no primary connection set")
    }
    return dm.GetConnection(dm.primary)
}

// 设置主数据库
func (dm *DatabaseManager) SetPrimary(name string) error {
    dm.mu.Lock()
    defer dm.mu.Unlock()

    if _, exists := dm.connections[name]; !exists {
        return fmt.Errorf("connection %s not found", name)
    }

    dm.primary = name
    return nil
}

// 关闭所有连接
func (dm *DatabaseManager) CloseAll() error {
    dm.mu.Lock()
    defer dm.mu.Unlock()

    var errors []string

    for name, conn := range dm.connections {
        if err := conn.DB.Close(); err != nil {
            errors = append(errors, fmt.Sprintf("failed to close %s: %v", name, err))
        }
        conn.IsActive = false
    }

    if len(errors) > 0 {
        return fmt.Errorf("errors closing connections: %s", strings.Join(errors, "; "))
    }

    return nil
}

读写分离配置 #

// 读写分离管理器
type ReadWriteManager struct {
    writeDB *sql.DB
    readDBs []*sql.DB
    current int
    mu      sync.RWMutex
}

func NewReadWriteManager(writeDB *sql.DB, readDBs []*sql.DB) *ReadWriteManager {
    return &ReadWriteManager{
        writeDB: writeDB,
        readDBs: readDBs,
        current: 0,
    }
}

// 获取写数据库
func (rwm *ReadWriteManager) GetWriteDB() *sql.DB {
    return rwm.writeDB
}

// 获取读数据库(负载均衡)
func (rwm *ReadWriteManager) GetReadDB() *sql.DB {
    rwm.mu.Lock()
    defer rwm.mu.Unlock()

    if len(rwm.readDBs) == 0 {
        return rwm.writeDB // 如果没有读库,使用写库
    }

    // 轮询选择读库
    db := rwm.readDBs[rwm.current]
    rwm.current = (rwm.current + 1) % len(rwm.readDBs)

    return db
}

// 健康检查
func (rwm *ReadWriteManager) HealthCheck() error {
    // 检查写库
    if err := rwm.writeDB.Ping(); err != nil {
        return fmt.Errorf("write database unhealthy: %w", err)
    }

    // 检查读库
    for i, db := range rwm.readDBs {
        if err := db.Ping(); err != nil {
            log.Printf("Read database %d unhealthy: %v", i, err)
            // 可以实现自动移除不健康的读库
        }
    }

    return nil
}

连接安全与性能优化 #

连接安全配置 #

// 安全连接配置
type SecurityConfig struct {
    EnableTLS       bool
    TLSConfig       *tls.Config
    CertFile        string
    KeyFile         string
    CAFile          string
    SkipVerify      bool
    ConnectionTimeout time.Duration
    QueryTimeout    time.Duration
}

func NewSecurityConfig() *SecurityConfig {
    return &SecurityConfig{
        EnableTLS:         true,
        SkipVerify:        false,
        ConnectionTimeout: 30 * time.Second,
        QueryTimeout:      30 * time.Second,
    }
}

// 创建安全的 MySQL 连接
func NewSecureMySQLConnection(config *MySQLConfig, security *SecurityConfig) (*sql.DB, error) {
    dsn := config.DSN()

    if security.EnableTLS {
        // 配置 TLS
        tlsConfig := &tls.Config{
            InsecureSkipVerify: security.SkipVerify,
        }

        if security.CAFile != "" {
            caCert, err := ioutil.ReadFile(security.CAFile)
            if err != nil {
                return nil, fmt.Errorf("failed to read CA file: %w", err)
            }

            caCertPool := x509.NewCertPool()
            caCertPool.AppendCertsFromPEM(caCert)
            tlsConfig.RootCAs = caCertPool
        }

        if security.CertFile != "" && security.KeyFile != "" {
            cert, err := tls.LoadX509KeyPair(security.CertFile, security.KeyFile)
            if err != nil {
                return nil, fmt.Errorf("failed to load client certificate: %w", err)
            }
            tlsConfig.Certificates = []tls.Certificate{cert}
        }

        mysql.RegisterTLSConfig("custom", tlsConfig)
        dsn += "&tls=custom"
    }

    db, err := sql.Open("mysql", dsn)
    if err != nil {
        return nil, err
    }

    // 配置连接池
    db.SetMaxOpenConns(config.MaxOpenConns)
    db.SetMaxIdleConns(config.MaxIdleConns)
    db.SetConnMaxLifetime(config.ConnMaxLifetime)

    return db, nil
}

性能监控 #

// 数据库性能监控器
type PerformanceMonitor struct {
    db       *sql.DB
    metrics  *PerformanceMetrics
    interval time.Duration
}

type PerformanceMetrics struct {
    QueryCount       int64
    SlowQueryCount   int64
    AvgQueryTime     time.Duration
    MaxQueryTime     time.Duration
    ErrorCount       int64
    ConnectionErrors int64
}

func NewPerformanceMonitor(db *sql.DB, interval time.Duration) *PerformanceMonitor {
    return &PerformanceMonitor{
        db:       db,
        metrics:  &PerformanceMetrics{},
        interval: interval,
    }
}

// 开始监控
func (pm *PerformanceMonitor) Start(ctx context.Context) {
    ticker := time.NewTicker(pm.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            pm.collectMetrics()
            pm.reportMetrics()
        }
    }
}

func (pm *PerformanceMonitor) collectMetrics() {
    stats := pm.db.Stats()

    // 记录连接池统计
    log.Printf("DB Stats - Open: %d, InUse: %d, Idle: %d, Wait: %d, WaitDuration: %v",
        stats.OpenConnections,
        stats.InUse,
        stats.Idle,
        stats.WaitCount,
        stats.WaitDuration,
    )

    // 检查慢查询
    if pm.metrics.AvgQueryTime > 1*time.Second {
        log.Printf("Warning: Average query time is high: %v", pm.metrics.AvgQueryTime)
    }
}

func (pm *PerformanceMonitor) reportMetrics() {
    // 可以发送到监控系统
    metrics := map[string]interface{}{
        "query_count":       pm.metrics.QueryCount,
        "slow_query_count":  pm.metrics.SlowQueryCount,
        "avg_query_time":    pm.metrics.AvgQueryTime.Milliseconds(),
        "max_query_time":    pm.metrics.MaxQueryTime.Milliseconds(),
        "error_count":       pm.metrics.ErrorCount,
        "connection_errors": pm.metrics.ConnectionErrors,
    }

    // 发送到监控系统(如 Prometheus、InfluxDB 等)
    sendToMonitoringSystem(metrics)
}

func sendToMonitoringSystem(metrics map[string]interface{}) {
    // 实现发送到监控系统的逻辑
    log.Printf("Metrics: %+v", metrics)
}

完整的数据库初始化示例 #

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // 创建数据库管理器
    dbManager := NewDatabaseManager()

    // 配置主数据库(MySQL)
    mysqlConfig := DefaultMySQLConfig()
    mysqlConfig.Host = os.Getenv("MYSQL_HOST")
    mysqlConfig.Username = os.Getenv("MYSQL_USER")
    mysqlConfig.Password = os.Getenv("MYSQL_PASSWORD")
    mysqlConfig.Database = os.Getenv("MYSQL_DATABASE")

    if err := dbManager.AddConnection("primary", MySQL, mysqlConfig); err != nil {
        log.Fatalf("Failed to add primary database: %v", err)
    }

    // 配置缓存数据库(SQLite)
    sqliteConfig := DefaultSQLiteConfig()
    sqliteConfig.Path = "./cache.db"

    if err := dbManager.AddConnection("cache", SQLite, sqliteConfig); err != nil {
        log.Fatalf("Failed to add cache database: %v", err)
    }

    // 获取主数据库连接
    primaryDB, err := dbManager.GetPrimaryConnection()
    if err != nil {
        log.Fatalf("Failed to get primary connection: %v", err)
    }

    // 创建连接池管理器
    poolConfig := &PoolConfig{
        MaxOpenConns:    50,
        MaxIdleConns:    10,
        ConnMaxLifetime: time.Hour,
        ConnMaxIdleTime: 10 * time.Minute,
    }

    poolManager := NewConnectionPoolManager(primaryDB, poolConfig)
    poolManager.ConfigurePool()

    // 启动性能监控
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    monitor := NewPerformanceMonitor(primaryDB, 30*time.Second)
    go monitor.Start(ctx)

    // 启动连接池监控
    go poolManager.MonitorPool(1 * time.Minute)

    log.Println("Database connections initialized successfully")

    // 等待退出信号
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan

    log.Println("Shutting down...")

    // 关闭所有数据库连接
    if err := dbManager.CloseAll(); err != nil {
        log.Printf("Error closing database connections: %v", err)
    }

    log.Println("Database connections closed")
}

通过本节的学习,你已经掌握了 Go 语言中数据库连接的各种配置方法、连接池管理技术以及多数据库支持的实现。这些知识为构建高性能、可扩展的数据库访问层奠定了坚实的基础。在下一节中,我们将学习 GORM 框架的基础操作。