2.4.1 Worker Pool 模式

2.4.1 Worker Pool 模式 #

Worker Pool(工作池)是并发编程中最常用的设计模式之一。它通过创建固定数量的工作协程来处理大量任务,有效控制并发度,避免系统资源耗尽,同时提高程序的整体性能。

基本概念 #

Worker Pool 模式的核心思想是:

  1. 任务队列:使用 channel 作为任务队列,存储待处理的任务
  2. 工作协程:创建固定数量的 goroutine 作为工作者
  3. 任务分发:工作者从任务队列中获取任务并处理
  4. 结果收集:可选地收集处理结果

基础实现 #

让我们从一个简单的 Worker Pool 实现开始:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Job 表示一个工作任务
type Job struct {
    ID   int
    Data string
}

// Result 表示任务处理结果
type Result struct {
    JobID int
    Value string
    Error error
}

// Worker 工作者函数
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()

    for job := range jobs {
        fmt.Printf("Worker %d 开始处理任务 %d\n", id, job.ID)

        // 模拟任务处理时间
        time.Sleep(time.Millisecond * 100)

        // 处理任务
        result := Result{
            JobID: job.ID,
            Value: fmt.Sprintf("处理结果: %s", job.Data),
            Error: nil,
        }

        results <- result
        fmt.Printf("Worker %d 完成任务 %d\n", id, job.ID)
    }
}

func main() {
    const numWorkers = 3
    const numJobs = 10

    // 创建任务和结果channel
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    // 创建WaitGroup等待所有worker完成
    var wg sync.WaitGroup

    // 启动worker
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // 发送任务
    for i := 1; i <= numJobs; i++ {
        jobs <- Job{
            ID:   i,
            Data: fmt.Sprintf("任务数据 %d", i),
        }
    }
    close(jobs) // 关闭任务channel,通知worker没有更多任务

    // 等待所有worker完成
    go func() {
        wg.Wait()
        close(results) // 所有worker完成后关闭结果channel
    }()

    // 收集结果
    for result := range results {
        if result.Error != nil {
            fmt.Printf("任务 %d 处理失败: %v\n", result.JobID, result.Error)
        } else {
            fmt.Printf("任务 %d 处理成功: %s\n", result.JobID, result.Value)
        }
    }

    fmt.Println("所有任务处理完成")
}

面向对象的 Worker Pool #

为了更好的封装和复用,我们可以创建一个面向对象的 Worker Pool:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Task 定义任务接口
type Task interface {
    Process() (interface{}, error)
}

// WorkerPool 工作池结构
type WorkerPool struct {
    workerCount int
    taskQueue   chan Task
    resultQueue chan TaskResult
    wg          sync.WaitGroup
    ctx         context.Context
    cancel      context.CancelFunc
}

// TaskResult 任务结果
type TaskResult struct {
    Result interface{}
    Error  error
}

// NewWorkerPool 创建新的工作池
func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
    ctx, cancel := context.WithCancel(context.Background())

    return &WorkerPool{
        workerCount: workerCount,
        taskQueue:   make(chan Task, queueSize),
        resultQueue: make(chan TaskResult, queueSize),
        ctx:         ctx,
        cancel:      cancel,
    }
}

// Start 启动工作池
func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workerCount; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

// worker 工作者协程
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()

    for {
        select {
        case task, ok := <-wp.taskQueue:
            if !ok {
                fmt.Printf("Worker %d 退出\n", id)
                return
            }

            fmt.Printf("Worker %d 开始处理任务\n", id)
            result, err := task.Process()

            select {
            case wp.resultQueue <- TaskResult{Result: result, Error: err}:
            case <-wp.ctx.Done():
                return
            }

        case <-wp.ctx.Done():
            fmt.Printf("Worker %d 被取消\n", id)
            return
        }
    }
}

// Submit 提交任务
func (wp *WorkerPool) Submit(task Task) error {
    select {
    case wp.taskQueue <- task:
        return nil
    case <-wp.ctx.Done():
        return fmt.Errorf("工作池已关闭")
    default:
        return fmt.Errorf("任务队列已满")
    }
}

// Results 获取结果channel
func (wp *WorkerPool) Results() <-chan TaskResult {
    return wp.resultQueue
}

// Stop 停止工作池
func (wp *WorkerPool) Stop() {
    close(wp.taskQueue)
    wp.wg.Wait()
    close(wp.resultQueue)
}

// Cancel 取消所有任务
func (wp *WorkerPool) Cancel() {
    wp.cancel()
    wp.wg.Wait()
    close(wp.resultQueue)
}

