# 3.4.3 GORM 高级查询
在掌握了 GORM 的基础操作后,本节将深入探讨 GORM 的高级查询功能,包括复杂查询、联表操作、预加载策略、原生 SQL 使用以及查询优化技巧。
## 复杂查询与联表操作
### 高级 WHERE 条件
package main
import (
	"errors"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"gorm.io/gorm"
)
// AdvancedUserRepository bundles the advanced query examples of this section
// around a single GORM handle.
type AdvancedUserRepository struct {
	// db is the handle every repository method queries through.
	db *gorm.DB
}
// NewAdvancedUserRepository returns a repository that runs its queries on db.
func NewAdvancedUserRepository(db *gorm.DB) *AdvancedUserRepository {
	repo := new(AdvancedUserRepository)
	repo.db = db
	return repo
}
// ComplexQuery demonstrates a query that chains several kinds of conditions:
// two Where clauses, a Not clause and an Or clause, applied in that order.
// It returns the matching users together with the query error, if any.
func (r *AdvancedUserRepository) ComplexQuery() ([]User, error) {
	oneMonthAgo := time.Now().AddDate(0, -1, 0)
	privileged := []string{"admin", "moderator"}

	var users []User
	query := r.db.Where("status = ? AND created_at > ?", "active", oneMonthAgo)
	query = query.Where("(first_name LIKE ? OR last_name LIKE ?)", "%John%", "%John%")
	query = query.Not("email LIKE ?", "%temp%")
	query = query.Or("username IN ?", privileged)

	err := query.Find(&users).Error
	return users, err
}
// QueryByStruct filters users using condition as the WHERE clause.
// As the original note states, GORM skips zero-value fields of a struct
// condition, so only the populated fields take part in the filter.
func (r *AdvancedUserRepository) QueryByStruct(condition User) ([]User, error) {
	var matched []User
	err := r.db.Where(&condition).Find(&matched).Error
	return matched, err
}
// QueryByMap filters users using the entries of conditions as the WHERE
// clause and returns the matches along with the query error.
func (r *AdvancedUserRepository) QueryByMap(conditions map[string]interface{}) ([]User, error) {
	var matched []User
	err := r.db.Where(conditions).Find(&matched).Error
	return matched, err
}
// InlineConditions shows conditions passed inline: the SQL fragment and its
// arguments are handed straight to Find instead of a separate Where call.
func (r *AdvancedUserRepository) InlineConditions() ([]User, error) {
	since := time.Now().AddDate(0, -1, 0)

	var users []User
	err := r.db.Find(&users, "status = ? AND created_at > ?", "active", since).Error
	return users, err
}
// SubQuery returns the users whose id appears in a subquery selecting the
// distinct author_id values of posts with status "published".
func (r *AdvancedUserRepository) SubQuery() ([]User, error) {
	// Built but not executed here; it is embedded into the outer query below.
	publishedAuthors := r.db.Model(&Post{}).
		Select("DISTINCT author_id").
		Where("status = ?", "published")

	var authors []User
	err := r.db.Where("id IN (?)", publishedAuthors).Find(&authors).Error
	return authors, err
}
// ExistsQuery returns the users for which an EXISTS subquery finds a post
// with a matching author_id and status "published".
func (r *AdvancedUserRepository) ExistsQuery() ([]User, error) {
	// Correlated subquery: it refers to users.id from the outer query.
	publishedPost := r.db.Model(&Post{}).
		Select("1").
		Where("posts.author_id = users.id AND posts.status = ?", "published")

	var users []User
	err := r.db.Where("EXISTS (?)", publishedPost).Find(&users).Error
	return users, err
}
### JOIN 操作
// InnerJoinQuery inner-joins users with user_profiles and returns the id,
// username, email and bio of active users as generic column/value maps.
func (r *AdvancedUserRepository) InnerJoinQuery() ([]map[string]interface{}, error) {
	const (
		columns = "users.id, users.username, users.email, user_profiles.bio"
		join    = "INNER JOIN user_profiles ON users.id = user_profiles.user_id"
	)

	var rows []map[string]interface{}
	query := r.db.Model(&User{}).Select(columns).Joins(join)
	query = query.Where("users.status = ?", "active")

	err := query.Find(&rows).Error
	return rows, err
}
// LeftJoinQuery left-joins users with user_profiles, keeps the active users
// and preloads the Profile association on each returned user.
func (r *AdvancedUserRepository) LeftJoinQuery() ([]User, error) {
	const join = "LEFT JOIN user_profiles ON users.id = user_profiles.user_id"

	var users []User
	query := r.db.Preload("Profile").Joins(join)
	query = query.Where("users.status = ?", "active")

	err := query.Find(&users).Error
	return users, err
}
// ComplexJoinQuery joins users with posts and comments, groups by user and
// returns per-user post/comment counts plus the latest post date for active
// users that have at least one post, ordered by post count descending.
func (r *AdvancedUserRepository) ComplexJoinQuery() ([]UserWithStats, error) {
	const columns = `users.id, users.username, users.email,
COUNT(DISTINCT posts.id) as post_count,
COUNT(DISTINCT comments.id) as comment_count,
MAX(posts.created_at) as last_post_date`

	query := r.db.Model(&User{}).Select(columns)
	query = query.Joins("LEFT JOIN posts ON users.id = posts.author_id")
	query = query.Joins("LEFT JOIN comments ON users.id = comments.user_id")
	query = query.Where("users.status = ?", "active")
	query = query.Group("users.id")
	query = query.Having("post_count > ?", 0)
	query = query.Order("post_count DESC")

	var stats []UserWithStats
	err := query.Find(&stats).Error
	return stats, err
}
// UserWithStats is a custom result struct that receives the rows of a join
// query combining user columns with aggregated post and comment figures.
type UserWithStats struct {
	ID           uint      `json:"id"`
	Username     string    `json:"username"`
	Email        string    `json:"email"`
	PostCount    int       `json:"post_count"`     // aliased post_count column
	CommentCount int       `json:"comment_count"`  // aliased comment_count column
	LastPostDate time.Time `json:"last_post_date"` // aliased last_post_date column
}
### 聚合查询
// AggregateQueries computes overall user statistics in one SELECT: total,
// active and inactive counts, plus the average number of days between
// created_at and last_login for rows where last_login is set.
// The statement uses TIMESTAMPDIFF, so it presumably targets MySQL — confirm
// before running it on another database.
func (r *AdvancedUserRepository) AggregateQueries() (*UserStatistics, error) {
	const columns = `COUNT(*) as total_users,
COUNT(CASE WHEN status = 'active' THEN 1 END) as active_users,
COUNT(CASE WHEN status = 'inactive' THEN 1 END) as inactive_users,
AVG(CASE WHEN last_login IS NOT NULL THEN TIMESTAMPDIFF(DAY, created_at, last_login) END) as avg_days_to_first_login`

	stats := new(UserStatistics)
	err := r.db.Model(&User{}).Select(columns).Scan(stats).Error
	return stats, err
}
// GroupByStats groups users by status and returns, per status, the row count
// and the most recent created_at, ordered by count descending.
func (r *AdvancedUserRepository) GroupByStats() ([]StatusStats, error) {
	query := r.db.Model(&User{}).
		Select("status, COUNT(*) as count, MAX(created_at) as latest_created")
	query = query.Group("status")
	query = query.Order("count DESC")

	var perStatus []StatusStats
	err := query.Find(&perStatus).Error
	return perStatus, err
}
// TimeBasedStats counts the users created per calendar day over the last 30
// days and returns the days in descending date order.
func (r *AdvancedUserRepository) TimeBasedStats() ([]DailyStats, error) {
	windowStart := time.Now().AddDate(0, 0, -30)

	query := r.db.Model(&User{}).
		Select("DATE(created_at) as date, COUNT(*) as user_count")
	query = query.Where("created_at >= ?", windowStart)
	query = query.Group("DATE(created_at)")
	query = query.Order("date DESC")

	var perDay []DailyStats
	err := query.Find(&perDay).Error
	return perDay, err
}
// UserStatistics receives the single-row result of the aggregate user query.
type UserStatistics struct {
	TotalUsers          int     `json:"total_users"`
	ActiveUsers         int     `json:"active_users"`
	InactiveUsers       int     `json:"inactive_users"`
	AvgDaysToFirstLogin float64 `json:"avg_days_to_first_login"`
}
// StatusStats is one row of the per-status grouping: the status value, how
// many users have it, and the latest created_at among them.
type StatusStats struct {
	Status        string    `json:"status"`
	Count         int       `json:"count"`
	LatestCreated time.Time `json:"latest_created"`
}
// DailyStats is one row of the per-day grouping: the date (kept as a string)
// and the number of users created on it.
type DailyStats struct {
	Date      string `json:"date"`
	UserCount int    `json:"user_count"`
}
## 预加载与懒加载
### 基础预加载
// GetWithProfile loads the user with the given id and preloads its Profile
// association. The returned pointer is non-nil even when an error is returned.
func (r *AdvancedUserRepository) GetWithProfile(id uint) (*User, error) {
	user := new(User)
	err := r.db.Preload("Profile").First(user, id).Error
	return user, err
}
// GetWithAllRelations loads the user with the given id and preloads the
// Profile, Posts and Comments associations, in that order. The returned
// pointer is non-nil even when an error is returned.
func (r *AdvancedUserRepository) GetWithAllRelations(id uint) (*User, error) {
	query := r.db
	for _, relation := range []string{"Profile", "Posts", "Comments"} {
		query = query.Preload(relation)
	}

	user := new(User)
	err := query.First(user, id).Error
	return user, err
}
// GetWithNestedRelations loads the user with the given id and preloads nested
// associations through dotted paths. The returned pointer is non-nil even
// when an error is returned.
func (r *AdvancedUserRepository) GetWithNestedRelations(id uint) (*User, error) {
	nested := []string{
		"Posts.Comments", // comments of each post
		"Posts.Tags",     // tags of each post
		"Comments.Post",  // post of each comment
	}

	query := r.db
	for _, path := range nested {
		query = query.Preload(path)
	}

	user := new(User)
	err := query.First(user, id).Error
	return user, err
}
### 条件预加载
// GetWithConditionalPreload loads the user with the given id and preloads
// only part of its associations: posts with status "published" and comments
// created within the last seven days. The returned pointer is non-nil even
// when an error is returned.
func (r *AdvancedUserRepository) GetWithConditionalPreload(id uint) (*User, error) {
	weekAgo := time.Now().AddDate(0, 0, -7)

	query := r.db.Preload("Posts", "status = ?", "published")
	query = query.Preload("Comments", "created_at > ?", weekAgo)

	user := new(User)
	err := query.First(user, id).Error
	return user, err
}
// GetWithFunctionPreload loads the user with the given id and preloads Posts
// through a callback that customises the preload query: published posts with
// more than 100 views, newest first, limited to 10. The returned pointer is
// non-nil even when an error is returned.
func (r *AdvancedUserRepository) GetWithFunctionPreload(id uint) (*User, error) {
	recentPopular := func(db *gorm.DB) *gorm.DB {
		filtered := db.Where("status = ? AND view_count > ?", "published", 100)
		return filtered.Order("created_at DESC").Limit(10)
	}

	user := new(User)
	err := r.db.Preload("Posts", recentPopular).First(user, id).Error
	return user, err
}
// CustomPreload loads the user with the given id and then fills user.Posts
// by hand with at most five of that user's posts whose view_count exceeds
// 1000, ordered by view_count descending.
//
// It returns a nil user and the error when either the user lookup or the
// post query fails.
func (r *AdvancedUserRepository) CustomPreload(id uint) (*User, error) {
	var user User
	if err := r.db.First(&user, id).Error; err != nil {
		return nil, err
	}

	var popularPosts []Post
	err := r.db.Where("author_id = ? AND view_count > ?", user.ID, 1000).
		Order("view_count DESC").
		Limit(5).
		Find(&popularPosts).Error
	if err != nil {
		// The post query's error used to be dropped, which made a failed
		// query indistinguishable from "no popular posts".
		return nil, fmt.Errorf("loading popular posts for user %d: %w", user.ID, err)
	}

	user.Posts = popularPosts
	return &user, nil
}
### 选择性预加载
// SelectivePreload loads only the id, username and email columns of the user
// with the given id, and preloads Profile restricted to the user_id, bio and
// website columns. The returned pointer is non-nil even when an error is
// returned.
func (r *AdvancedUserRepository) SelectivePreload(id uint) (*User, error) {
	// Column restriction applied to the preload query itself.
	profileColumns := func(db *gorm.DB) *gorm.DB {
		return db.Select("user_id", "bio", "website")
	}

	user := new(User)
	query := r.db.Select("id", "username", "email").Preload("Profile", profileColumns)
	err := query.First(user, id).Error
	return user, err
}
// OmitPreload loads the user with the given id while omitting the sensitive
// password and deleted_at columns, and preloads Profile. The returned pointer
// is non-nil even when an error is returned.
func (r *AdvancedUserRepository) OmitPreload(id uint) (*User, error) {
	query := r.db.Omit("password", "deleted_at")
	query = query.Preload("Profile")

	user := new(User)
	err := query.First(user, id).Error
	return user, err
}
### 懒加载实现
// LazyLoadService loads a user's associations on demand instead of
// preloading them with the user.
type LazyLoadService struct {
	// db is the handle used for the on-demand association queries.
	db *gorm.DB
}
// NewLazyLoadService returns a LazyLoadService that queries through db.
func NewLazyLoadService(db *gorm.DB) *LazyLoadService {
	svc := new(LazyLoadService)
	svc.db = db
	return svc
}
// LoadUserProfile lazily fills user.Profile. It does nothing when the profile
// is already set. A missing profile row is not treated as an error: the
// field stays nil and nil is returned. Any other query error is returned.
func (s *LazyLoadService) LoadUserProfile(user *User) error {
	if user.Profile != nil {
		return nil
	}

	profile := new(UserProfile)
	err := s.db.Where("user_id = ?", user.ID).First(profile).Error
	switch {
	case err == nil:
		user.Profile = profile
		return nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return nil
	default:
		return err
	}
}
// LoadUserPosts lazily fills user.Posts with the user's posts, newest first.
// It does nothing when user.Posts already holds at least one post. A positive
// limit caps the number of posts loaded; otherwise no limit is applied.
func (s *LazyLoadService) LoadUserPosts(user *User, limit int) error {
	// len of a nil slice is 0, so this also covers the not-yet-loaded case.
	if len(user.Posts) > 0 {
		return nil
	}

	query := s.db.Where("author_id = ?", user.ID).Order("created_at DESC")
	if limit > 0 {
		query = query.Limit(limit)
	}

	var posts []Post
	err := query.Find(&posts).Error
	if err == nil {
		user.Posts = posts
	}
	return err
}
// CachedLazyLoadService is a lazy loader backed by a simple in-memory cache.
type CachedLazyLoadService struct {
	db    *gorm.DB
	cache map[string]interface{} // simple in-memory cache keyed by string
	mu    sync.RWMutex           // guards cache
}
// NewCachedLazyLoadService returns a CachedLazyLoadService that queries
// through db and starts with an empty cache.
func NewCachedLazyLoadService(db *gorm.DB) *CachedLazyLoadService {
	svc := new(CachedLazyLoadService)
	svc.db = db
	svc.cache = make(map[string]interface{})
	return svc
}
// LoadUserProfileCached fills user.Profile from the in-memory cache when an
// entry for the user exists, and otherwise loads it from the database and
// stores it in the cache. A missing profile row is not an error: nil is
// returned and nothing is cached.
func (s *CachedLazyLoadService) LoadUserProfileCached(user *User) error {
	cacheKey := fmt.Sprintf("user_profile_%d", user.ID)

	s.mu.RLock()
	cached, hit := s.cache[cacheKey]
	s.mu.RUnlock()
	if hit {
		// An entry of an unexpected type is ignored and reloaded below.
		if profile, ok := cached.(*UserProfile); ok {
			user.Profile = profile
			return nil
		}
	}

	loaded := new(UserProfile)
	err := s.db.Where("user_id = ?", user.ID).First(loaded).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil
	}
	if err != nil {
		return err
	}

	s.mu.Lock()
	s.cache[cacheKey] = loaded
	s.mu.Unlock()

	user.Profile = loaded
	return nil
}
## 原生 SQL 与自定义查询
### 原生 SQL 查询
// RawSQLQuery runs a hand-written SELECT joining users with user_profiles and
// scans up to 10 active users created within the last month, newest first.
func (r *AdvancedUserRepository) RawSQLQuery() ([]User, error) {
	const query = `
SELECT u.*, p.bio
FROM users u
LEFT JOIN user_profiles p ON u.id = p.user_id
WHERE u.status = ? AND u.created_at > ?
ORDER BY u.created_at DESC
LIMIT ?
`
	since := time.Now().AddDate(0, -1, 0)

	var users []User
	err := r.db.Raw(query, "active", since, 10).Scan(&users).Error
	return users, err
}
// RawStatisticsQuery computes post statistics over active users with one raw
// SQL statement and scans the single result row into ComplexStats.
//
// Each active user is joined to at most one row of a subquery that counts
// published posts per author, so every user contributes exactly one row:
//   - total_users counts the active users,
//   - users_with_posts counts those that matched the subquery,
//   - avg_posts_per_user / max_posts_per_user aggregate the per-author counts
//     (users without published posts yield NULL and are skipped by AVG/MAX).
//
// The earlier version additionally joined the posts table and counted post
// ids as users_with_posts; that reported the number of posts rather than
// users and weighted the average by each author's post count.
func (r *AdvancedUserRepository) RawStatisticsQuery() (*ComplexStats, error) {
	var stats ComplexStats
	result := r.db.Raw(`
SELECT
COUNT(DISTINCT u.id) as total_users,
COUNT(DISTINCT post_counts.author_id) as users_with_posts,
AVG(post_counts.post_count) as avg_posts_per_user,
MAX(post_counts.post_count) as max_posts_per_user
FROM users u
LEFT JOIN (
SELECT author_id, COUNT(*) as post_count
FROM posts
WHERE status = 'published'
GROUP BY author_id
) post_counts ON u.id = post_counts.author_id
WHERE u.status = 'active'
`).Scan(&stats)
	return &stats, result.Error
}
// ComplexStats receives the single-row result of the raw statistics query.
type ComplexStats struct {
	TotalUsers      int     `json:"total_users"`
	UsersWithPosts  int     `json:"users_with_posts"`
	AvgPostsPerUser float64 `json:"avg_posts_per_user"`
	MaxPostsPerUser int     `json:"max_posts_per_user"`
}
// ExecuteRawSQL executes the given SQL statement with values bound to its
// placeholders and returns the execution error, if any.
func (r *AdvancedUserRepository) ExecuteRawSQL(sql string, values ...interface{}) error {
	return r.db.Exec(sql, values...).Error
}
// RawUpdate sets last_login to the current database time for every user who
// authored a post within the last seven days, using one raw UPDATE.
func (r *AdvancedUserRepository) RawUpdate() error {
	const statement = `
UPDATE users
SET last_login = NOW()
WHERE id IN (
SELECT DISTINCT author_id
FROM posts
WHERE created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
)
`
	return r.db.Exec(statement).Error
}
### 存储过程调用
// CallStoredProcedure calls the GetUserReport stored procedure for userID and
// scans its result into a UserReport. The returned pointer is non-nil even
// when an error is returned.
func (r *AdvancedUserRepository) CallStoredProcedure(userID uint) (*UserReport, error) {
	report := new(UserReport)
	err := r.db.Raw("CALL GetUserReport(?)", userID).Scan(report).Error
	return report, err
}
// CallComplexStoredProcedure calls the GetComplexUserReport stored procedure,
// which returns several result sets, and assembles them into a
// ComplexUserReport: the first row of the first set becomes UserInfo and
// every row of the second set becomes a PostSummary.
//
// Note: GORM's support for multiple result sets is limited, so the rows are
// walked manually through the database/sql Rows API.
//
// It returns a nil report and the error if the call, a row scan, the row
// iteration or the advance to the second result set fails.
func (r *AdvancedUserRepository) CallComplexStoredProcedure(userID uint) (*ComplexUserReport, error) {
	rows, err := r.db.Raw("CALL GetComplexUserReport(?)", userID).Rows()
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var report ComplexUserReport

	// First result set: a single row of user information.
	if rows.Next() {
		info := &report.UserInfo
		if err := rows.Scan(&info.ID, &info.Username, &info.Email); err != nil {
			return nil, err
		}
	}
	// Next returning false can mean "no rows" or "iteration failed".
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Second result set: one row per post.
	if rows.NextResultSet() {
		for rows.Next() {
			var post PostSummary
			if err := rows.Scan(&post.ID, &post.Title, &post.ViewCount); err != nil {
				return nil, err
			}
			report.Posts = append(report.Posts, post)
		}
	}
	// Covers both a failed NextResultSet and a failed iteration of the set.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return &report, nil
}
// UserReport receives the result of the GetUserReport stored procedure.
type UserReport struct {
	UserID       uint    `json:"user_id"`
	PostCount    int     `json:"post_count"`
	CommentCount int     `json:"comment_count"`
	AvgViews     float64 `json:"avg_views"`
}
// ComplexUserReport combines the result sets of the GetComplexUserReport
// stored procedure: the user's basic information and their post summaries.
type ComplexUserReport struct {
	UserInfo UserInfo      `json:"user_info"`
	Posts    []PostSummary `json:"posts"`
}
// UserInfo holds the basic identity columns of a user.
type UserInfo struct {
	ID       uint   `json:"id"`
	Username string `json:"username"`
	Email    string `json:"email"`
}
// PostSummary is the condensed view of a post: id, title and view count.
type PostSummary struct {
	ID        uint   `json:"id"`
	Title     string `json:"title"`
	ViewCount int    `json:"view_count"`
}
### 动态 SQL 构建
// DynamicQueryBuilder assembles SQL statements at run time from optional
// parameters and executes them through its GORM handle.
type DynamicQueryBuilder struct {
	// db is the handle the assembled SQL is executed on.
	db *gorm.DB
}
// NewDynamicQueryBuilder returns a DynamicQueryBuilder that executes on db.
func NewDynamicQueryBuilder(db *gorm.DB) *DynamicQueryBuilder {
	builder := new(DynamicQueryBuilder)
	builder.db = db
	return builder
}
// BuildComplexQuery assembles a raw "SELECT DISTINCT u.* FROM users u" query
// from the optional settings in params and scans the rows into []User.
//
// Joins, WHERE conditions, ORDER BY and LIMIT are only added for the
// parameters that are set. All filter values are bound as placeholders.
//
// Two defects of the earlier version are fixed here:
//   - params.OrderBy was interpolated into the SQL text unchecked, which
//     allowed SQL injection. ORDER BY columns cannot be bound as
//     placeholders, so the name must now be a plain identifier (letters,
//     digits, underscore, not starting with a digit); anything else is
//     rejected with an error before any SQL is run.
//   - params.HasPosts filters on the posts alias "p", which was only joined
//     when params.IncludePosts was also set. The join is now added whenever
//     either flag is set.
func (dqb *DynamicQueryBuilder) BuildComplexQuery(params QueryParams) ([]User, error) {
	if params.OrderBy != "" && !isSafeColumnName(params.OrderBy) {
		return nil, fmt.Errorf("invalid order by column %q", params.OrderBy)
	}

	var (
		sqlParts []string
		args     []interface{}
	)

	// Base query.
	sqlParts = append(sqlParts, "SELECT DISTINCT u.* FROM users u")

	// Optional joins.
	if params.IncludeProfile {
		sqlParts = append(sqlParts, "LEFT JOIN user_profiles up ON u.id = up.user_id")
	}
	if params.IncludePosts || params.HasPosts {
		sqlParts = append(sqlParts, "LEFT JOIN posts p ON u.id = p.author_id")
	}

	// Optional WHERE conditions, combined with AND.
	var whereClauses []string
	if params.Status != "" {
		whereClauses = append(whereClauses, "u.status = ?")
		args = append(args, params.Status)
	}
	if params.CreatedAfter != nil {
		whereClauses = append(whereClauses, "u.created_at > ?")
		args = append(args, *params.CreatedAfter)
	}
	if params.HasPosts {
		whereClauses = append(whereClauses, "p.id IS NOT NULL")
	}
	if params.Keyword != "" {
		whereClauses = append(whereClauses, "(u.username LIKE ? OR u.email LIKE ?)")
		keyword := "%" + params.Keyword + "%"
		args = append(args, keyword, keyword)
	}
	if len(whereClauses) > 0 {
		sqlParts = append(sqlParts, "WHERE "+strings.Join(whereClauses, " AND "))
	}

	// Optional ORDER BY; the column name was validated above.
	if params.OrderBy != "" {
		direction := "ASC"
		if params.OrderDesc {
			direction = "DESC"
		}
		sqlParts = append(sqlParts, fmt.Sprintf("ORDER BY u.%s %s", params.OrderBy, direction))
	}

	// Optional LIMIT.
	if params.Limit > 0 {
		sqlParts = append(sqlParts, "LIMIT ?")
		args = append(args, params.Limit)
	}

	var users []User
	result := dqb.db.Raw(strings.Join(sqlParts, " "), args...).Scan(&users)
	return users, result.Error
}

