3.4.1 数据库连接与配置 #
数据库连接是 Web 应用的基础设施,正确的连接配置和管理对应用的性能和稳定性至关重要。本节将详细介绍 Go 语言中各种数据库的连接方法、连接池配置以及最佳实践。
数据库驱动安装与配置 #
常用数据库驱动 #
Go 语言支持多种数据库,每种数据库都有相应的驱动程序:
// MySQL 驱动
import _ "github.com/go-sql-driver/mysql"
// PostgreSQL 驱动
import _ "github.com/lib/pq"
// SQLite 驱动
import _ "github.com/mattn/go-sqlite3"
// SQL Server 驱动
import _ "github.com/denisenkom/go-mssqldb"
// Oracle 驱动
import _ "github.com/godror/godror"
MySQL 连接配置 #
package main
import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/go-sql-driver/mysql"
)
// MySQL 连接配置结构
type MySQLConfig struct {
Host string
Port int
Username string
Password string
Database string
Charset string
Timeout time.Duration
// 连接池配置
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
ConnMaxIdleTime time.Duration
}
// 默认 MySQL 配置
func DefaultMySQLConfig() *MySQLConfig {
return &MySQLConfig{
Host: "localhost",
Port: 3306,
Username: "root",
Password: "",
Database: "test",
Charset: "utf8mb4",
Timeout: 10 * time.Second,
MaxOpenConns: 100,
MaxIdleConns: 10,
ConnMaxLifetime: time.Hour,
ConnMaxIdleTime: 10 * time.Minute,
}
}
// 构建 MySQL DSN
func (c *MySQLConfig) DSN() string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=%s&parseTime=true&loc=Local&timeout=%s",
c.Username,
c.Password,
c.Host,
c.Port,
c.Database,
c.Charset,
c.Timeout,
)
}
// 创建 MySQL 连接
func NewMySQLConnection(config *MySQLConfig) (*sql.DB, error) {
db, err := sql.Open("mysql", config.DSN())
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
// 配置连接池
db.SetMaxOpenConns(config.MaxOpenConns)
db.SetMaxIdleConns(config.MaxIdleConns)
db.SetConnMaxLifetime(config.ConnMaxLifetime)
db.SetConnMaxIdleTime(config.ConnMaxIdleTime)
// 测试连接
if err := db.Ping(); err != nil {
db.Close()
return nil, fmt.Errorf("failed to ping database: %w", err)
}
log.Printf("MySQL connected successfully to %s:%d/%s",
config.Host, config.Port, config.Database)
return db, nil
}
PostgreSQL 连接配置 #
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
)
// PostgreSQL 连接配置
type PostgreSQLConfig struct {
Host string
Port int
Username string
Password string
Database string
SSLMode string
Timezone string
// 连接池配置
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
}
func DefaultPostgreSQLConfig() *PostgreSQLConfig {
return &PostgreSQLConfig{
Host: "localhost",
Port: 5432,
Username: "postgres",
Password: "",
Database: "postgres",
SSLMode: "disable",
Timezone: "UTC",
MaxOpenConns: 100,
MaxIdleConns: 10,
ConnMaxLifetime: time.Hour,
}
}
func (c *PostgreSQLConfig) DSN() string {
return fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=%s",
c.Host,
c.Port,
c.Username,
c.Password,
c.Database,
c.SSLMode,
c.Timezone,
)
}
func NewPostgreSQLConnection(config *PostgreSQLConfig) (*sql.DB, error) {
db, err := sql.Open("postgres", config.DSN())
if err != nil {
return nil, fmt.Errorf("failed to open PostgreSQL database: %w", err)
}
// 配置连接池
db.SetMaxOpenConns(config.MaxOpenConns)
db.SetMaxIdleConns(config.MaxIdleConns)
db.SetConnMaxLifetime(config.ConnMaxLifetime)
// 测试连接
if err := db.Ping(); err != nil {
db.Close()
return nil, fmt.Errorf("failed to ping PostgreSQL database: %w", err)
}
return db, nil
}
SQLite 连接配置 #
import (
"database/sql"
"fmt"
"os"
"path/filepath"
_ "github.com/mattn/go-sqlite3"
)
type SQLiteConfig struct {
Path string
Mode string // ro, rw, rwc, memory
Cache string // shared, private
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
}
func DefaultSQLiteConfig() *SQLiteConfig {
return &SQLiteConfig{
Path: "./app.db",
Mode: "rwc",
Cache: "shared",
MaxOpenConns: 1, // SQLite 建议单连接
MaxIdleConns: 1,
ConnMaxLifetime: time.Hour,
}
}
func (c *SQLiteConfig) DSN() string {
return fmt.Sprintf("file:%s?mode=%s&cache=%s&_fk=1",
c.Path,
c.Mode,
c.Cache,
)
}
func NewSQLiteConnection(config *SQLiteConfig) (*sql.DB, error) {
// 确保目录存在
dir := filepath.Dir(config.Path)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to create directory: %w", err)
}
db, err := sql.Open("sqlite3", config.DSN())
if err != nil {
return nil, fmt.Errorf("failed to open SQLite database: %w", err)
}
// SQLite 特殊配置
db.SetMaxOpenConns(config.MaxOpenConns)
db.SetMaxIdleConns(config.MaxIdleConns)
db.SetConnMaxLifetime(config.ConnMaxLifetime)
// 启用 WAL 模式以提高并发性能
if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil {
db.Close()
return nil, fmt.Errorf("failed to enable WAL mode: %w", err)
}
return db, nil
}
连接池管理 #
连接池配置详解 #
// 连接池管理器
type ConnectionPoolManager struct {
db *sql.DB
config *PoolConfig
stats *PoolStats
}
type PoolConfig struct {
MaxOpenConns int // 最大打开连接数
MaxIdleConns int // 最大空闲连接数
ConnMaxLifetime time.Duration // 连接最大生存时间
ConnMaxIdleTime time.Duration // 连接最大空闲时间
}
type PoolStats struct {
OpenConnections int
InUse int
Idle int
WaitCount int64
WaitDuration time.Duration
MaxIdleClosed int64
MaxLifetimeClosed int64
}
func NewConnectionPoolManager(db *sql.DB, config *PoolConfig) *ConnectionPoolManager {
return &ConnectionPoolManager{
db: db,
config: config,
stats: &PoolStats{},
}
}
// 配置连接池
func (cpm *ConnectionPoolManager) ConfigurePool() {
cpm.db.SetMaxOpenConns(cpm.config.MaxOpenConns)
cpm.db.SetMaxIdleConns(cpm.config.MaxIdleConns)
cpm.db.SetConnMaxLifetime(cpm.config.ConnMaxLifetime)
cpm.db.SetConnMaxIdleTime(cpm.config.ConnMaxIdleTime)
}
// 获取连接池统计信息
func (cpm *ConnectionPoolManager) GetStats() *PoolStats {
dbStats := cpm.db.Stats()
return &PoolStats{
OpenConnections: dbStats.OpenConnections,
InUse: dbStats.InUse,
Idle: dbStats.Idle,
WaitCount: dbStats.WaitCount,
WaitDuration: dbStats.WaitDuration,
MaxIdleClosed: dbStats.MaxIdleClosed,
MaxLifetimeClosed: dbStats.MaxLifetimeClosed,
}
}
// 监控连接池状态
func (cpm *ConnectionPoolManager) MonitorPool(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
stats := cpm.GetStats()
log.Printf("Connection Pool Stats - Open: %d, InUse: %d, Idle: %d, Wait: %d",
stats.OpenConnections,
stats.InUse,
stats.Idle,
stats.WaitCount,
)
// 检查连接池健康状态
if stats.WaitCount > 100 {
log.Printf("Warning: High connection wait count: %d", stats.WaitCount)
}
if stats.OpenConnections >= cpm.config.MaxOpenConns {
log.Printf("Warning: Connection pool at maximum capacity")
}
}
}
连接池优化策略 #
// 连接池优化器
type PoolOptimizer struct {
manager *ConnectionPoolManager
metrics *PoolMetrics
}
type PoolMetrics struct {
avgResponseTime time.Duration
errorRate float64
throughput int64
}
func NewPoolOptimizer(manager *ConnectionPoolManager) *PoolOptimizer {
return &PoolOptimizer{
manager: manager,
metrics: &PoolMetrics{},
}
}
// 动态调整连接池大小
func (po *PoolOptimizer) OptimizePool() {
stats := po.manager.GetStats()
// 根据使用情况调整最大连接数
utilizationRate := float64(stats.InUse) / float64(stats.OpenConnections)
if utilizationRate > 0.8 && stats.WaitCount > 0 {
// 高使用率且有等待,增加连接数
newMaxOpen := int(float64(po.manager.config.MaxOpenConns) * 1.2)
if newMaxOpen <= 200 { // 设置上限
po.manager.config.MaxOpenConns = newMaxOpen
po.manager.ConfigurePool()
log.Printf("Increased max open connections to %d", newMaxOpen)
}
} else if utilizationRate < 0.3 && stats.OpenConnections > 10 {
// 低使用率,减少连接数
newMaxOpen := int(float64(po.manager.config.MaxOpenConns) * 0.9)
if newMaxOpen >= 10 { // 设置下限
po.manager.config.MaxOpenConns = newMaxOpen
po.manager.ConfigurePool()
log.Printf("Decreased max open connections to %d", newMaxOpen)
}
}
}
多数据库支持 #
数据库管理器 #
// 数据库类型枚举
type DatabaseType string
const (
MySQL DatabaseType = "mysql"
PostgreSQL DatabaseType = "postgresql"
SQLite DatabaseType = "sqlite"
SQLServer DatabaseType = "sqlserver"
)
// 数据库连接信息
type DatabaseConnection struct {
Name string
Type DatabaseType
DB *sql.DB
Config interface{}
IsActive bool
}
// 多数据库管理器
type DatabaseManager struct {
connections map[string]*DatabaseConnection
primary string
mu sync.RWMutex
}
func NewDatabaseManager() *DatabaseManager {
return &DatabaseManager{
connections: make(map[string]*DatabaseConnection),
}
}
// 添加数据库连接
func (dm *DatabaseManager) AddConnection(name string, dbType DatabaseType, config interface{}) error {
dm.mu.Lock()
defer dm.mu.Unlock()
var db *sql.DB
var err error
switch dbType {
case MySQL:
if mysqlConfig, ok := config.(*MySQLConfig); ok {
db, err = NewMySQLConnection(mysqlConfig)
} else {
return fmt.Errorf("invalid MySQL config type")
}
case PostgreSQL:
if pgConfig, ok := config.(*PostgreSQLConfig); ok {
db, err = NewPostgreSQLConnection(pgConfig)
} else {
return fmt.Errorf("invalid PostgreSQL config type")
}
case SQLite:
if sqliteConfig, ok := config.(*SQLiteConfig); ok {
db, err = NewSQLiteConnection(sqliteConfig)
} else {
return fmt.Errorf("invalid SQLite config type")
}
default:
return fmt.Errorf("unsupported database type: %s", dbType)
}
if err != nil {
return fmt.Errorf("failed to create connection: %w", err)
}
dm.connections[name] = &DatabaseConnection{
Name: name,
Type: dbType,
DB: db,
Config: config,
IsActive: true,
}
// 如果是第一个连接,设为主连接
if dm.primary == "" {
dm.primary = name
}
return nil
}
// 获取数据库连接
func (dm *DatabaseManager) GetConnection(name string) (*sql.DB, error) {
dm.mu.RLock()
defer dm.mu.RUnlock()
conn, exists := dm.connections[name]
if !exists {
return nil, fmt.Errorf("connection %s not found", name)
}
if !conn.IsActive {
return nil, fmt.Errorf("connection %s is not active", name)
}
return conn.DB, nil
}
// 获取主数据库连接
func (dm *DatabaseManager) GetPrimaryConnection() (*sql.DB, error) {
if dm.primary == "" {
return nil, fmt.Errorf("no primary connection set")
}
return dm.GetConnection(dm.primary)
}
// 设置主数据库
func (dm *DatabaseManager) SetPrimary(name string) error {
dm.mu.Lock()
defer dm.mu.Unlock()
if _, exists := dm.connections[name]; !exists {
return fmt.Errorf("connection %s not found", name)
}
dm.primary = name
return nil
}
// 关闭所有连接
func (dm *DatabaseManager) CloseAll() error {
dm.mu.Lock()
defer dm.mu.Unlock()
var errors []string
for name, conn := range dm.connections {
if err := conn.DB.Close(); err != nil {
errors = append(errors, fmt.Sprintf("failed to close %s: %v", name, err))
}
conn.IsActive = false
}
if len(errors) > 0 {
return fmt.Errorf("errors closing connections: %s", strings.Join(errors, "; "))
}
return nil
}
读写分离配置 #
// 读写分离管理器
type ReadWriteManager struct {
writeDB *sql.DB
readDBs []*sql.DB
current int
mu sync.RWMutex
}
func NewReadWriteManager(writeDB *sql.DB, readDBs []*sql.DB) *ReadWriteManager {
return &ReadWriteManager{
writeDB: writeDB,
readDBs: readDBs,
current: 0,
}
}
// 获取写数据库
func (rwm *ReadWriteManager) GetWriteDB() *sql.DB {
return rwm.writeDB
}
// 获取读数据库(负载均衡)
func (rwm *ReadWriteManager) GetReadDB() *sql.DB {
rwm.mu.Lock()
defer rwm.mu.Unlock()
if len(rwm.readDBs) == 0 {
return rwm.writeDB // 如果没有读库,使用写库
}
// 轮询选择读库
db := rwm.readDBs[rwm.current]
rwm.current = (rwm.current + 1) % len(rwm.readDBs)
return db
}
// 健康检查
func (rwm *ReadWriteManager) HealthCheck() error {
// 检查写库
if err := rwm.writeDB.Ping(); err != nil {
return fmt.Errorf("write database unhealthy: %w", err)
}
// 检查读库
for i, db := range rwm.readDBs {
if err := db.Ping(); err != nil {
log.Printf("Read database %d unhealthy: %v", i, err)
// 可以实现自动移除不健康的读库
}
}
return nil
}
连接安全与性能优化 #
连接安全配置 #
// 安全连接配置
type SecurityConfig struct {
EnableTLS bool
TLSConfig *tls.Config
CertFile string
KeyFile string
CAFile string
SkipVerify bool
ConnectionTimeout time.Duration
QueryTimeout time.Duration
}
func NewSecurityConfig() *SecurityConfig {
return &SecurityConfig{
EnableTLS: true,
SkipVerify: false,
ConnectionTimeout: 30 * time.Second,
QueryTimeout: 30 * time.Second,
}
}
// 创建安全的 MySQL 连接
func NewSecureMySQLConnection(config *MySQLConfig, security *SecurityConfig) (*sql.DB, error) {
dsn := config.DSN()
if security.EnableTLS {
// 配置 TLS
tlsConfig := &tls.Config{
InsecureSkipVerify: security.SkipVerify,
}
if security.CAFile != "" {
caCert, err := ioutil.ReadFile(security.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
}
if security.CertFile != "" && security.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(security.CertFile, security.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
mysql.RegisterTLSConfig("custom", tlsConfig)
dsn += "&tls=custom"
}
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
// 配置连接池
db.SetMaxOpenConns(config.MaxOpenConns)
db.SetMaxIdleConns(config.MaxIdleConns)
db.SetConnMaxLifetime(config.ConnMaxLifetime)
return db, nil
}
性能监控 #
// 数据库性能监控器
type PerformanceMonitor struct {
db *sql.DB
metrics *PerformanceMetrics
interval time.Duration
}
type PerformanceMetrics struct {
QueryCount int64
SlowQueryCount int64
AvgQueryTime time.Duration
MaxQueryTime time.Duration
ErrorCount int64
ConnectionErrors int64
}
func NewPerformanceMonitor(db *sql.DB, interval time.Duration) *PerformanceMonitor {
return &PerformanceMonitor{
db: db,
metrics: &PerformanceMetrics{},
interval: interval,
}
}
// 开始监控
func (pm *PerformanceMonitor) Start(ctx context.Context) {
ticker := time.NewTicker(pm.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
pm.collectMetrics()
pm.reportMetrics()
}
}
}
func (pm *PerformanceMonitor) collectMetrics() {
stats := pm.db.Stats()
// 记录连接池统计
log.Printf("DB Stats - Open: %d, InUse: %d, Idle: %d, Wait: %d, WaitDuration: %v",
stats.OpenConnections,
stats.InUse,
stats.Idle,
stats.WaitCount,
stats.WaitDuration,
)
// 检查慢查询
if pm.metrics.AvgQueryTime > 1*time.Second {
log.Printf("Warning: Average query time is high: %v", pm.metrics.AvgQueryTime)
}
}
func (pm *PerformanceMonitor) reportMetrics() {
// 可以发送到监控系统
metrics := map[string]interface{}{
"query_count": pm.metrics.QueryCount,
"slow_query_count": pm.metrics.SlowQueryCount,
"avg_query_time": pm.metrics.AvgQueryTime.Milliseconds(),
"max_query_time": pm.metrics.MaxQueryTime.Milliseconds(),
"error_count": pm.metrics.ErrorCount,
"connection_errors": pm.metrics.ConnectionErrors,
}
// 发送到监控系统(如 Prometheus、InfluxDB 等)
sendToMonitoringSystem(metrics)
}
func sendToMonitoringSystem(metrics map[string]interface{}) {
// 实现发送到监控系统的逻辑
log.Printf("Metrics: %+v", metrics)
}
完整的数据库初始化示例 #
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
// 创建数据库管理器
dbManager := NewDatabaseManager()
// 配置主数据库(MySQL)
mysqlConfig := DefaultMySQLConfig()
mysqlConfig.Host = os.Getenv("MYSQL_HOST")
mysqlConfig.Username = os.Getenv("MYSQL_USER")
mysqlConfig.Password = os.Getenv("MYSQL_PASSWORD")
mysqlConfig.Database = os.Getenv("MYSQL_DATABASE")
if err := dbManager.AddConnection("primary", MySQL, mysqlConfig); err != nil {
log.Fatalf("Failed to add primary database: %v", err)
}
// 配置缓存数据库(SQLite)
sqliteConfig := DefaultSQLiteConfig()
sqliteConfig.Path = "./cache.db"
if err := dbManager.AddConnection("cache", SQLite, sqliteConfig); err != nil {
log.Fatalf("Failed to add cache database: %v", err)
}
// 获取主数据库连接
primaryDB, err := dbManager.GetPrimaryConnection()
if err != nil {
log.Fatalf("Failed to get primary connection: %v", err)
}
// 创建连接池管理器
poolConfig := &PoolConfig{
MaxOpenConns: 50,
MaxIdleConns: 10,
ConnMaxLifetime: time.Hour,
ConnMaxIdleTime: 10 * time.Minute,
}
poolManager := NewConnectionPoolManager(primaryDB, poolConfig)
poolManager.ConfigurePool()
// 启动性能监控
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := NewPerformanceMonitor(primaryDB, 30*time.Second)
go monitor.Start(ctx)
// 启动连接池监控
go poolManager.MonitorPool(1 * time.Minute)
log.Println("Database connections initialized successfully")
// 等待退出信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down...")
// 关闭所有数据库连接
if err := dbManager.CloseAll(); err != nil {
log.Printf("Error closing database connections: %v", err)
}
log.Println("Database connections closed")
}
通过本节的学习,你已经掌握了 Go 语言中数据库连接的各种配置方法、连接池管理技术以及多数据库支持的实现。这些知识为构建高性能、可扩展的数据库访问层奠定了坚实的基础。在下一节中,我们将学习 GORM 框架的基础操作。