2.4.1 Worker Pool 模式 #
Worker Pool(工作池)是并发编程中最常用的设计模式之一。它通过创建固定数量的工作协程来处理大量任务,有效控制并发度,避免系统资源耗尽,同时提高程序的整体性能。
基本概念 #
Worker Pool 模式的核心思想是:
- 任务队列:使用 channel 作为任务队列,存储待处理的任务
- 工作协程:创建固定数量的 goroutine 作为工作者
- 任务分发:工作者从任务队列中获取任务并处理
- 结果收集:可选地收集处理结果
基础实现 #
让我们从一个简单的 Worker Pool 实现开始:
package main
import (
"fmt"
"sync"
"time"
)
// Job 表示一个工作任务
type Job struct {
ID int
Data string
}
// Result 表示任务处理结果
type Result struct {
JobID int
Value string
Error error
}
// Worker 工作者函数
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d 开始处理任务 %d\n", id, job.ID)
// 模拟任务处理时间
time.Sleep(time.Millisecond * 100)
// 处理任务
result := Result{
JobID: job.ID,
Value: fmt.Sprintf("处理结果: %s", job.Data),
Error: nil,
}
results <- result
fmt.Printf("Worker %d 完成任务 %d\n", id, job.ID)
}
}
func main() {
const numWorkers = 3
const numJobs = 10
// 创建任务和结果channel
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
// 创建WaitGroup等待所有worker完成
var wg sync.WaitGroup
// 启动worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{
ID: i,
Data: fmt.Sprintf("任务数据 %d", i),
}
}
close(jobs) // 关闭任务channel,通知worker没有更多任务
// 等待所有worker完成
go func() {
wg.Wait()
close(results) // 所有worker完成后关闭结果channel
}()
// 收集结果
for result := range results {
if result.Error != nil {
fmt.Printf("任务 %d 处理失败: %v\n", result.JobID, result.Error)
} else {
fmt.Printf("任务 %d 处理成功: %s\n", result.JobID, result.Value)
}
}
fmt.Println("所有任务处理完成")
}
面向对象的 Worker Pool #
为了更好的封装和复用,我们可以创建一个面向对象的 Worker Pool:
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Task 定义任务接口
type Task interface {
Process() (interface{}, error)
}
// WorkerPool 工作池结构
type WorkerPool struct {
workerCount int
taskQueue chan Task
resultQueue chan TaskResult
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// TaskResult 任务结果
type TaskResult struct {
Result interface{}
Error error
}
// NewWorkerPool 创建新的工作池
func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
return &WorkerPool{
workerCount: workerCount,
taskQueue: make(chan Task, queueSize),
resultQueue: make(chan TaskResult, queueSize),
ctx: ctx,
cancel: cancel,
}
}
// Start 启动工作池
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
// worker 工作者协程
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for {
select {
case task, ok := <-wp.taskQueue:
if !ok {
fmt.Printf("Worker %d 退出\n", id)
return
}
fmt.Printf("Worker %d 开始处理任务\n", id)
result, err := task.Process()
select {
case wp.resultQueue <- TaskResult{Result: result, Error: err}:
case <-wp.ctx.Done():
return
}
case <-wp.ctx.Done():
fmt.Printf("Worker %d 被取消\n", id)
return
}
}
}
// Submit 提交任务
func (wp *WorkerPool) Submit(task Task) error {
select {
case wp.taskQueue <- task:
return nil
case <-wp.ctx.Done():
return fmt.Errorf("工作池已关闭")
default:
return fmt.Errorf("任务队列已满")
}
}
// Results 获取结果channel
func (wp *WorkerPool) Results() <-chan TaskResult {
return wp.resultQueue
}
// Stop 停止工作池
func (wp *WorkerPool) Stop() {
close(wp.taskQueue)
wp.wg.Wait()
close(wp.resultQueue)
}
// Cancel 取消所有任务
func (wp *WorkerPool) Cancel() {
wp.cancel()
wp.wg.Wait()
close(wp.resultQueue)
}
// 示例任务实现
type NumberTask struct {
Number int
}
func (nt *NumberTask) Process() (interface{}, error) {
// 模拟处理时间
time.Sleep(time.Millisecond * 200)
if nt.Number < 0 {
return nil, fmt.Errorf("负数无法处理")
}
return nt.Number * nt.Number, nil
}
func main() {
// 创建工作池
pool := NewWorkerPool(3, 10)
pool.Start()
// 提交任务
for i := -2; i <= 10; i++ {
task := &NumberTask{Number: i}
if err := pool.Submit(task); err != nil {
fmt.Printf("提交任务失败: %v\n", err)
}
}
// 收集结果
go func() {
time.Sleep(time.Second * 3) // 3秒后停止
pool.Stop()
}()
for result := range pool.Results() {
if result.Error != nil {
fmt.Printf("任务处理失败: %v\n", result.Error)
} else {
fmt.Printf("任务处理成功: %v\n", result.Result)
}
}
fmt.Println("工作池已停止")
}
动态调整的 Worker Pool #
在某些场景下,我们可能需要根据负载动态调整 worker 数量:
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// DynamicWorkerPool 动态工作池
type DynamicWorkerPool struct {
minWorkers int
maxWorkers int
currentWorkers int64
taskQueue chan Task
resultQueue chan TaskResult
workerWg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
}
// NewDynamicWorkerPool 创建动态工作池
func NewDynamicWorkerPool(minWorkers, maxWorkers, queueSize int) *DynamicWorkerPool {
ctx, cancel := context.WithCancel(context.Background())
pool := &DynamicWorkerPool{
minWorkers: minWorkers,
maxWorkers: maxWorkers,
taskQueue: make(chan Task, queueSize),
resultQueue: make(chan TaskResult, queueSize),
ctx: ctx,
cancel: cancel,
}
return pool
}
// Start 启动动态工作池
func (dwp *DynamicWorkerPool) Start() {
// 启动最小数量的worker
for i := 0; i < dwp.minWorkers; i++ {
dwp.addWorker()
}
// 启动监控协程
go dwp.monitor()
}
// addWorker 添加worker
func (dwp *DynamicWorkerPool) addWorker() {
if atomic.LoadInt64(&dwp.currentWorkers) >= int64(dwp.maxWorkers) {
return
}
atomic.AddInt64(&dwp.currentWorkers, 1)
dwp.workerWg.Add(1)
go func() {
defer dwp.workerWg.Done()
defer atomic.AddInt64(&dwp.currentWorkers, -1)
idleCount := 0
for {
select {
case task, ok := <-dwp.taskQueue:
if !ok {
return
}
idleCount = 0 // 重置空闲计数
result, err := task.Process()
select {
case dwp.resultQueue <- TaskResult{Result: result, Error: err}:
case <-dwp.ctx.Done():
return
}
case <-time.After(time.Second):
idleCount++
// 如果空闲时间过长且worker数量大于最小值,则退出
if idleCount > 5 && atomic.LoadInt64(&dwp.currentWorkers) > int64(dwp.minWorkers) {
fmt.Printf("Worker 因空闲退出,当前worker数量: %d\n",
atomic.LoadInt64(&dwp.currentWorkers)-1)
return
}
case <-dwp.ctx.Done():
return
}
}
}()
}
// monitor 监控任务队列长度,动态调整worker数量
func (dwp *DynamicWorkerPool) monitor() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
queueLen := len(dwp.taskQueue)
currentWorkers := atomic.LoadInt64(&dwp.currentWorkers)
// 如果队列长度大于worker数量的2倍,增加worker
if queueLen > int(currentWorkers)*2 && currentWorkers < int64(dwp.maxWorkers) {
dwp.addWorker()
fmt.Printf("增加worker,当前worker数量: %d,队列长度: %d\n",
atomic.LoadInt64(&dwp.currentWorkers), queueLen)
}
case <-dwp.ctx.Done():
return
}
}
}
// Submit 提交任务
func (dwp *DynamicWorkerPool) Submit(task Task) error {
select {
case dwp.taskQueue <- task:
return nil
case <-dwp.ctx.Done():
return fmt.Errorf("工作池已关闭")
default:
return fmt.Errorf("任务队列已满")
}
}
// Results 获取结果channel
func (dwp *DynamicWorkerPool) Results() <-chan TaskResult {
return dwp.resultQueue
}
// Stop 停止工作池
func (dwp *DynamicWorkerPool) Stop() {
close(dwp.taskQueue)
dwp.workerWg.Wait()
close(dwp.resultQueue)
}
// GetWorkerCount 获取当前worker数量
func (dwp *DynamicWorkerPool) GetWorkerCount() int64 {
return atomic.LoadInt64(&dwp.currentWorkers)
}
func main() {
// 创建动态工作池
pool := NewDynamicWorkerPool(2, 10, 50)
pool.Start()
// 模拟突发任务
go func() {
for i := 0; i < 100; i++ {
task := &NumberTask{Number: i}
if err := pool.Submit(task); err != nil {
fmt.Printf("提交任务失败: %v\n", err)
time.Sleep(time.Millisecond * 10)
}
if i == 50 {
time.Sleep(time.Second * 2) // 模拟任务间隔
}
}
}()
// 监控worker数量变化
go func() {
for i := 0; i < 20; i++ {
fmt.Printf("当前worker数量: %d,队列长度: %d\n",
pool.GetWorkerCount(), len(pool.taskQueue))
time.Sleep(time.Second)
}
pool.Stop()
}()
// 收集结果
resultCount := 0
for result := range pool.Results() {
resultCount++
if result.Error != nil {
fmt.Printf("任务处理失败: %v\n", result.Error)
} else if resultCount%10 == 0 {
fmt.Printf("已处理 %d 个任务,最新结果: %v\n", resultCount, result.Result)
}
}
fmt.Printf("总共处理了 %d 个任务\n", resultCount)
}
最佳实践 #
1. 合理设置 Worker 数量 #
import (
"runtime"
)
// 根据CPU核心数设置worker数量
func getOptimalWorkerCount() int {
// CPU密集型任务
cpuIntensive := runtime.NumCPU()
// I/O密集型任务
ioIntensive := runtime.NumCPU() * 2
// 根据实际情况选择
return cpuIntensive
}
2. 优雅关闭 #
// GracefulWorkerPool 支持优雅关闭的工作池
type GracefulWorkerPool struct {
*WorkerPool
shutdownTimeout time.Duration
}
func (gwp *GracefulWorkerPool) GracefulStop() error {
// 停止接收新任务
close(gwp.taskQueue)
// 等待现有任务完成,但设置超时
done := make(chan struct{})
go func() {
gwp.wg.Wait()
close(done)
}()
select {
case <-done:
close(gwp.resultQueue)
return nil
case <-time.After(gwp.shutdownTimeout):
gwp.cancel() // 强制取消
return fmt.Errorf("优雅关闭超时")
}
}
3. 错误处理和重试 #
// RetryTask 支持重试的任务
type RetryTask struct {
task Task
maxRetries int
retryCount int
}
func (rt *RetryTask) Process() (interface{}, error) {
for rt.retryCount <= rt.maxRetries {
result, err := rt.task.Process()
if err == nil {
return result, nil
}
rt.retryCount++
if rt.retryCount <= rt.maxRetries {
time.Sleep(time.Second * time.Duration(rt.retryCount))
}
}
return nil, fmt.Errorf("任务重试 %d 次后仍然失败", rt.maxRetries)
}
性能优化技巧 #
1. 减少内存分配 #
// 使用对象池减少内存分配
var taskPool = sync.Pool{
New: func() interface{} {
return &NumberTask{}
},
}
func getTask(number int) *NumberTask {
task := taskPool.Get().(*NumberTask)
task.Number = number
return task
}
func putTask(task *NumberTask) {
task.Number = 0 // 重置状态
taskPool.Put(task)
}
2. 批量处理 #
// BatchTask 批量任务
type BatchTask struct {
Tasks []Task
}
func (bt *BatchTask) Process() (interface{}, error) {
results := make([]interface{}, len(bt.Tasks))
for i, task := range bt.Tasks {
result, err := task.Process()
if err != nil {
return nil, err
}
results[i] = result
}
return results, nil
}
Worker Pool 模式是 Go 并发编程的基石,掌握其实现原理和优化技巧对于构建高性能的并发应用至关重要。在实际应用中,要根据具体的业务场景选择合适的实现方式,并注意资源管理和错误处理。