// isSafeColumnName reports whether name is a plain SQL identifier: ASCII
// letters, digits and underscores, not starting with a digit.
func isSafeColumnName(name string) bool {
	if name == "" {
		return false
	}
	for i, c := range name {
		switch {
		case c == '_', c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z':
		case c >= '0' && c <= '9' && i > 0:
		default:
			return false
		}
	}
	return true
}
// QueryParams lists the optional settings of the dynamic query builder.
// Zero values (empty string, nil, false, 0) leave the corresponding part of
// the query out.
type QueryParams struct {
	Status         string     // exact match on the user status
	CreatedAfter   *time.Time // lower bound (exclusive) on created_at
	HasPosts       bool       // only users with at least one post
	IncludeProfile bool       // join user_profiles
	IncludePosts   bool       // join posts
	Keyword        string     // substring matched against username and email
	OrderBy        string     // users column to sort by
	OrderDesc      bool       // sort descending instead of ascending
	Limit          int        // maximum number of rows when positive
}
## 查询优化技巧
### 索引优化
// AnalyzeQuery prefixes query with EXPLAIN, runs it with args and logs every
// non-NULL column of each row of the resulting execution plan.
//
// query is concatenated into the statement as-is, so it must be trusted SQL;
// only args are bound as placeholders.
//
// It returns the first error from running the statement, reading the column
// names, scanning a row or iterating the rows. Earlier versions ignored all
// but the first of these.
func (r *AdvancedUserRepository) AnalyzeQuery(query string, args ...interface{}) error {
	rows, err := r.db.Raw("EXPLAIN "+query, args...).Rows()
	if err != nil {
		return err
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return err
	}

	// Scan overwrites every destination on each row, so the buffers can be
	// allocated once and reused.
	values := make([]interface{}, len(columns))
	valuePtrs := make([]interface{}, len(columns))
	for i := range values {
		valuePtrs[i] = &values[i]
	}

	log.Println("Query Execution Plan:")
	for rows.Next() {
		if err := rows.Scan(valuePtrs...); err != nil {
			return err
		}
		for i, col := range columns {
			val := values[i]
			if val == nil {
				continue
			}
			// Drivers may hand text columns back as []byte; log them as text
			// rather than as a list of byte values.
			if raw, ok := val.([]byte); ok {
				val = string(raw)
			}
			log.Printf("%s: %v", col, val)
		}
		log.Println("---")
	}
	return rows.Err()
}
// ForceIndex selects the active users created within the last month while
// forcing the database to use the index named indexName.
//
// An index name cannot be bound as a placeholder and has to be written into
// the SQL text, so it is validated first: only ASCII letters, digits and
// underscores are accepted. Any other name is rejected with an error before
// any SQL is run, which closes the SQL injection hole of the unchecked
// interpolation.
func (r *AdvancedUserRepository) ForceIndex(indexName string) ([]User, error) {
	if !isPlainIndexName(indexName) {
		return nil, fmt.Errorf("invalid index name %q", indexName)
	}

	query := fmt.Sprintf(`
SELECT * FROM users FORCE INDEX (%s)
WHERE status = ? AND created_at > ?
`, indexName)

	var users []User
	result := r.db.Raw(query, "active", time.Now().AddDate(0, -1, 0)).Scan(&users)
	return users, result.Error
}

