2.1.3 并发模型与模式

2.1.3 并发模型与模式 #

并发编程模型概述 #

在并发编程中,不同的编程语言采用不同的并发模型来处理并发任务。理解这些模型有助于我们更好地使用 Go 语言的并发特性。

主要并发模型 #

1. 共享内存模型 #

传统的并发模型,多个线程共享同一块内存空间,通过锁机制来保证数据一致性:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Counter demonstrates the shared-memory model: an integer that several
// goroutines may touch, with every access serialized by a mutex.
type Counter struct {
    mu    sync.Mutex
    value int
}

// Increment adds one to the counter while holding the lock.
func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

// Value returns the current count, read while holding the lock.
func (c *Counter) Value() int {
    c.mu.Lock()
    current := c.value
    c.mu.Unlock()
    return current
}

// sharedMemoryExample starts ten goroutines that each increment one shared
// Counter a thousand times, waits for all of them, and prints the total.
func sharedMemoryExample() {
    const (
        goroutines    = 10
        incrementsPer = 1000
    )

    var (
        shared  Counter
        pending sync.WaitGroup
    )

    // Register every goroutine before launching any of them.
    pending.Add(goroutines)
    for id := 0; id < goroutines; id++ {
        go func(id int) {
            defer pending.Done()
            for n := incrementsPer; n > 0; n-- {
                shared.Increment()
            }
            fmt.Printf("Goroutine %d completed\n", id)
        }(id)
    }

    pending.Wait()
    fmt.Printf("Final counter value: %d\n", shared.Value())
}

2. 消息传递模型(CSP) #

这是 Go 语言推崇的主要并发模型,基于 Tony Hoare 提出的 CSP(Communicating Sequential Processes,通信顺序进程)理论:Goroutine 之间不直接共享数据,而是通过 channel 传递消息来协作:

package main

import (
    "fmt"
    "time"
)

// messagePassingExample demonstrates the CSP model: three workers receive
// nine jobs over one channel and report their results over another.
func messagePassingExample() {
    const (
        workerCount = 3
        jobCount    = 9
        queueSize   = 10
    )

    // Channels are the only means of communication between the goroutines.
    jobs := make(chan int, queueSize)
    results := make(chan int, queueSize)

    for id := 1; id <= workerCount; id++ {
        go worker(id, jobs, results)
    }

    // Queue every job, then close the channel so the workers' loops end.
    for n := 1; n <= jobCount; n++ {
        jobs <- n
    }
    close(jobs)

    // Exactly one result arrives per job.
    for received := 0; received < jobCount; received++ {
        fmt.Printf("Result: %d\n", <-results)
    }
}

// worker receives jobs until the jobs channel is closed. For each job it
// logs the job, sleeps 100ms to simulate work, and sends job*2 on results.
func worker(id int, jobs <-chan int, results chan<- int) {
    for {
        job, open := <-jobs
        if !open {
            return
        }

        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(100 * time.Millisecond) // simulated work
        results <- job * 2
    }
}

常见并发模式 #

1. 生产者-消费者模式 #

这是最基本的并发模式之一,生产者生成数据,消费者处理数据:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Task is the unit of data that producers send to consumers over a channel.
type Task struct {
    // ID identifies the task; producers derive it from their own id and a
    // per-producer sequence number.
    ID   int
    // Data is the textual payload printed by the consumer.
    Data string
}

// producer creates five tasks, sending each on tasks and then pausing for a
// random interval below 500ms. Task IDs are id*100 plus the sequence number.
// wg.Done is called when the producer returns.
func producer(id int, tasks chan<- Task, wg *sync.WaitGroup) {
    defer wg.Done()

    const tasksPerProducer = 5
    baseID := id * 100

    for seq := 0; seq < tasksPerProducer; seq++ {
        next := Task{
            ID:   baseID + seq,
            Data: fmt.Sprintf("Data from producer %d, task %d", id, seq),
        }

        fmt.Printf("Producer %d: Creating task %d\n", id, next.ID)
        tasks <- next

        // Random pause between tasks.
        pause := time.Duration(rand.Intn(500)) * time.Millisecond
        time.Sleep(pause)
    }

    fmt.Printf("Producer %d: Finished\n", id)
}

// consumer processes tasks until the tasks channel is closed and drained,
// sleeping a random interval below 300ms per task to simulate work.
// wg.Done is called when the consumer returns.
func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        current, open := <-tasks
        if !open {
            break
        }

        fmt.Printf("Consumer %d: Processing task %d - %s\n", id, current.ID, current.Data)

        // Simulated processing time.
        busy := time.Duration(rand.Intn(300)) * time.Millisecond
        time.Sleep(busy)

        fmt.Printf("Consumer %d: Completed task %d\n", id, current.ID)
    }

    fmt.Printf("Consumer %d: Finished\n", id)
}

// producerConsumerExample wires two producers and three consumers together
// through one buffered channel and blocks until every consumer has finished.
func producerConsumerExample() {
    const (
        numProducers = 2
        numConsumers = 3
        queueSize    = 10
    )

    tasks := make(chan Task, queueSize) // buffered channel
    var producers, consumers sync.WaitGroup

    producers.Add(numProducers)
    for id := 0; id < numProducers; id++ {
        go producer(id, tasks, &producers)
    }

    consumers.Add(numConsumers)
    for id := 0; id < numConsumers; id++ {
        go consumer(id, tasks, &consumers)
    }

    // Close the channel once all producers are done so that the consumers'
    // receive loops terminate.
    go func() {
        producers.Wait()
        close(tasks)
    }()

    consumers.Wait()
    fmt.Println("All producers and consumers finished")
}

2. 扇入模式(Fan-in) #

将多个输入源的数据合并到一个输出通道:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// dataSource emits five strings of the form "<name>-data-<n>" on output,
// pausing a random interval below 500ms after each one, and closes output
// when it returns.
func dataSource(name string, output chan<- string) {
    defer close(output)

    const itemCount = 5

    for seq := 0; seq < itemCount; seq++ {
        item := fmt.Sprintf("%s-data-%d", name, seq)
        fmt.Printf("Source %s: Generating %s\n", name, item)
        output <- item

        pause := time.Duration(rand.Intn(500)) * time.Millisecond
        time.Sleep(pause)
    }

    fmt.Printf("Source %s: Finished\n", name)
}

// fanIn merges any number of input channels into a single output channel.
// Each value is prefixed with "[Source-<index>] ", where index is the
// position of its channel in inputs. The returned channel is closed after
// every input channel has been closed and drained.
func fanIn(inputs ...<-chan string) <-chan string {
    merged := make(chan string)

    var forwarders sync.WaitGroup
    forwarders.Add(len(inputs))

    // forward copies one input into merged, tagging values with its index.
    forward := func(index int, src <-chan string) {
        defer forwarders.Done()
        for item := range src {
            merged <- fmt.Sprintf("[Source-%d] %s", index, item)
        }
    }

    for index, src := range inputs {
        go forward(index, src)
    }

    // Close the merged channel only after all forwarders have returned.
    go func() {
        forwarders.Wait()
        close(merged)
    }()

    return merged
}

// fanInExample runs three data sources named A, B and C concurrently, merges
// their output with fanIn, and prints every merged value.
func fanInExample() {
    names := []string{"A", "B", "C"}

    // One unbuffered channel and one generator goroutine per source.
    sources := make([]<-chan string, 0, len(names))
    for _, name := range names {
        ch := make(chan string)
        go dataSource(name, ch)
        sources = append(sources, ch)
    }

    // Fan the sources into a single stream.
    merged := fanIn(sources...)

    fmt.Println("=== Merged Data ===")
    for {
        item, open := <-merged
        if !open {
            break
        }
        fmt.Printf("Received: %s\n", item)
    }

    fmt.Println("Fan-in example completed")
}

3. 扇出模式(Fan-out) #

将一个输入源的数据分发给多个并发的工作 Goroutine(worker)处理:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job describes one unit of work in the fan-out example.
type Job struct {
    // ID is the job's sequence number.
    ID       int
    // Payload is the text echoed back in the worker's result string.
    Payload  string
    // Duration is how long a worker sleeps to simulate processing the job.
    Duration time.Duration
}

// jobGenerator sends twenty jobs on jobs, one every 50ms, each carrying a
// random simulated duration below 500ms, and closes jobs when it returns.
func jobGenerator(jobs chan<- Job) {
    defer close(jobs)

    const (
        totalJobs = 20
        interval  = 50 * time.Millisecond
    )

    for id := 0; id < totalJobs; id++ {
        simulated := time.Duration(rand.Intn(500)) * time.Millisecond
        next := Job{
            ID:       id,
            Payload:  fmt.Sprintf("Job-%d-payload", id),
            Duration: simulated,
        }

        fmt.Printf("Generated job %d\n", next.ID)
        jobs <- next
        time.Sleep(interval)
    }

    fmt.Println("Job generator finished")
}

// worker processes jobs until the jobs channel is closed and drained. For
// each job it sleeps for job.Duration to simulate work and then sends a
// summary string on results. wg.Done is called when the worker returns.
//
// NOTE(review): the message-passing example in this document declares
// another function named worker with a different signature; the two cannot
// be compiled into the same package without renaming one of them.
func worker(id int, jobs <-chan Job, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        job, open := <-jobs
        if !open {
            break
        }

        fmt.Printf("Worker %d: Starting job %d\n", id, job.ID)

        // Simulated work.
        time.Sleep(job.Duration)

        results <- fmt.Sprintf("Worker-%d processed job-%d (%s)", id, job.ID, job.Payload)

        fmt.Printf("Worker %d: Completed job %d\n", id, job.ID)
    }

    fmt.Printf("Worker %d: Finished\n", id)
}

// fanOutExample distributes the jobs produced by jobGenerator across four
// workers and prints every result until the results channel is closed.
func fanOutExample() {
    const (
        numWorkers     = 4
        jobBuffer      = 10
        resultBufferSz = 20
    )

    jobs := make(chan Job, jobBuffer)
    results := make(chan string, resultBufferSz)

    // Single producer of jobs.
    go jobGenerator(jobs)

    // Fan out: every worker receives from the same jobs channel.
    var workers sync.WaitGroup
    workers.Add(numWorkers)
    for id := 0; id < numWorkers; id++ {
        go worker(id, jobs, results, &workers)
    }

    // Close results after the last worker returns so the loop below ends.
    go func() {
        workers.Wait()
        close(results)
    }()

    fmt.Println("\n=== Results ===")
    for {
        line, open := <-results
        if !open {
            break
        }
        fmt.Printf("Result: %s\n", line)
    }

    fmt.Println("Fan-out example completed")
}

4. 管道模式(Pipeline) #

将数据处理分解为多个阶段,每个阶段由不同的 Goroutine 处理:

package main

import (
    "fmt"
    "strconv"
    "strings"
    "time"
)

// generateNumbers is pipeline stage 1: it sends the integers 1 through 10 on
// nums, one every 100ms, and closes nums when it returns.
func generateNumbers(nums chan<- int) {
    defer close(nums)

    const (
        first    = 1
        last     = 10
        interval = 100 * time.Millisecond
    )

    for n := first; n <= last; n++ {
        fmt.Printf("Stage 1: Generating %d\n", n)
        nums <- n
        time.Sleep(interval)
    }

    fmt.Println("Stage 1: Number generation completed")
}

// squareNumbers is pipeline stage 2: it squares every value received from
// nums, sends the square on squares, and pauses 50ms after each one. It
// closes squares once nums has been closed and drained.
func squareNumbers(nums <-chan int, squares chan<- int) {
    defer close(squares)

    for {
        n, open := <-nums
        if !open {
            break
        }

        sq := n * n
        fmt.Printf("Stage 2: %d squared = %d\n", n, sq)
        squares <- sq
        time.Sleep(50 * time.Millisecond)
    }

    fmt.Println("Stage 2: Number squaring completed")
}

// formatNumbers is pipeline stage 3: it renders every value received from
// squares as "Number: <n>", sends it on formatted, and pauses 30ms after
// each one. It closes formatted once squares has been closed and drained.
func formatNumbers(squares <-chan int, formatted chan<- string) {
    defer close(formatted)

    for {
        sq, open := <-squares
        if !open {
            break
        }

        text := "Number: " + strconv.Itoa(sq)
        fmt.Printf("Stage 3: Formatting %d -> %s\n", sq, text)
        formatted <- text
        time.Sleep(30 * time.Millisecond)
    }

    fmt.Println("Stage 3: Number formatting completed")
}

// printResults is pipeline stage 4: it prints every string received from
// formatted in upper case, pausing 20ms after each one, and returns once
// formatted has been closed and drained.
func printResults(formatted <-chan string) {
    fmt.Println("\n=== Final Results ===")

    for {
        line, open := <-formatted
        if !open {
            break
        }

        upper := strings.ToUpper(line)
        fmt.Printf("Output: %s\n", upper)
        time.Sleep(20 * time.Millisecond)
    }

    fmt.Println("Stage 4: Output completed")
}

func pipelineExample() {
    // 创建管道 channels
    nums := make(chan int)
    squares := make(chan int)
    formatted := make(chan string)

    // 启动管道各阶段
    go generateNumbers(nums)
    go squareNumbers(nums, squares)
    go formatNumbers(squares, formatted)

    // 处理最终结果
    printResults(formatted)

    fmt.Println("Pipeline example completed")
}

5. 工作池模式(Worker Pool) #

创建固定数量的工作 Goroutine 来处理任务队列:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// WorkItem is a unit of work submitted to a WorkerPool.
type WorkItem struct {
    ID   int
    Data string
}

// WorkResult pairs a processed WorkItem with its result text and the index
// of the worker that produced it.
type WorkResult struct {
    Item   WorkItem
    Result string
    Worker int
}

// WorkerPool runs a fixed number of goroutines that take WorkItems from a
// job queue and publish WorkResults on a result queue.
//
// Typical use: Start, Submit any number of items, Close, then Wait (usually
// from its own goroutine) while ranging over Results.
type WorkerPool struct {
    workerCount int
    jobQueue    chan WorkItem
    resultQueue chan WorkResult
    wg          sync.WaitGroup

    // closeJobs and closeResults make Close and Wait safe to call more than
    // once; closing an already-closed channel would panic.
    closeJobs    sync.Once
    closeResults sync.Once
}

// NewWorkerPool creates a pool with workerCount workers and the given buffer
// sizes for the job and result queues. A workerCount below 1 is raised to 1
// so submitted jobs are always processed, and negative queue sizes are
// treated as 0 (unbuffered) instead of panicking in make.
func NewWorkerPool(workerCount, jobQueueSize, resultQueueSize int) *WorkerPool {
    if workerCount < 1 {
        workerCount = 1
    }
    if jobQueueSize < 0 {
        jobQueueSize = 0
    }
    if resultQueueSize < 0 {
        resultQueueSize = 0
    }

    return &WorkerPool{
        workerCount: workerCount,
        jobQueue:    make(chan WorkItem, jobQueueSize),
        resultQueue: make(chan WorkResult, resultQueueSize),
    }
}

// Start launches the worker goroutines. Each one is registered with the
// pool's WaitGroup before it starts.
func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workerCount; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

// worker processes jobs until the job queue is closed and drained. Each job
// is simulated with a random sleep below 500ms, after which a WorkResult is
// sent on the result queue.
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()

    fmt.Printf("Worker %d: Started\n", id)

    for job := range wp.jobQueue {
        fmt.Printf("Worker %d: Processing job %d\n", id, job.ID)

        // Simulated work.
        processingTime := time.Duration(rand.Intn(500)) * time.Millisecond
        time.Sleep(processingTime)

        result := WorkResult{
            Item:   job,
            Result: fmt.Sprintf("Processed: %s (took %v)", job.Data, processingTime),
            Worker: id,
        }

        wp.resultQueue <- result
        fmt.Printf("Worker %d: Completed job %d\n", id, job.ID)
    }

    fmt.Printf("Worker %d: Finished\n", id)
}

// Submit enqueues item, blocking while the job queue is full. It must not be
// called after Close: sending on a closed channel panics.
func (wp *WorkerPool) Submit(item WorkItem) {
    wp.jobQueue <- item
}

// Close closes the job queue, signalling that no more work will be
// submitted. Calling it more than once is a no-op.
func (wp *WorkerPool) Close() {
    wp.closeJobs.Do(func() { close(wp.jobQueue) })
}

// Wait blocks until every worker has returned and then closes the result
// queue so that a range over Results terminates. It may be called more than
// once; the result queue is closed only the first time.
func (wp *WorkerPool) Wait() {
    wp.wg.Wait()
    wp.closeResults.Do(func() { close(wp.resultQueue) })
}

// Results returns the receive-only channel on which workers publish results.
func (wp *WorkerPool) Results() <-chan WorkResult {
    return wp.resultQueue
}

// workerPoolExample runs fifteen jobs through a pool of three workers and
// prints each result as it arrives.
func workerPoolExample() {
    const (
        workers         = 3
        jobQueueSize    = 10
        resultQueueSize = 20
        jobCount        = 15
    )

    pool := NewWorkerPool(workers, jobQueueSize, resultQueueSize)
    pool.Start()

    // submitAll feeds every job to the pool and then closes the job queue.
    submitAll := func() {
        for id := 0; id < jobCount; id++ {
            item := WorkItem{ID: id, Data: fmt.Sprintf("Task-%d-data", id)}
            fmt.Printf("Submitting job %d\n", item.ID)
            pool.Submit(item)
        }
        pool.Close()
    }
    go submitAll()

    // Wait closes the result queue once all workers are done, which ends the
    // receive loop below.
    go pool.Wait()

    fmt.Println("\n=== Processing Results ===")
    results := pool.Results()
    for {
        res, open := <-results
        if !open {
            break
        }
        fmt.Printf("Result from Worker %d: Job %d - %s\n",
            res.Worker, res.Item.ID, res.Result)
    }

    fmt.Println("Worker pool example completed")
}

6. 超时模式 #

使用 select 语句配合 time.After 或 context 实现超时与取消控制:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

// slowOperation simulates an operation of unpredictable length. It returns
// at once with a channel that receives one message after a random delay
// below two seconds. The channel has a buffer of one, so the background
// goroutine can finish even when nobody receives the message.
func slowOperation(id int) <-chan string {
    done := make(chan string, 1)

    go func(out chan<- string) {
        // Random delay standing in for real processing time.
        wait := time.Duration(rand.Intn(2000)) * time.Millisecond
        time.Sleep(wait)

        out <- fmt.Sprintf("Operation %d completed after %v", id, wait)
    }(done)

    return done
}

// timeoutWithSelect runs five slow operations one after another and gives
// each of them at most one second by racing its result channel against
// time.After inside a select.
func timeoutWithSelect() {
    fmt.Println("=== Timeout with Select ===")

    const (
        attempts = 5
        limit    = 1 * time.Second
    )

    for op := 0; op < attempts; op++ {
        fmt.Printf("Starting operation %d...\n", op)

        pending := slowOperation(op)
        expired := time.After(limit)

        select {
        case msg := <-pending:
            fmt.Printf("Success: %s\n", msg)
        case <-expired:
            fmt.Printf("Timeout: Operation %d took too long\n", op)
        }
    }
}

// timeoutWithContext runs five slow operations one after another, bounding
// each wait with a one-second context timeout. The context is not passed to
// slowOperation, so only the waiting is abandoned on timeout.
func timeoutWithContext() {
    fmt.Println("\n=== Timeout with Context ===")

    const (
        attempts = 5
        limit    = 1 * time.Second
    )

    for op := 0; op < attempts; op++ {
        // A closure per attempt lets defer release the context promptly.
        func() {
            ctx, cancel := context.WithTimeout(context.Background(), limit)
            defer cancel() // release the context's resources

            fmt.Printf("Starting operation %d with context...\n", op)

            pending := slowOperation(op)
            select {
            case msg := <-pending:
                fmt.Printf("Success: %s\n", msg)
            case <-ctx.Done():
                fmt.Printf("Context timeout: Operation %d cancelled\n", op)
            }
        }()
    }
}

// cancellableTask runs a ten-step task in the background, each step taking
// 200ms, and returns a channel that receives exactly one status message
// before being closed: either a completion message or, if ctx is cancelled
// first, a message naming the step that was interrupted.
//
// The per-step wait is itself a select on ctx.Done, so cancellation is
// observed immediately rather than only at the next step boundary.
func cancellableTask(ctx context.Context, id int) <-chan string {
    // Buffer of one lets the goroutine finish even if nobody receives.
    result := make(chan string, 1)

    go func() {
        defer close(result)

        const (
            steps        = 10
            stepDuration = 200 * time.Millisecond
        )

        for i := 0; i < steps; i++ {
            select {
            case <-ctx.Done():
                result <- fmt.Sprintf("Task %d cancelled at step %d", id, i)
                return
            case <-time.After(stepDuration):
                fmt.Printf("Task %d: Step %d completed\n", id, i)
            }
        }

        result <- fmt.Sprintf("Task %d completed successfully", id)
    }()

    return result
}

// cancellationExample starts a cancellable task, cancels it from another
// goroutine after 1.5 seconds, and prints the task's final status message.
func cancellationExample() {
    fmt.Println("\n=== Cancellation Example ===")

    ctx, cancel := context.WithCancel(context.Background())

    // Start the task before arming the cancellation.
    status := cancellableTask(ctx, 1)

    // cancelLater cancels the context after the given delay.
    cancelLater := func(delay time.Duration) {
        time.Sleep(delay)
        fmt.Println("Cancelling task...")
        cancel()
    }
    go cancelLater(1500 * time.Millisecond)

    // Block until the task reports its outcome.
    final := <-status
    fmt.Printf("Final result: %s\n", final)
}

// timeoutExample runs the three timeout and cancellation demos in order.
func timeoutExample() {
    demos := []func(){
        timeoutWithSelect,
        timeoutWithContext,
        cancellationExample,
    }
    for _, demo := range demos {
        demo()
    }
}

并发模式的选择指南 #

何时使用共享内存 #

  • 需要高性能的数据访问
  • 数据结构复杂,难以通过消息传递
  • 读多写少的场景

何时使用消息传递 #

  • 需要清晰的数据流向
  • 避免复杂的锁机制
  • 分布式系统中的组件通信

模式选择建议 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// Scenario 1: a simple counter, for which shared memory is the natural fit.
//
// SimpleCounter guards an int with a read-write mutex: Increment takes the
// write lock and Value takes the read lock.
type SimpleCounter struct {
    mu    sync.RWMutex
    count int
}

// Increment adds one to the counter under the write lock.
func (c *SimpleCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

// Value returns the current count, read under the read lock.
func (c *SimpleCounter) Value() int {
    c.mu.RLock()
    snapshot := c.count
    c.mu.RUnlock()
    return snapshot
}

// Scenario 2: a data-processing pipeline, for which message passing is the
// natural fit.
//
// processingPipeline feeds the integers 1 through 5 into a processing stage
// that doubles each value and formats it, then prints every formatted line.
func processingPipeline() {
    const bufferSize = 10

    input := make(chan int, bufferSize)
    output := make(chan string, bufferSize)

    // Source: emit the inputs, then close the channel to end the stage below.
    go func() {
        defer close(input)
        for n := 1; n <= 5; n++ {
            input <- n
        }
    }()

    // Processing stage: double and format each value.
    go func() {
        defer close(output)
        for {
            n, open := <-input
            if !open {
                return
            }
            output <- fmt.Sprintf("Processed: %d", n*2)
        }
    }()

    // Sink: print results until the processing stage closes output.
    for line := range output {
        fmt.Println(line)
    }
}

// patternSelectionExample contrasts the two approaches: a mutex-guarded
// counter incremented from ten goroutines, followed by a channel-based
// processing pipeline.
func patternSelectionExample() {
    fmt.Println("=== Pattern Selection Examples ===")

    const increments = 10

    // Shared memory: every goroutine increments the same counter once.
    var (
        tally   SimpleCounter
        pending sync.WaitGroup
    )
    pending.Add(increments)
    for n := 0; n < increments; n++ {
        go func() {
            defer pending.Done()
            tally.Increment()
        }()
    }
    pending.Wait()
    fmt.Printf("Counter value: %d\n", tally.Value())

    // Message passing: data flows through channels between stages.
    fmt.Println("\nProcessing pipeline:")
    processingPipeline()
}

小结 #

在本节中,我们学习了:

  1. 并发模型:共享内存模型 vs CSP 模型
  2. 基本模式:生产者-消费者、扇入、扇出、管道、工作池
  3. 超时控制:使用 select 和 context 实现超时和取消
  4. 模式选择:根据场景选择合适的并发模式

这些并发模式是构建复杂并发系统的基础,掌握它们将帮助您设计出更加健壮和高效的并发程序。

练习题 #

  1. 实现一个支持优先级的工作池,高优先级任务优先处理
  2. 设计一个数据处理管道,包含数据验证、转换和存储三个阶段
  3. 创建一个并发的网页爬虫,使用工作池模式处理 URL 队列,并实现去重功能

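主函数示例 #

下面的 main 函数假定上文所有示例都位于同一个 package main 中。合并这些代码片段时需要注意:消息传递示例与扇出示例各自定义了一个签名不同的 worker 函数,必须为其中之一改名(例如 fanOutWorker)并同步修改其调用处;同时需要合并各片段的 import,并删除未使用的导入(例如共享内存示例和"模式选择建议"示例中的 "time"),否则无法通过编译。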

// main runs every example in this document in order, printing a numbered
// heading before each one. It assumes all example functions are declared in
// the same package.
func main() {
    // The banner is followed by one blank line. It is printed with a separate
    // Println because go vet rejects a Println argument ending in "\n".
    fmt.Println("=== Concurrency Patterns Examples ===")
    fmt.Println()

    fmt.Println("1. Shared Memory Example:")
    sharedMemoryExample()

    fmt.Println("\n2. Message Passing Example:")
    messagePassingExample()

    fmt.Println("\n3. Producer-Consumer Example:")
    producerConsumerExample()

    fmt.Println("\n4. Fan-in Example:")
    fanInExample()

    fmt.Println("\n5. Fan-out Example:")
    fanOutExample()

    fmt.Println("\n6. Pipeline Example:")
    pipelineExample()

    fmt.Println("\n7. Worker Pool Example:")
    workerPoolExample()

    fmt.Println("\n8. Timeout Example:")
    timeoutExample()

    fmt.Println("\n9. Pattern Selection Example:")
    patternSelectionExample()
}