// 示例任务实现
type NumberTask struct {
    Number int
}

func (nt *NumberTask) Process() (interface{}, error) {
    // 模拟处理时间
    time.Sleep(time.Millisecond * 200)

    if nt.Number < 0 {
        return nil, fmt.Errorf("负数无法处理")
    }

    return nt.Number * nt.Number, nil
}

func main() {
    // 创建工作池
    pool := NewWorkerPool(3, 10)
    pool.Start()

    // 提交任务
    for i := -2; i <= 10; i++ {
        task := &NumberTask{Number: i}
        if err := pool.Submit(task); err != nil {
            fmt.Printf("提交任务失败: %v\n", err)
        }
    }

    // 收集结果
    go func() {
        time.Sleep(time.Second * 3) // 3秒后停止
        pool.Stop()
    }()

    for result := range pool.Results() {
        if result.Error != nil {
            fmt.Printf("任务处理失败: %v\n", result.Error)
        } else {
            fmt.Printf("任务处理成功: %v\n", result.Result)
        }
    }

    fmt.Println("工作池已停止")
}

动态调整的 Worker Pool #

在某些场景下,我们可能需要根据负载动态调整 worker 数量:

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// DynamicWorkerPool 动态工作池
type DynamicWorkerPool struct {
    minWorkers    int
    maxWorkers    int
    currentWorkers int64
    taskQueue     chan Task
    resultQueue   chan TaskResult
    workerWg      sync.WaitGroup
    ctx           context.Context
    cancel        context.CancelFunc
    mu            sync.RWMutex
}

// NewDynamicWorkerPool 创建动态工作池
func NewDynamicWorkerPool(minWorkers, maxWorkers, queueSize int) *DynamicWorkerPool {
    ctx, cancel := context.WithCancel(context.Background())

    pool := &DynamicWorkerPool{
        minWorkers:  minWorkers,
        maxWorkers:  maxWorkers,
        taskQueue:   make(chan Task, queueSize),
        resultQueue: make(chan TaskResult, queueSize),
        ctx:         ctx,
        cancel:      cancel,
    }

    return pool
}

// Start 启动动态工作池
func (dwp *DynamicWorkerPool) Start() {
    // 启动最小数量的worker
    for i := 0; i < dwp.minWorkers; i++ {
        dwp.addWorker()
    }

    // 启动监控协程
    go dwp.monitor()
}

// addWorker 添加worker
func (dwp *DynamicWorkerPool) addWorker() {
    if atomic.LoadInt64(&dwp.currentWorkers) >= int64(dwp.maxWorkers) {
        return
    }

    atomic.AddInt64(&dwp.currentWorkers, 1)
    dwp.workerWg.Add(1)

    go func() {
        defer dwp.workerWg.Done()
        defer atomic.AddInt64(&dwp.currentWorkers, -1)

        idleCount := 0
        for {
            select {
            case task, ok := <-dwp.taskQueue:
                if !ok {
                    return
                }

                idleCount = 0 // 重置空闲计数
                result, err := task.Process()

                select {
                case dwp.resultQueue <- TaskResult{Result: result, Error: err}:
                case <-dwp.ctx.Done():
                    return
                }

            case <-time.After(time.Second):
                idleCount++
                // 如果空闲时间过长且worker数量大于最小值,则退出
                if idleCount > 5 && atomic.LoadInt64(&dwp.currentWorkers) > int64(dwp.minWorkers) {
                    fmt.Printf("Worker 因空闲退出,当前worker数量: %d\n",
                        atomic.LoadInt64(&dwp.currentWorkers)-1)
                    return
                }

            case <-dwp.ctx.Done():
                return
            }
        }
    }()
}

// monitor 监控任务队列长度,动态调整worker数量
func (dwp *DynamicWorkerPool) monitor() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            queueLen := len(dwp.taskQueue)
            currentWorkers := atomic.LoadInt64(&dwp.currentWorkers)

            // 如果队列长度大于worker数量的2倍,增加worker
            if queueLen > int(currentWorkers)*2 && currentWorkers < int64(dwp.maxWorkers) {
                dwp.addWorker()
                fmt.Printf("增加worker,当前worker数量: %d,队列长度: %d\n",
                    atomic.LoadInt64(&dwp.currentWorkers), queueLen)
            }

        case <-dwp.ctx.Done():
            return
        }
    }
}

// Submit 提交任务
func (dwp *DynamicWorkerPool) Submit(task Task) error {
    select {
    case dwp.taskQueue <- task:
        return nil
    case <-dwp.ctx.Done():
        return fmt.Errorf("工作池已关闭")
    default:
        return fmt.Errorf("任务队列已满")
    }
}