// isPlainIndexName reports whether name is non-empty and consists solely of
// ASCII letters, digits and underscores.
func isPlainIndexName(name string) bool {
	if name == "" {
		return false
	}
	for _, c := range name {
		plain := c == '_' ||
			(c >= '0' && c <= '9') ||
			(c >= 'a' && c <= 'z') ||
			(c >= 'A' && c <= 'Z')
		if !plain {
			return false
		}
	}
	return true
}
### 查询缓存
// QueryCache is an in-memory cache for query results whose entries expire a
// fixed time after they were stored.
type QueryCache struct {
	cache map[string]CacheItem
	mu    sync.RWMutex  // guards cache
	ttl   time.Duration // lifetime given to every entry by Set
}

// CacheItem is one cached value together with its expiry time.
type CacheItem struct {
	Data      interface{}
	ExpiresAt time.Time
}

// NewQueryCache returns an empty cache whose entries live for ttl.
func NewQueryCache(ttl time.Duration) *QueryCache {
	return &QueryCache{
		cache: make(map[string]CacheItem),
		ttl:   ttl,
	}
}

// Get returns the value stored under key and true, or nil and false when the
// key is absent or its entry has expired.
//
// An expired entry is removed on lookup. Previously expired entries were
// only hidden, never deleted, so the map grew without bound.
func (qc *QueryCache) Get(key string) (interface{}, bool) {
	qc.mu.RLock()
	item, exists := qc.cache[key]
	qc.mu.RUnlock()

	if !exists {
		return nil, false
	}
	if time.Now().After(item.ExpiresAt) {
		qc.mu.Lock()
		// Re-check under the write lock: a concurrent Set may have replaced
		// the entry with a fresh one after the read lock was released.
		if current, ok := qc.cache[key]; ok && time.Now().After(current.ExpiresAt) {
			delete(qc.cache, key)
		}
		qc.mu.Unlock()
		return nil, false
	}
	return item.Data, true
}

