2.1.3 并发模型与模式 #
并发编程模型概述 #
在并发编程中,不同的编程语言采用不同的并发模型来处理并发任务。理解这些模型有助于我们更好地使用 Go 语言的并发特性。
主要并发模型 #
1. 共享内存模型 #
传统的并发模型,多个线程共享同一块内存空间,通过锁机制来保证数据一致性:
package main
import (
"fmt"
"sync"
"time"
)
// Counter is the shared-memory example: a plain integer that several
// goroutines update, with a mutex serializing every access.
type Counter struct {
	mu    sync.Mutex // guards value
	value int
}

// Increment adds one to the counter while holding the lock.
func (c *Counter) Increment() {
	c.mu.Lock()
	c.value++
	c.mu.Unlock()
}

// Value returns the current count, read under the lock.
func (c *Counter) Value() int {
	c.mu.Lock()
	current := c.value
	c.mu.Unlock()
	return current
}
// sharedMemoryExample runs 10 goroutines that each increment one shared
// Counter 1000 times, then prints the total once all of them are done.
func sharedMemoryExample() {
	const (
		goroutines   = 10
		perGoroutine = 1000
	)
	var (
		shared  = &Counter{}
		pending sync.WaitGroup
	)

	// Every goroutine updates the same Counter; its mutex keeps the
	// increments from racing.
	for id := 0; id < goroutines; id++ {
		pending.Add(1)
		go func(id int) {
			defer pending.Done()
			for left := perGoroutine; left > 0; left-- {
				shared.Increment()
			}
			fmt.Printf("Goroutine %d completed\n", id)
		}(id)
	}

	pending.Wait()
	fmt.Printf("Final counter value: %d\n", shared.Value())
}
2. 消息传递模型(CSP) #
Go 语言采用的主要模型,基于 Tony Hoare 的 CSP(Communicating Sequential Processes)理论:
package main
import (
"fmt"
"time"
)
// messagePassingExample demonstrates the CSP style: three workers receive
// jobs over one channel and report results over another, sharing no state.
func messagePassingExample() {
	const (
		workerCount = 3
		jobCount    = 9
	)

	// Both channels have room for every job, so neither the sends below nor
	// the workers' result sends have to wait for a reader.
	jobs, results := make(chan int, 10), make(chan int, 10)

	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, results)
	}

	for job := 1; job <= jobCount; job++ {
		jobs <- job
	}
	close(jobs) // lets each worker's receive loop end

	// Exactly one result is produced per job.
	for received := 0; received < jobCount; received++ {
		fmt.Printf("Result: %d\n", <-results)
	}
}
// worker receives jobs until the jobs channel is closed. For each job it
// logs the job, sleeps 100 ms to simulate work, and sends job*2 on results.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(100 * time.Millisecond) // simulated work
		results <- 2 * job
	}
}
常见并发模式 #
1. 生产者-消费者模式 #
这是最基本的并发模式之一,生产者生成数据,消费者处理数据:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Task is the unit of work passed from producers to consumers over a channel.
type Task struct {
	ID   int    // producers set this to producerID*100 + sequence number
	Data string // human-readable payload; consumers print it
}
// producer sends five tasks on tasks, pausing a random 0-499 ms after each
// one, and calls wg.Done when it returns.
func producer(id int, tasks chan<- Task, wg *sync.WaitGroup) {
	defer wg.Done()

	const tasksPerProducer = 5
	for seq := 0; seq < tasksPerProducer; seq++ {
		next := Task{
			ID:   id*100 + seq,
			Data: fmt.Sprintf("Data from producer %d, task %d", id, seq),
		}
		fmt.Printf("Producer %d: Creating task %d\n", id, next.ID)
		tasks <- next

		// Random pause so that producers interleave unpredictably.
		pause := time.Duration(rand.Intn(500)) * time.Millisecond
		time.Sleep(pause)
	}
	fmt.Printf("Producer %d: Finished\n", id)
}
// consumer handles tasks until the tasks channel is closed and drained,
// sleeping a random 0-299 ms per task to simulate processing. It calls
// wg.Done when it returns.
func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		task, open := <-tasks
		if !open {
			break
		}
		fmt.Printf("Consumer %d: Processing task %d - %s\n", id, task.ID, task.Data)

		// Simulated processing time.
		busy := time.Duration(rand.Intn(300)) * time.Millisecond
		time.Sleep(busy)

		fmt.Printf("Consumer %d: Completed task %d\n", id, task.ID)
	}
	fmt.Printf("Consumer %d: Finished\n", id)
}
// producerConsumerExample wires two producers and three consumers together
// through one buffered channel and returns once every consumer has finished.
func producerConsumerExample() {
	const (
		numProducers = 2
		numConsumers = 3
	)

	tasks := make(chan Task, 10) // buffered so producers can run ahead
	var producerWg, consumerWg sync.WaitGroup

	for id := 0; id < numProducers; id++ {
		producerWg.Add(1)
		go producer(id, tasks, &producerWg)
	}

	for id := 0; id < numConsumers; id++ {
		consumerWg.Add(1)
		go consumer(id, tasks, &consumerWg)
	}

	// The channel is closed only after the last producer has returned;
	// closing it is what ends the consumers' receive loops.
	go func() {
		producerWg.Wait()
		close(tasks)
	}()

	consumerWg.Wait()
	fmt.Println("All producers and consumers finished")
}
2. 扇入模式(Fan-in) #
将多个输入源的数据合并到一个输出通道:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// dataSource sends five strings of the form "<name>-data-<i>" on output,
// pausing a random 0-499 ms after each one, and closes output on return.
func dataSource(name string, output chan<- string) {
	defer close(output)

	const items = 5
	for seq := 0; seq < items; seq++ {
		item := fmt.Sprintf("%s-data-%d", name, seq)
		fmt.Printf("Source %s: Generating %s\n", name, item)
		output <- item

		pause := time.Duration(rand.Intn(500)) * time.Millisecond
		time.Sleep(pause)
	}
	fmt.Printf("Source %s: Finished\n", name)
}
// fanIn merges any number of input channels into one output channel. Each
// value is prefixed with "[Source-<index>] ", where index is the position of
// its input in the argument list. The output is closed once every input has
// been closed and drained.
func fanIn(inputs ...<-chan string) <-chan string {
	merged := make(chan string)
	var forwarders sync.WaitGroup

	// forward copies one input into merged until that input is closed.
	forward := func(id int, in <-chan string) {
		defer forwarders.Done()
		for item := range in {
			merged <- fmt.Sprintf("[Source-%d] %s", id, item)
		}
	}

	for id, in := range inputs {
		forwarders.Add(1)
		go forward(id, in)
	}

	// Close merged only after every forwarder has returned, so no forwarder
	// can send on a closed channel.
	go func() {
		forwarders.Wait()
		close(merged)
	}()

	return merged
}
// fanInExample starts three data sources ("A", "B", "C"), merges their
// output with fanIn, and prints every merged value until the merged channel
// is closed.
func fanInExample() {
	names := []string{"A", "B", "C"}

	// One unbuffered channel and one producing goroutine per source.
	sources := make([]<-chan string, 0, len(names))
	for _, name := range names {
		ch := make(chan string)
		go dataSource(name, ch)
		sources = append(sources, ch)
	}

	merged := fanIn(sources...)

	fmt.Println("=== Merged Data ===")
	for {
		item, open := <-merged
		if !open {
			break
		}
		fmt.Printf("Received: %s\n", item)
	}
	fmt.Println("Fan-in example completed")
}
3. 扇出模式(Fan-out) #
将一个输入源的数据分发到多个处理器:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Job describes one unit of work handed to the fan-out workers.
type Job struct {
	ID       int           // sequence number assigned by the generator
	Payload  string        // text included in the worker's result message
	Duration time.Duration // how long a worker sleeps to simulate processing
}
// jobGenerator sends 20 jobs on jobs, each with a random simulated duration
// of 0-499 ms, waiting 50 ms between sends. It closes jobs on return.
func jobGenerator(jobs chan<- Job) {
	defer close(jobs)

	const total = 20
	for id := 0; id < total; id++ {
		next := Job{ID: id}
		next.Payload = fmt.Sprintf("Job-%d-payload", id)
		next.Duration = time.Duration(rand.Intn(500)) * time.Millisecond

		fmt.Printf("Generated job %d\n", next.ID)
		jobs <- next
		time.Sleep(50 * time.Millisecond)
	}
	fmt.Println("Job generator finished")
}
// worker processes jobs until the jobs channel is closed. For each job it
// sleeps for job.Duration to simulate work and then sends a description of
// the finished job on results. It calls wg.Done when it returns.
func worker(id int, jobs <-chan Job, results chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		job, open := <-jobs
		if !open {
			break
		}
		fmt.Printf("Worker %d: Starting job %d\n", id, job.ID)

		time.Sleep(job.Duration) // simulated work

		results <- fmt.Sprintf("Worker-%d processed job-%d (%s)", id, job.ID, job.Payload)
		fmt.Printf("Worker %d: Completed job %d\n", id, job.ID)
	}
	fmt.Printf("Worker %d: Finished\n", id)
}
// fanOutExample distributes the jobs from one generator across four workers
// and prints every result until the results channel is closed.
func fanOutExample() {
	const numWorkers = 4

	jobs := make(chan Job, 10)
	results := make(chan string, 20)

	go jobGenerator(jobs)

	// Fan out: every worker receives from the same jobs channel.
	var workers sync.WaitGroup
	for id := 0; id < numWorkers; id++ {
		workers.Add(1)
		go worker(id, jobs, results, &workers)
	}

	// results is closed only after the last worker has returned, which ends
	// the collection loop below.
	go func() {
		workers.Wait()
		close(results)
	}()

	fmt.Println("\n=== Results ===")
	for {
		line, open := <-results
		if !open {
			break
		}
		fmt.Printf("Result: %s\n", line)
	}
	fmt.Println("Fan-out example completed")
}
4. 管道模式(Pipeline) #
将数据处理分解为多个阶段,每个阶段由不同的 Goroutine 处理:
package main
import (
"fmt"
"strconv"
"strings"
"time"
)
// generateNumbers is pipeline stage 1: it sends the integers 1 through 10 on
// nums, waiting 100 ms after each send, and closes nums on return.
func generateNumbers(nums chan<- int) {
	defer close(nums)

	const first, last = 1, 10
	for n := first; n <= last; n++ {
		fmt.Printf("Stage 1: Generating %d\n", n)
		nums <- n
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Println("Stage 1: Number generation completed")
}
// squareNumbers is pipeline stage 2: it reads integers from nums, sends
// their squares on squares (waiting 50 ms after each send), and closes
// squares once nums is closed and drained.
func squareNumbers(nums <-chan int, squares chan<- int) {
	defer close(squares)

	for {
		n, open := <-nums
		if !open {
			break
		}
		sq := n * n
		fmt.Printf("Stage 2: %d squared = %d\n", n, sq)
		squares <- sq
		time.Sleep(50 * time.Millisecond)
	}
	fmt.Println("Stage 2: Number squaring completed")
}
// formatNumbers is pipeline stage 3: it turns each integer read from squares
// into the string "Number: <n>", sends it on formatted (waiting 30 ms after
// each send), and closes formatted once squares is closed and drained.
func formatNumbers(squares <-chan int, formatted chan<- string) {
	defer close(formatted)

	for {
		sq, open := <-squares
		if !open {
			break
		}
		text := "Number: " + strconv.Itoa(sq)
		fmt.Printf("Stage 3: Formatting %d -> %s\n", sq, text)
		formatted <- text
		time.Sleep(30 * time.Millisecond)
	}
	fmt.Println("Stage 3: Number formatting completed")
}
// printResults is pipeline stage 4: it prints every string read from
// formatted in upper case, waiting 20 ms after each one, and returns once
// formatted is closed and drained.
func printResults(formatted <-chan string) {
	fmt.Println("\n=== Final Results ===")
	for {
		line, open := <-formatted
		if !open {
			break
		}
		upper := strings.ToUpper(line)
		fmt.Printf("Output: %s\n", upper)
		time.Sleep(20 * time.Millisecond)
	}
	fmt.Println("Stage 4: Output completed")
}
// pipelineExample connects the four stages with unbuffered channels. The
// first three stages run in their own goroutines; the final printing stage
// runs on the calling goroutine, so this function returns only after the
// whole pipeline has drained.
func pipelineExample() {
	var (
		nums      = make(chan int)
		squares   = make(chan int)
		formatted = make(chan string)
	)

	go generateNumbers(nums)
	go squareNumbers(nums, squares)
	go formatNumbers(squares, formatted)

	printResults(formatted)
	fmt.Println("Pipeline example completed")
}
5. 工作池模式(Worker Pool) #
创建固定数量的工作 Goroutine 来处理任务队列:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// WorkItem is a job submitted to the worker pool.
type WorkItem struct {
	ID   int    // job identifier, echoed in log lines and results
	Data string // payload; workers embed it in the result text
}
// WorkResult is what a pool worker publishes after processing a WorkItem.
type WorkResult struct {
	Item   WorkItem // the job that was processed
	Result string   // text describing the outcome
	Worker int      // id of the worker that processed the job
}
// WorkerPool runs a fixed number of worker goroutines that take WorkItems
// from a job queue and publish WorkResults on a result queue.
type WorkerPool struct {
	workerCount int
	jobQueue    chan WorkItem
	resultQueue chan WorkResult
	wg          sync.WaitGroup // tracks running workers
}

// NewWorkerPool builds a pool configured for workerCount workers, with
// buffered job and result queues of the given capacities. No goroutines are
// started until Start is called.
func NewWorkerPool(workerCount, jobQueueSize, resultQueueSize int) *WorkerPool {
	pool := new(WorkerPool)
	pool.workerCount = workerCount
	pool.jobQueue = make(chan WorkItem, jobQueueSize)
	pool.resultQueue = make(chan WorkResult, resultQueueSize)
	return pool
}

// Start launches the worker goroutines, numbered from 0.
func (wp *WorkerPool) Start() {
	id := 0
	for id < wp.workerCount {
		wp.wg.Add(1)
		go wp.worker(id)
		id++
	}
}

// worker processes jobs until the job queue is closed and drained. Each job
// takes a random 0-499 ms of simulated work, after which a WorkResult is
// sent on the result queue.
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()

	fmt.Printf("Worker %d: Started\n", id)
	for {
		job, open := <-wp.jobQueue
		if !open {
			break
		}
		fmt.Printf("Worker %d: Processing job %d\n", id, job.ID)

		// Simulated work.
		elapsed := time.Duration(rand.Intn(500)) * time.Millisecond
		time.Sleep(elapsed)

		wp.resultQueue <- WorkResult{
			Item:   job,
			Result: fmt.Sprintf("Processed: %s (took %v)", job.Data, elapsed),
			Worker: id,
		}
		fmt.Printf("Worker %d: Completed job %d\n", id, job.ID)
	}
	fmt.Printf("Worker %d: Finished\n", id)
}

// Submit enqueues item, blocking while the job queue is full.
func (wp *WorkerPool) Submit(item WorkItem) {
	wp.jobQueue <- item
}

// Close closes the job queue. Workers exit once they have drained it;
// calling Submit afterwards panics (send on closed channel).
func (wp *WorkerPool) Close() {
	close(wp.jobQueue)
}

// Wait blocks until every worker has returned and then closes the result
// queue, which ends any range loop over Results. Workers return only after
// Close, and a second call to Wait would close the result queue twice.
func (wp *WorkerPool) Wait() {
	wp.wg.Wait()
	close(wp.resultQueue)
}

// Results exposes the result queue as a receive-only channel.
func (wp *WorkerPool) Results() <-chan WorkResult {
	return wp.resultQueue
}
// workerPoolExample pushes 15 jobs through a pool of 3 workers and prints
// every result until the result queue is closed.
func workerPoolExample() {
	pool := NewWorkerPool(3, 10, 20)
	pool.Start()

	// Submit from a separate goroutine so that this goroutine can start
	// draining results while jobs are still being queued.
	go func() {
		defer pool.Close()
		for id := 0; id < 15; id++ {
			item := WorkItem{ID: id, Data: fmt.Sprintf("Task-%d-data", id)}
			fmt.Printf("Submitting job %d\n", item.ID)
			pool.Submit(item)
		}
	}()

	// Wait closes the result queue once every worker has exited, which is
	// what terminates the loop below.
	go pool.Wait()

	fmt.Println("\n=== Processing Results ===")
	results := pool.Results()
	for {
		res, open := <-results
		if !open {
			break
		}
		fmt.Printf("Result from Worker %d: Job %d - %s\n",
			res.Worker, res.Item.ID, res.Result)
	}
	fmt.Println("Worker pool example completed")
}
6. 超时模式 #
使用 select 语句配合 time.After 或 context 实现超时控制:
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// slowOperation simulates work of unpredictable length. It returns at once
// with a channel on which a single completion message is delivered after a
// random 0-1999 ms delay. The channel has capacity 1, so the background
// goroutine can finish even if the caller stops listening.
func slowOperation(id int) <-chan string {
	done := make(chan string, 1)

	go func(out chan<- string) {
		wait := time.Duration(rand.Intn(2000)) * time.Millisecond
		time.Sleep(wait)
		out <- fmt.Sprintf("Operation %d completed after %v", id, wait)
	}(done)

	return done
}
// timeoutWithSelect runs five slow operations one after another, giving each
// at most one second by racing its result channel against time.After in a
// select.
func timeoutWithSelect() {
	const limit = 1 * time.Second

	fmt.Println("=== Timeout with Select ===")
	for op := 0; op < 5; op++ {
		fmt.Printf("Starting operation %d...\n", op)

		pending := slowOperation(op)
		expired := time.After(limit)
		select {
		case msg := <-pending:
			fmt.Printf("Success: %s\n", msg)
		case <-expired:
			fmt.Printf("Timeout: Operation %d took too long\n", op)
		}
	}
}
// timeoutWithContext runs five slow operations one after another, giving
// each at most one second by racing its result channel against a
// context.WithTimeout deadline.
func timeoutWithContext() {
	fmt.Println("\n=== Timeout with Context ===")

	// runOnce handles a single operation so that its context is released
	// as soon as that operation's select has finished.
	runOnce := func(op int) {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel() // release the context's timer

		fmt.Printf("Starting operation %d with context...\n", op)
		pending := slowOperation(op)
		select {
		case msg := <-pending:
			fmt.Printf("Success: %s\n", msg)
		case <-ctx.Done():
			fmt.Printf("Context timeout: Operation %d cancelled\n", op)
		}
	}

	for op := 0; op < 5; op++ {
		runOnce(op)
	}
}
// cancellableTask runs a ten-step task in the background, each step taking
// 200 ms, and returns a channel that delivers exactly one status message
// before being closed:
//
//   - "Task <id> completed successfully" if all steps finish, or
//   - "Task <id> cancelled at step <n>" if ctx is cancelled first, where n
//     is the step that was about to run or was in progress.
//
// Cancellation is observed while a step is waiting, not only between steps,
// so the task stops promptly instead of finishing the current step first.
func cancellableTask(ctx context.Context, id int) <-chan string {
	const (
		steps        = 10
		stepDuration = 200 * time.Millisecond
	)

	// Capacity 1 lets the goroutine deliver its message and exit even if
	// the caller never reads it.
	result := make(chan string, 1)
	go func() {
		defer close(result)
		for i := 0; i < steps; i++ {
			// Wait out the step on a timer instead of time.Sleep so that the
			// wait can be interrupted by ctx.
			timer := time.NewTimer(stepDuration)
			select {
			case <-ctx.Done():
				timer.Stop()
				result <- fmt.Sprintf("Task %d cancelled at step %d", id, i)
				return
			case <-timer.C:
				fmt.Printf("Task %d: Step %d completed\n", id, i)
			}
		}
		result <- fmt.Sprintf("Task %d completed successfully", id)
	}()
	return result
}
// cancellationExample starts a cancellable task, cancels its context after
// 1.5 seconds from a second goroutine, and prints the message the task
// reports.
func cancellationExample() {
	fmt.Println("\n=== Cancellation Example ===")

	ctx, cancel := context.WithCancel(context.Background())
	outcome := cancellableTask(ctx, 1)

	// Cancel from a separate goroutine so this one can block on the result.
	go func() {
		const delay = 1500 * time.Millisecond
		time.Sleep(delay)
		fmt.Println("Cancelling task...")
		cancel()
	}()

	final := <-outcome
	fmt.Printf("Final result: %s\n", final)
}
// timeoutExample runs the three timeout and cancellation demos in order.
func timeoutExample() {
	demos := []func(){
		timeoutWithSelect,
		timeoutWithContext,
		cancellationExample,
	}
	for _, demo := range demos {
		demo()
	}
}
并发模式的选择指南 #
何时使用共享内存 #
- 需要高性能的数据访问
- 数据结构复杂,难以通过消息传递
- 读多写少的场景
何时使用消息传递 #
- 需要清晰的数据流向
- 避免复杂的锁机制
- 分布式系统中的组件通信
模式选择建议 #
package main
import (
"fmt"
"sync"
"time"
)
// Scenario 1: a simple counter, where shared memory is the natural fit.
//
// SimpleCounter guards an integer with a read-write mutex: Increment takes
// the write lock, Value takes the read lock.
type SimpleCounter struct {
	mu    sync.RWMutex // guards count
	count int
}

// Increment adds one to the counter under the write lock.
func (c *SimpleCounter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.count++
}

// Value returns the current count, read under the read lock.
func (c *SimpleCounter) Value() int {
	c.mu.RLock()
	current := c.count
	c.mu.RUnlock()
	return current
}
// Scenario 2: a data-processing pipeline, where message passing is the
// natural fit.
//
// processingPipeline feeds the integers 1 through 5 into a processing stage
// that doubles each one and formats it as "Processed: <n>", then prints each
// formatted line in order.
func processingPipeline() {
	input, output := make(chan int, 10), make(chan string, 10)

	// Feeder: sends the inputs, then closes the channel to signal the end.
	go func() {
		for n := 1; n <= 5; n++ {
			input <- n
		}
		close(input)
	}()

	// Processor: transforms until the input is exhausted, then closes its
	// own output so the printing loop below can finish.
	go func() {
		for n := range input {
			output <- fmt.Sprintf("Processed: %d", n*2)
		}
		close(output)
	}()

	for {
		line, open := <-output
		if !open {
			return
		}
		fmt.Println(line)
	}
}
// patternSelectionExample contrasts the two approaches: it first increments
// a mutex-protected SimpleCounter from ten goroutines and prints the total,
// then runs the channel-based processing pipeline.
func patternSelectionExample() {
	fmt.Println("=== Pattern Selection Examples ===")

	// Shared memory: ten goroutines update one counter.
	var (
		counter SimpleCounter
		pending sync.WaitGroup
	)
	const increments = 10
	for n := 0; n < increments; n++ {
		pending.Add(1)
		go func() {
			defer pending.Done()
			counter.Increment()
		}()
	}
	pending.Wait()
	fmt.Printf("Counter value: %d\n", counter.Value())

	// Message passing: the processing pipeline.
	fmt.Println("\nProcessing pipeline:")
	processingPipeline()
}
小结 #
在本节中,我们学习了:
- 并发模型:共享内存模型 vs CSP 模型
- 基本模式:生产者-消费者、扇入、扇出、管道、工作池
- 超时控制:使用 select 和 context 实现超时和取消
- 模式选择:根据场景选择合适的并发模式
这些并发模式是构建复杂并发系统的基础,掌握它们将帮助您设计出更加健壮和高效的并发程序。
练习题 #
- 实现一个支持优先级的工作池,高优先级任务优先处理
- 设计一个数据处理管道,包含数据验证、转换和存储三个阶段
- 创建一个并发的网页爬虫,使用工作池模式处理 URL 队列,并实现去重功能
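主函数示例 #
注意:上面的各个示例是相互独立的代码片段(每段都有自己的 package main 和 import)。若要把它们合并到同一个文件中并由下面的 main 统一调用,需要合并 import、删除未使用的包,并为两个同名的 worker 函数(CSP 示例与扇出示例各有一个)分别改名,否则无法通过编译。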
// main runs every example in this section in order, printing a numbered
// heading before each one.
func main() {
	// Println already appends a newline; a literal ending in "\n" is flagged
	// by go vet, so the blank line after the banner gets its own call.
	fmt.Println("=== Concurrency Patterns Examples ===")
	fmt.Println()

	fmt.Println("1. Shared Memory Example:")
	sharedMemoryExample()

	fmt.Println("\n2. Message Passing Example:")
	messagePassingExample()

	fmt.Println("\n3. Producer-Consumer Example:")
	producerConsumerExample()

	fmt.Println("\n4. Fan-in Example:")
	fanInExample()

	fmt.Println("\n5. Fan-out Example:")
	fanOutExample()

	fmt.Println("\n6. Pipeline Example:")
	pipelineExample()

	fmt.Println("\n7. Worker Pool Example:")
	workerPoolExample()

	fmt.Println("\n8. Timeout Example:")
	timeoutExample()

	fmt.Println("\n9. Pattern Selection Example:")
	patternSelectionExample()
}