// Results 获取结果channel
func (dwp *DynamicWorkerPool) Results() <-chan TaskResult {
    return dwp.resultQueue
}

// Stop 停止工作池
func (dwp *DynamicWorkerPool) Stop() {
    close(dwp.taskQueue)
    dwp.workerWg.Wait()
    close(dwp.resultQueue)
}

// GetWorkerCount 获取当前worker数量
func (dwp *DynamicWorkerPool) GetWorkerCount() int64 {
    return atomic.LoadInt64(&dwp.currentWorkers)
}

func main() {
    // 创建动态工作池
    pool := NewDynamicWorkerPool(2, 10, 50)
    pool.Start()

    // 模拟突发任务
    go func() {
        for i := 0; i < 100; i++ {
            task := &NumberTask{Number: i}
            if err := pool.Submit(task); err != nil {
                fmt.Printf("提交任务失败: %v\n", err)
                time.Sleep(time.Millisecond * 10)
            }

            if i == 50 {
                time.Sleep(time.Second * 2) // 模拟任务间隔
            }
        }
    }()

    // 监控worker数量变化
    go func() {
        for i := 0; i < 20; i++ {
            fmt.Printf("当前worker数量: %d,队列长度: %d\n",
                pool.GetWorkerCount(), len(pool.taskQueue))
            time.Sleep(time.Second)
        }
        pool.Stop()
    }()

    // 收集结果
    resultCount := 0
    for result := range pool.Results() {
        resultCount++
        if result.Error != nil {
            fmt.Printf("任务处理失败: %v\n", result.Error)
        } else if resultCount%10 == 0 {
            fmt.Printf("已处理 %d 个任务,最新结果: %v\n", resultCount, result.Result)
        }
    }

    fmt.Printf("总共处理了 %d 个任务\n", resultCount)
}

最佳实践 #

1. 合理设置 Worker 数量 #

import (
    "runtime"
)

// 根据CPU核心数设置worker数量
func getOptimalWorkerCount() int {
    // CPU密集型任务
    cpuIntensive := runtime.NumCPU()

    // I/O密集型任务
    ioIntensive := runtime.NumCPU() * 2

    // 根据实际情况选择
    return cpuIntensive
}

2. 优雅关闭 #

// GracefulWorkerPool 支持优雅关闭的工作池
type GracefulWorkerPool struct {
    *WorkerPool
    shutdownTimeout time.Duration
}

func (gwp *GracefulWorkerPool) GracefulStop() error {
    // 停止接收新任务
    close(gwp.taskQueue)

    // 等待现有任务完成,但设置超时
    done := make(chan struct{})
    go func() {
        gwp.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        close(gwp.resultQueue)
        return nil
    case <-time.After(gwp.shutdownTimeout):
        gwp.cancel() // 强制取消
        return fmt.Errorf("优雅关闭超时")
    }
}

3. 错误处理和重试 #

// RetryTask 支持重试的任务
type RetryTask struct {
    task       Task
    maxRetries int
    retryCount int
}

func (rt *RetryTask) Process() (interface{}, error) {
    for rt.retryCount <= rt.maxRetries {
        result, err := rt.task.Process()
        if err == nil {
            return result, nil
        }

        rt.retryCount++
        if rt.retryCount <= rt.maxRetries {
            time.Sleep(time.Second * time.Duration(rt.retryCount))
        }
    }

    return nil, fmt.Errorf("任务重试 %d 次后仍然失败", rt.maxRetries)
}

性能优化技巧 #

1. 减少内存分配 #

// 使用对象池减少内存分配
var taskPool = sync.Pool{
    New: func() interface{} {
        return &NumberTask{}
    },
}

func getTask(number int) *NumberTask {
    task := taskPool.Get().(*NumberTask)
    task.Number = number
    return task
}

func putTask(task *NumberTask) {
    task.Number = 0 // 重置状态
    taskPool.Put(task)
}

2. 批量处理 #

// BatchTask 批量任务
type BatchTask struct {
    Tasks []Task
}

func (bt *BatchTask) Process() (interface{}, error) {
    results := make([]interface{}, len(bt.Tasks))

    for i, task := range bt.Tasks {
        result, err := task.Process()
        if err != nil {
            return nil, err
        }
        results[i] = result
    }

    return results, nil
}

Worker Pool 模式是 Go 并发编程的基石,掌握其实现原理和优化技巧对于构建高性能的并发应用至关重要。在实际应用中,要根据具体的业务场景选择合适的实现方式,并注意资源管理和错误处理。