// Set stores data under key, replacing any previous entry, with an expiry of
// now plus the cache's ttl.
func (qc *QueryCache) Set(key string, data interface{}) {
	qc.mu.Lock()
	defer qc.mu.Unlock()

	qc.cache[key] = CacheItem{
		Data:      data,
		ExpiresAt: time.Now().Add(qc.ttl),
	}
}
// CachedQueryService answers queries from a QueryCache when possible and
// falls back to the repository otherwise.
type CachedQueryService struct {
	repo  *AdvancedUserRepository // source of truth for cache misses
	cache *QueryCache             // cached query results
}
// NewCachedQueryService returns a CachedQueryService backed by repo whose
// cached results expire after five minutes.
func NewCachedQueryService(repo *AdvancedUserRepository) *CachedQueryService {
	svc := new(CachedQueryService)
	svc.repo = repo
	svc.cache = NewQueryCache(5 * time.Minute)
	return svc
}
// GetUsersByStatus returns the users with the given status. A cached result
// is returned when one is present under the status-specific key and has the
// expected type; otherwise the repository is queried and its result cached.
// Repository errors are returned without caching anything.
func (cqs *CachedQueryService) GetUsersByStatus(status string) ([]User, error) {
	key := fmt.Sprintf("users_by_status_%s", status)

	cached, found := cqs.cache.Get(key)
	if users, ok := cached.([]User); found && ok {
		return users, nil
	}

	fresh, err := cqs.repo.GetByStatus(status)
	if err != nil {
		return nil, err
	}
	cqs.cache.Set(key, fresh)
	return fresh, nil
}
### 批量操作优化
// BatchGetByIDs fetches all users whose id is in ids with a single IN query
// instead of one query per id. An empty ids yields an empty, non-nil slice
// without touching the database.
func (r *AdvancedUserRepository) BatchGetByIDs(ids []uint) ([]User, error) {
	if len(ids) == 0 {
		return []User{}, nil
	}

	var found []User
	err := r.db.Where("id IN ?", ids).Find(&found).Error
	return found, err
}
// BatchUpdateStatus sets the status column of every user whose id is in ids
// with a single UPDATE instead of one update per user. An empty ids is a
// no-op.
func (r *AdvancedUserRepository) BatchUpdateStatus(ids []uint, status string) error {
	if len(ids) == 0 {
		return nil
	}

	target := r.db.Model(&User{}).Where("id IN ?", ids)
	return target.Update("status", status).Error
}
// BatchInsertOptimized inserts users in batches of 100 rows. An empty users
// is a no-op.
func (r *AdvancedUserRepository) BatchInsertOptimized(users []User) error {
	const batchSize = 100

	if len(users) == 0 {
		return nil
	}
	return r.db.CreateInBatches(users, batchSize).Error
}
### 分页优化
// CursorPagination returns up to limit users whose id is greater than
// cursor, in ascending id order (cursor pagination, suited to large tables).
//
// The second return value is the cursor for the next page: the id of the
// last user returned when more rows exist beyond this page, or 0 when this
// is the last page.
//
// limit must be positive. Earlier versions indexed users[limit-1] with a
// non-positive limit and panicked; that case now returns an error.
func (r *AdvancedUserRepository) CursorPagination(cursor uint, limit int) ([]User, uint, error) {
	if limit <= 0 {
		return nil, 0, fmt.Errorf("limit must be positive, got %d", limit)
	}

	var users []User
	// Fetch one extra row to learn whether another page exists.
	query := r.db.Where("id > ?", cursor).Order("id ASC").Limit(limit + 1)
	if err := query.Find(&users).Error; err != nil {
		return nil, 0, err
	}

	var nextCursor uint
	if len(users) > limit {
		users = users[:limit] // drop the look-ahead row
		nextCursor = users[limit-1].ID
	}
	return users, nextCursor, nil
}
// OptimizedOffsetPagination returns one page of users in descending id order
// together with the total number of users.
//
// For offsets above 10000 the page is fetched in two steps, first the ids of
// the page and then the full rows for those ids; smaller offsets use a plain
// LIMIT/OFFSET query. The total is counted on every call (a candidate for
// caching).
//
// page values below 1 are treated as page 1 so the offset is never negative.
// Every query error is returned; earlier versions discarded them all and
// always reported success.
func (r *AdvancedUserRepository) OptimizedOffsetPagination(page, pageSize int) ([]User, int64, error) {
	if page < 1 {
		page = 1
	}
	offset := (page - 1) * pageSize

	var users []User
	if offset > 10000 {
		// Large offset: select only the ids first, then load the full rows.
		var ids []uint
		err := r.db.Model(&User{}).Select("id").Order("id DESC").
			Limit(pageSize).Offset(offset).
			Pluck("id", &ids).Error
		if err != nil {
			return nil, 0, err
		}
		if len(ids) > 0 {
			if err := r.db.Where("id IN ?", ids).Order("id DESC").Find(&users).Error; err != nil {
				return nil, 0, err
			}
		}
	} else {
		// Regular pagination.
		err := r.db.Order("id DESC").Limit(pageSize).Offset(offset).Find(&users).Error
		if err != nil {
			return nil, 0, err
		}
	}

	var total int64
	if err := r.db.Model(&User{}).Count(&total).Error; err != nil {
		return nil, 0, err
	}
	return users, total, nil
}
## 完整的高级查询服务示例
// AdvancedQueryService combines the advanced repository with a result cache.
type AdvancedQueryService struct {
	repo  *AdvancedUserRepository // executes the queries
	cache *QueryCache             // caches search results
}
// NewAdvancedQueryService returns a service that queries through db and
// caches its results for ten minutes.
func NewAdvancedQueryService(db *gorm.DB) *AdvancedQueryService {
	svc := new(AdvancedQueryService)
	svc.repo = NewAdvancedUserRepository(db)
	svc.cache = NewQueryCache(10 * time.Minute)
	return svc
}
// SmartSearch searches users by keyword, status and creation-time range and
// returns one page of matches (with Profile preloaded, newest first) plus
// pagination details. Results are cached per distinct set of parameters.
//
// The keyword is matched with a full-text MATCH ... AGAINST over username,
// first_name and last_name, or with LIKE on username and email.
//
// params.PageSize must be positive; otherwise an error is returned (earlier
// versions ran the queries and then panicked with a division by zero while
// computing the page count). params.Page values below 1 are treated as 1.
//
// The cache key now covers CreatedAfter and CreatedBefore as well. Before,
// searches that differed only in their time range shared one cache entry and
// could receive each other's results.
func (aqs *AdvancedQueryService) SmartSearch(params SmartSearchParams) (*SearchResult, error) {
	if params.PageSize <= 0 {
		return nil, fmt.Errorf("page size must be positive, got %d", params.PageSize)
	}
	if params.Page < 1 {
		params.Page = 1 // params is a copy; the caller's value is untouched
	}

	// %q keeps keyword and status unambiguous even when they contain "_".
	cacheKey := fmt.Sprintf("smart_search_%q_%q_%s_%s_%d_%d",
		params.Keyword, params.Status,
		smartSearchTimeKey(params.CreatedAfter), smartSearchTimeKey(params.CreatedBefore),
		params.Page, params.PageSize)
	if cached, found := aqs.cache.Get(cacheKey); found {
		if result, ok := cached.(*SearchResult); ok {
			return result, nil
		}
	}

	query := aqs.repo.db.Model(&User{})

	// Keyword search: full-text match or substring match.
	if params.Keyword != "" {
		query = query.Where(`
MATCH(username, first_name, last_name) AGAINST(? IN NATURAL LANGUAGE MODE) OR
username LIKE ? OR email LIKE ?
`, params.Keyword, "%"+params.Keyword+"%", "%"+params.Keyword+"%")
	}

	// Status filter.
	if params.Status != "" {
		query = query.Where("status = ?", params.Status)
	}

	// Creation-time range filter.
	if params.CreatedAfter != nil {
		query = query.Where("created_at >= ?", *params.CreatedAfter)
	}
	if params.CreatedBefore != nil {
		query = query.Where("created_at <= ?", *params.CreatedBefore)
	}

	// Total number of matches, before pagination.
	var total int64
	if err := query.Count(&total).Error; err != nil {
		return nil, err
	}

	// The requested page.
	var users []User
	offset := (params.Page - 1) * params.PageSize
	result := query.Preload("Profile").
		Order("created_at DESC").
		Limit(params.PageSize).
		Offset(offset).
		Find(&users)
	if result.Error != nil {
		return nil, result.Error
	}

	searchResult := &SearchResult{
		Users: users,
		Pagination: PaginationInfo{
			Page:      params.Page,
			PageSize:  params.PageSize,
			Total:     total,
			TotalPage: (total + int64(params.PageSize) - 1) / int64(params.PageSize),
		},
	}

	aqs.cache.Set(cacheKey, searchResult)
	return searchResult, nil
}

// smartSearchTimeKey renders an optional time bound for use in a cache key:
// "-" when the bound is absent, otherwise the instant in UTC with nanosecond
// precision.
func smartSearchTimeKey(t *time.Time) string {
	if t == nil {
		return "-"
	}
	return t.UTC().Format(time.RFC3339Nano)
}
// SmartSearchParams holds the filters and paging settings of a smart search.
// Empty strings and nil time bounds leave the corresponding filter out.
type SmartSearchParams struct {
	Keyword       string     // search term
	Status        string     // exact match on the user status
	CreatedAfter  *time.Time // inclusive lower bound on created_at
	CreatedBefore *time.Time // inclusive upper bound on created_at
	Page          int        // page number used to compute the offset
	PageSize      int        // number of users per page
}
// SearchResult is one page of a smart search: the matching users and the
// pagination details for the whole result.
type SearchResult struct {
	Users      []User         `json:"users"`
	Pagination PaginationInfo `json:"pagination"`
}
通过本节的学习,你已经掌握了 GORM 的高级查询技术,包括复杂查询构建、预加载策略、原生 SQL 使用以及查询优化技巧。这些知识将帮助你构建高效、灵活的数据访问层。在下一节中,我们将学习数据库迁移和事务处理。