2.4.3 Pipeline 模式

2.4.3 Pipeline 模式 #

Pipeline(流水线)模式是一种将复杂的数据处理任务分解为多个阶段的并发设计模式。每个阶段专注于特定的处理逻辑,数据在各个阶段之间流动,形成一个处理链。这种模式在数据处理、图像处理、编译器设计等领域应用广泛。

基本概念 #

Pipeline 模式的核心特点:

  1. 阶段化处理:将复杂任务分解为多个简单的处理阶段
  2. 数据流动:数据在各个阶段之间通过 channel 传递
  3. 并发执行:各个阶段同时运行,分别处理流水线上不同位置的数据
  4. 解耦合:各阶段相互独立,便于维护和扩展

基础 Pipeline 实现 #

简单的数据处理 Pipeline #

package main

import (
    "fmt"
    "strconv"
    "strings"
    "time"
)

// generateNumbers is stage 1 of the pipeline: it emits the integers
// 1..count, one at a time, on an unbuffered channel and pauses 100ms after
// each value to simulate a slow producer. The channel is closed after the
// last value has been sent.
func generateNumbers(count int) <-chan int {
    stream := make(chan int)

    produce := func() {
        defer close(stream)
        for next := 1; next <= count; next++ {
            stream <- next
            // Simulate the interval between generated items.
            time.Sleep(100 * time.Millisecond)
        }
        fmt.Println("数据生成完成")
    }
    go produce()

    return stream
}

// square is stage 2: for every value received on input it logs and forwards
// the value's square, then pauses 50ms to simulate processing cost. The
// returned channel is closed once input has been closed and drained.
func square(input <-chan int) <-chan int {
    squares := make(chan int)

    worker := func() {
        defer close(squares)
        for v := range input {
            sq := v * v
            fmt.Printf("平方处理: %d -> %d\n", v, sq)
            squares <- sq
            time.Sleep(50 * time.Millisecond)
        }
        fmt.Println("平方处理完成")
    }
    go worker()

    return squares
}

// filterEven is stage 3: it passes even values through and drops odd ones,
// logging each decision and pausing 30ms per item. The returned channel is
// closed once input has been closed and drained.
func filterEven(input <-chan int) <-chan int {
    evens := make(chan int)

    go func() {
        defer close(evens)
        for v := range input {
            switch {
            case v%2 != 0:
                fmt.Printf("过滤丢弃: %d\n", v)
            default:
                fmt.Printf("过滤保留: %d\n", v)
                evens <- v
            }
            time.Sleep(30 * time.Millisecond)
        }
        fmt.Println("过滤处理完成")
    }()

    return evens
}

// formatOutput is stage 4: it renders each value as the string "结果: <n>",
// logs the conversion, and forwards the string, pausing 20ms per item. The
// returned channel is closed once input has been closed and drained.
func formatOutput(input <-chan int) <-chan string {
    lines := make(chan string)
    go func() {
        defer close(lines)
        for v := range input {
            line := fmt.Sprintf("结果: %d", v)
            fmt.Printf("格式化: %d -> %s\n", v, line)
            lines <- line
            time.Sleep(20 * time.Millisecond)
        }
        fmt.Println("格式化处理完成")
    }()
    return lines
}

func main() {
    // 构建Pipeline
    numbers := generateNumbers(10)
    squared := square(numbers)
    filtered := filterEven(squared)
    formatted := formatOutput(filtered)

    // 消费最终结果
    fmt.Println("=== 最终结果 ===")
    for result := range formatted {
        fmt.Println("最终输出:", result)
    }

    fmt.Println("Pipeline处理完成")
}

通用 Pipeline 框架 #

package main

import (
    "context"
    "fmt"
    "reflect"
    "sync"
    "time"
)

// Stage is one processing step of a Pipeline.
//
// Name labels the stage in log output. Process transforms a single item:
// the pipeline forwards a non-nil result to the next stage, drops a nil
// result (that is how filter stages discard items), and logs and skips the
// item when the returned error is non-nil.
type Stage interface {
    Name() string
    Process(ctx context.Context, input interface{}) (interface{}, error)
}

// Pipeline runs a fixed sequence of Stages, one goroutine per stage. Every
// stage receives the same context, created by NewPipeline and cancelled by
// Cancel.
type Pipeline struct {
    stages []Stage
    ctx    context.Context
    cancel context.CancelFunc
}

// NewPipeline returns a Pipeline that runs stages in the order given. The
// pipeline owns a fresh cancellable context derived from
// context.Background(); call Cancel to stop it and release that context.
func NewPipeline(stages ...Stage) *Pipeline {
    p := &Pipeline{stages: stages}
    p.ctx, p.cancel = context.WithCancel(context.Background())
    return p
}

// Execute starts one goroutine per stage, chaining them with unbuffered
// channels in the order the stages were given, and returns a channel that
// carries the final stage's outputs wrapped in PipelineResult. With no
// stages, items from input are forwarded unchanged. The returned channel is
// closed when the last stage's output closes or when the pipeline is
// cancelled.
//
// Stage errors are logged by runStage and never reach this channel, so every
// PipelineResult produced here has a nil Error.
//
// Both waiting for the next result and handing it to the caller also watch
// p.ctx. Without that, a cancelled pipeline whose caller had stopped reading
// (or whose upstream never closed) would leave this goroutine blocked
// forever.
func (p *Pipeline) Execute(input <-chan interface{}) <-chan PipelineResult {
    output := make(chan PipelineResult)

    go func() {
        defer close(output)

        current := input
        for i, stage := range p.stages {
            next := make(chan interface{})
            go p.runStage(stage, current, next, i)
            current = next
        }

        // Forward the final results until the chain closes or we are cancelled.
        for {
            select {
            case <-p.ctx.Done():
                return
            case data, ok := <-current:
                if !ok {
                    return
                }
                select {
                case output <- PipelineResult{Data: data}:
                case <-p.ctx.Done():
                    return
                }
            }
        }
    }()

    return output
}

// PipelineResult is one item delivered on the channel returned by
// Pipeline.Execute. Data holds the final stage's output; Error is presumably
// intended to carry a processing failure.
// NOTE(review): Execute as written always sets Error to nil (stage errors are
// only printed by runStage), so callers never observe a non-nil Error.
type PipelineResult struct {
    Data  interface{}
    Error error
}

// runStage drives a single stage. It reads items from input, passes each one
// to stage.Process, and forwards non-nil results to output. A nil result with
// a nil error means the stage filtered the item out. A non-nil error is logged
// and the item is skipped. output is always closed on return, so the next
// stage's receive loop terminates.
//
// Both the receive from input and the send to output also wait on p.ctx.
// Cancel therefore stops the stage even when it is blocked on a slow producer
// or on a consumer that has stopped reading; otherwise the goroutine would
// leak. On cancellation the completion message is not printed.
//
// stageIndex is currently unused and is kept for signature compatibility.
func (p *Pipeline) runStage(stage Stage, input <-chan interface{}, output chan<- interface{}, stageIndex int) {
    defer close(output)

    for {
        var data interface{}
        select {
        case <-p.ctx.Done():
            return
        case item, ok := <-input:
            if !ok {
                fmt.Printf("阶段 %s 完成\n", stage.Name())
                return
            }
            data = item
        }
        // select picks randomly when both cases are ready; re-check so a
        // cancelled pipeline never starts processing another item.
        if p.ctx.Err() != nil {
            return
        }

        result, err := stage.Process(p.ctx, data)
        if err != nil {
            fmt.Printf("阶段 %s 处理错误: %v\n", stage.Name(), err)
            continue
        }
        if result == nil {
            continue // filtered out by the stage
        }

        select {
        case output <- result:
        case <-p.ctx.Done():
            return
        }
    }
}

// Cancel stops the pipeline by cancelling the context shared by all of its
// stages. Like any context.CancelFunc, calling it again has no effect.
func (p *Pipeline) Cancel() {
    p.cancel()
}

// 示例阶段实现

// NumberGeneratorStage is a pass-through Stage: Process returns its input
// unchanged. Despite the name it does not generate any values. In this
// framework the data source is the channel handed to Pipeline.Execute.
type NumberGeneratorStage struct {
    count int // NOTE(review): not read by Process or Name; kept for compatibility.
}

// Process returns input as-is with a nil error; ctx is not consulted.
func (ngs *NumberGeneratorStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
    return input, nil
}

// Name returns the fixed label "NumberGenerator".
func (ngs *NumberGeneratorStage) Name() string {
    return "NumberGenerator"
}

// MultiplyStage multiplies each int item by factor.
type MultiplyStage struct {
    factor int
}

// Process returns input*factor and logs the calculation. A non-int input
// yields an error. After computing the result it simulates 50ms of work. If
// ctx is cancelled during that wait, the result is discarded and ctx.Err() is
// returned, so a cancelled pipeline is not held up by in-flight items.
func (ms *MultiplyStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
    num, ok := input.(int)
    if !ok {
        return nil, fmt.Errorf("期望int类型,得到%T", input)
    }

    result := num * ms.factor
    fmt.Printf("乘法阶段: %d * %d = %d\n", num, ms.factor, result)

    // Simulated processing time that still honours cancellation.
    timer := time.NewTimer(50 * time.Millisecond)
    defer timer.Stop()
    select {
    case <-timer.C:
        return result, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// Name reports the stage as "Multiply(<factor>)".
func (ms *MultiplyStage) Name() string {
    return fmt.Sprintf("Multiply(%d)", ms.factor)
}

// FilterStage keeps an item only when predicate returns true for it.
type FilterStage struct {
    predicate func(interface{}) bool
}

// Process returns input unchanged when fs.predicate accepts it, and (nil, nil)
// otherwise. runStage treats a nil result as "drop this item". The decision
// is logged either way; ctx is not consulted.
func (fs *FilterStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
    if !fs.predicate(input) {
        fmt.Printf("过滤阶段: 丢弃 %v\n", input)
        return nil, nil
    }
    fmt.Printf("过滤阶段: 保留 %v\n", input)
    return input, nil
}

// Name returns the fixed label "Filter".
func (fs *FilterStage) Name() string {
    return "Filter"
}

// StringFormatStage renders each item with fmt.Sprintf(format, item).
type StringFormatStage struct {
    format string
}

// Process formats input with sfs.format, logs the conversion, and returns
// the resulting string after a simulated 30ms of work. If ctx is cancelled
// during that wait, the result is discarded and ctx.Err() is returned
// instead. This matches how MultiplyStage honours cancellation.
func (sfs *StringFormatStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
    result := fmt.Sprintf(sfs.format, input)
    fmt.Printf("格式化阶段: %v -> %s\n", input, result)

    timer := time.NewTimer(30 * time.Millisecond)
    defer timer.Stop()
    select {
    case <-timer.C:
        return result, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// Name returns the fixed label "StringFormat".
func (sfs *StringFormatStage) Name() string {
    return "StringFormat"
}

// main feeds the integers 1..10 through Multiply(2) -> Filter(> 10) ->
// StringFormat and prints every result that reaches the end of the pipeline.
//
// NOTE(review): this program's import block also lists "reflect" and "sync",
// which nothing in the program uses. Go rejects unused imports, so the
// listing does not compile until those two lines are removed.
func main() {
    // Source channel. The producer closes it after the last value, so the
    // first stage's receive loop ends and closure propagates downstream.
    input := make(chan interface{}, 10)
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()

    stages := []Stage{
        &MultiplyStage{factor: 2},
        &FilterStage{predicate: func(v interface{}) bool {
            if num, ok := v.(int); ok {
                return num > 10
            }
            return false
        }},
        &StringFormatStage{format: "结果: %v"},
    }

    pipeline := NewPipeline(stages...)
    // Release the pipeline's context when main returns.
    defer pipeline.Cancel()
    results := pipeline.Execute(input)

    fmt.Println("=== Pipeline执行结果 ===")
    for result := range results {
        if result.Error != nil {
            fmt.Printf("错误: %v\n", result.Error)
        } else {
            fmt.Printf("最终结果: %v\n", result.Data)
        }
    }
}

并行 Pipeline #

每个阶段并行处理 #

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ParallelStage is a pipeline stage that fans its input out to workerCount
// goroutines, each of which applies processor to the items it receives.
type ParallelStage struct {
    name        string
    workerCount int
    processor   func(interface{}) (interface{}, error)
}

// NewParallelStage builds a ParallelStage labelled name that will run
// workerCount workers, each calling processor on the items it receives.
func NewParallelStage(name string, workerCount int, processor func(interface{}) (interface{}, error)) *ParallelStage {
    ps := new(ParallelStage)
    ps.name = name
    ps.workerCount = workerCount
    ps.processor = processor
    return ps
}

// Process starts the stage's workers and returns their shared output channel.
// Each worker takes items from input and applies ps.processor to them. An
// error is logged and the item skipped; a nil result is dropped; anything
// else is logged and forwarded to output. Once every worker has exited,
// output is closed and a completion message is printed. Because several
// workers run concurrently, results may arrive out of input order.
//
// A workerCount below 1 is treated as 1. With no workers nothing would ever
// drain input, and the upstream producer would block forever.
//
// Receiving from input and sending to output both also wait on ctx.
// Cancellation therefore stops the workers even while they are blocked on an
// idle producer or on a consumer that has stopped reading.
func (ps *ParallelStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
    output := make(chan interface{})

    workers := ps.workerCount
    if workers < 1 {
        workers = 1
    }

    var wg sync.WaitGroup
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func(workerID int) {
            defer wg.Done()

            for {
                var data interface{}
                select {
                case <-ctx.Done():
                    return
                case item, ok := <-input:
                    if !ok {
                        return
                    }
                    data = item
                }
                // select picks randomly when both cases are ready; re-check so
                // no new item is processed after cancellation.
                if ctx.Err() != nil {
                    return
                }

                result, err := ps.processor(data)
                if err != nil {
                    fmt.Printf("Worker %d 处理错误: %v\n", workerID, err)
                    continue
                }
                if result == nil {
                    continue // filtered out
                }

                fmt.Printf("Worker %d 处理: %v -> %v\n", workerID, data, result)
                select {
                case output <- result:
                case <-ctx.Done():
                    return
                }
            }
        }(i)
    }

    // Close output only after every worker is done writing to it.
    go func() {
        wg.Wait()
        close(output)
        fmt.Printf("阶段 %s 完成\n", ps.name)
    }()

    return output
}

// Name returns the label the stage was constructed with.
func (ps *ParallelStage) Name() string { return ps.name }

// ParallelPipeline chains ParallelStages, feeding each stage's output channel
// into the next. Every stage shares the context created by
// NewParallelPipeline, which Cancel cancels.
type ParallelPipeline struct {
    stages []ParallelStage
    ctx    context.Context
    cancel context.CancelFunc
}

// NewParallelPipeline returns an empty pipeline with a fresh cancellable
// context. Add stages with AddStage before calling Execute.
func NewParallelPipeline() *ParallelPipeline {
    pp := new(ParallelPipeline)
    pp.ctx, pp.cancel = context.WithCancel(context.Background())
    return pp
}

// AddStage appends a copy of stage to the pipeline. Stages run in the order
// in which they were added.
func (pp *ParallelPipeline) AddStage(stage ParallelStage) {
    updated := append(pp.stages, stage)
    pp.stages = updated
}

// Execute connects the stages in the order they were added: each stage's
// output channel becomes the next stage's input. It returns the final
// channel, or input itself when there are no stages. All stages share
// pp.ctx.
//
// ParallelStage.Process has a pointer receiver, and its goroutines keep
// reading ps.processor and ps.name through that pointer after Process
// returns. The pointer must therefore be distinct per stage. Taking the
// address of the slice element guarantees that. Calling Process on a range
// variable does not under pre-Go 1.22 loop semantics, where one variable is
// reused for every iteration and all stages would end up using the last
// stage's processor.
func (pp *ParallelPipeline) Execute(input <-chan interface{}) <-chan interface{} {
    current := input
    for i := range pp.stages {
        stage := &pp.stages[i]
        current = stage.Process(pp.ctx, current)
    }
    return current
}

// Cancel cancels the context shared by every stage. Workers notice it the
// next time they check ctx. Calling Cancel again has no further effect.
func (pp *ParallelPipeline) Cancel() {
    pp.cancel()
}

// main streams 1..20 (one value every 50ms) through three parallel stages:
// Square (3 workers), Filter keeping values > 50 (2 workers), and Format
// (4 workers). It then prints every result and the total count. Because each
// stage runs several workers, results may arrive out of input order.
func main() {
    input := make(chan interface{}, 20)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
            time.Sleep(time.Millisecond * 50)
        }
    }()

    pipeline := NewParallelPipeline()
    // Release the pipeline's context once main is done with it.
    defer pipeline.Cancel()

    // Stage 1: square each value (3 workers).
    pipeline.AddStage(*NewParallelStage("Square", 3, func(data interface{}) (interface{}, error) {
        if num, ok := data.(int); ok {
            time.Sleep(time.Millisecond * 100) // simulated compute time
            return num * num, nil
        }
        return nil, fmt.Errorf("无效数据类型")
    }))

    // Stage 2: keep only values greater than 50 (2 workers).
    pipeline.AddStage(*NewParallelStage("Filter", 2, func(data interface{}) (interface{}, error) {
        if num, ok := data.(int); ok {
            time.Sleep(time.Millisecond * 50) // simulated processing time
            if num > 50 {
                return num, nil
            }
            return nil, nil // nil result: filtered out
        }
        return nil, fmt.Errorf("无效数据类型")
    }))

    // Stage 3: format as a string (4 workers).
    pipeline.AddStage(*NewParallelStage("Format", 4, func(data interface{}) (interface{}, error) {
        if num, ok := data.(int); ok {
            time.Sleep(time.Millisecond * 30) // simulated processing time
            return fmt.Sprintf("结果: %d", num), nil
        }
        return nil, fmt.Errorf("无效数据类型")
    }))

    results := pipeline.Execute(input)

    fmt.Println("=== 并行Pipeline执行结果 ===")
    resultCount := 0
    for result := range results {
        resultCount++
        fmt.Printf("最终结果 %d: %v\n", resultCount, result)
    }

    fmt.Printf("总共处理了 %d 个结果\n", resultCount)
}

带缓冲的 Pipeline #

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// BufferedPipeline is a linear pipeline whose stages are connected by
// buffered channels. Every stage shares ctx, which Cancel cancels.
type BufferedPipeline struct {
    stages     []BufferedStage
    bufferSize int
    ctx        context.Context
    cancel     context.CancelFunc
}

// BufferedStage is a single named step. processor is applied to each item,
// and bufferSize is the capacity of the channel the stage writes its results
// to.
type BufferedStage struct {
    name       string
    processor  func(interface{}) (interface{}, error)
    bufferSize int
}

// NewBufferedPipeline returns an empty pipeline. Stages added later via
// AddStage get output channels of capacity bufferSize. A negative size is
// treated as 0 (unbuffered): make panics on a negative channel capacity,
// which would otherwise only surface later, inside Execute.
func NewBufferedPipeline(bufferSize int) *BufferedPipeline {
    if bufferSize < 0 {
        bufferSize = 0
    }
    ctx, cancel := context.WithCancel(context.Background())
    return &BufferedPipeline{
        bufferSize: bufferSize,
        ctx:        ctx,
        cancel:     cancel,
    }
}

// AddStage appends a stage called name that applies processor to each item.
// The stage's output buffer capacity is the pipeline's bufferSize at the
// time of this call.
func (bp *BufferedPipeline) AddStage(name string, processor func(interface{}) (interface{}, error)) {
    bp.stages = append(bp.stages, BufferedStage{
        bufferSize: bp.bufferSize,
        name:       name,
        processor:  processor,
    })
}

// Execute launches one goroutine per stage and connects them with channels
// whose capacity is that stage's bufferSize. It returns the last stage's
// output channel, or input itself when there are no stages.
func (bp *BufferedPipeline) Execute(input <-chan interface{}) <-chan interface{} {
    upstream := input
    for idx := range bp.stages {
        stage := bp.stages[idx]
        downstream := make(chan interface{}, stage.bufferSize)
        go bp.runBufferedStage(stage, upstream, downstream, idx)
        upstream = downstream
    }
    return upstream
}

// runBufferedStage drives one stage of the BufferedPipeline. It reads items
// from input, applies stage.processor, and forwards every non-nil result to
// output. A nil result drops the item; an error is logged and the item
// skipped.
//
// It prints progress every 100 forwarded results, and a summary once input
// is exhausted. Both report the number forwarded (filtered or failed items
// are not counted) and the rate since the stage started. output is closed on
// return.
//
// The receive from input and the send to output both also wait on bp.ctx.
// Cancel therefore stops the stage even while it is blocked on a slow
// producer or on a full output buffer that nobody is draining; otherwise the
// goroutine would leak. On cancellation the summary is not printed.
//
// stageIndex is currently unused.
func (bp *BufferedPipeline) runBufferedStage(stage BufferedStage, input <-chan interface{}, output chan<- interface{}, stageIndex int) {
    defer close(output)

    processed := 0
    startTime := time.Now()
    // rate reports forwarded results per second. It guards against a zero
    // elapsed time, which would otherwise print NaN or +Inf.
    rate := func() float64 {
        secs := time.Since(startTime).Seconds()
        if secs <= 0 {
            return 0
        }
        return float64(processed) / secs
    }

    for {
        var data interface{}
        select {
        case <-bp.ctx.Done():
            return
        case item, ok := <-input:
            if !ok {
                fmt.Printf("阶段 %s 完成: 总共处理 %d 个数据,平均速率: %.2f/秒\n",
                    stage.name, processed, rate())
                return
            }
            data = item
        }
        // select picks randomly when both cases are ready; re-check so a
        // cancelled pipeline never starts processing another item.
        if bp.ctx.Err() != nil {
            return
        }

        result, err := stage.processor(data)
        if err != nil {
            fmt.Printf("阶段 %s 处理错误: %v\n", stage.name, err)
            continue
        }
        if result == nil {
            continue // filtered out
        }

        select {
        case output <- result:
        case <-bp.ctx.Done():
            return
        }
        processed++

        if processed%100 == 0 {
            fmt.Printf("阶段 %s: 已处理 %d 个数据,处理速率: %.2f/秒\n",
                stage.name, processed, rate())
        }
    }
}

// Cancel cancels the context shared by every stage of the pipeline. Calling
// it again has no further effect.
func (bp *BufferedPipeline) Cancel() {
    bp.cancel()
}

// PipelineMonitor periodically prints per-stage statistics for a
// BufferedPipeline. printStats reads stageStats while holding mu's read lock.
//
// NOTE(review): nothing in this listing ever writes to stageStats, so as
// written the report contains only its header lines. Stages would need to
// record their counters here (under mu) for it to show any data.
type PipelineMonitor struct {
    pipeline   *BufferedPipeline
    mu         sync.RWMutex
    stageStats map[string]*StageStats
}

// StageStats holds the counters reported for a single stage.
type StageStats struct {
    Processed, Errors     int64
    StartTime, LastUpdate time.Time
}

// NewPipelineMonitor returns a monitor for pipeline with an empty,
// ready-to-use stats map.
func NewPipelineMonitor(pipeline *BufferedPipeline) *PipelineMonitor {
    stats := make(map[string]*StageStats)
    return &PipelineMonitor{stageStats: stats, pipeline: pipeline}
}

// StartMonitoring prints statistics every 5 seconds until the monitored
// pipeline's context is cancelled. It blocks, so callers would typically run
// it in its own goroutine.
func (pm *PipelineMonitor) StartMonitoring() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    done := pm.pipeline.ctx.Done()
    for {
        select {
        case <-done:
            return
        case <-ticker.C:
            pm.printStats()
        }
    }
}

// printStats writes a header, then one line per recorded stage. Each line
// shows the processed count, the error count, and processed items per second
// since that stage's StartTime. A blank line follows. The read lock is held
// for the whole report. Map iteration order is unspecified, so stage order
// may vary between reports.
func (pm *PipelineMonitor) printStats() {
    pm.mu.RLock()
    defer pm.mu.RUnlock()

    fmt.Println("=== Pipeline性能统计 ===")
    for name, s := range pm.stageStats {
        perSecond := float64(s.Processed) / time.Since(s.StartTime).Seconds()
        fmt.Printf("阶段 %s: 处理 %d, 错误 %d, 速率 %.2f/秒\n",
            name, s.Processed, s.Errors, perSecond)
    }
    fmt.Println()
}

// main pushes 1000 integers through Multiply(x2) -> Filter(multiples of 10)
// -> Format, using channels with a buffer of 100 between stages. It prints
// every 20th result and the total count and elapsed time.
func main() {
    // Pre-filled source channel, closed once all 1000 values are queued.
    input := make(chan interface{}, 1000)
    go func() {
        defer close(input)
        for i := 1; i <= 1000; i++ {
            input <- i
        }
    }()

    pipeline := NewBufferedPipeline(100) // buffer capacity 100 between stages
    // Release the pipeline's context once main is done with it.
    defer pipeline.Cancel()

    pipeline.AddStage("Multiply", func(data interface{}) (interface{}, error) {
        if num, ok := data.(int); ok {
            time.Sleep(time.Microsecond * 100) // simulated compute time
            return num * 2, nil
        }
        return nil, fmt.Errorf("无效数据类型")
    })

    pipeline.AddStage("Filter", func(data interface{}) (interface{}, error) {
        if num, ok := data.(int); ok {
            time.Sleep(time.Microsecond * 50) // simulated processing time
            if num%10 == 0 {
                return num, nil
            }
            return nil, nil // nil result: filtered out
        }
        return nil, fmt.Errorf("无效数据类型")
    })

    pipeline.AddStage("Format", func(data interface{}) (interface{}, error) {
        if num, ok := data.(int); ok {
            time.Sleep(time.Microsecond * 200) // simulated formatting time
            return fmt.Sprintf("结果: %d", num), nil
        }
        return nil, fmt.Errorf("无效数据类型")
    })

    results := pipeline.Execute(input)

    fmt.Println("开始处理数据...")
    startTime := time.Now()
    resultCount := 0

    for result := range results {
        resultCount++
        if resultCount%20 == 0 {
            fmt.Printf("已收到 %d 个结果: %v\n", resultCount, result)
        }
    }

    elapsed := time.Since(startTime)
    fmt.Printf("Pipeline完成: 总共处理 %d 个结果,总耗时: %v\n", resultCount, elapsed)
}

实际应用场景 #

1. 日志处理 Pipeline #

// LogProcessingPipeline sketches a log-processing pipeline as four stages:
// parser, filter, enricher and formatter. The stage types are not defined in
// this listing (their implementations are left to the reader), so this
// declaration does not compile on its own.
type LogProcessingPipeline struct {
    parser    *LogParserStage
    filter    *LogFilterStage
    enricher  *LogEnricherStage
    formatter *LogFormatterStage
}

// LogEntry is one log record flowing through a LogProcessingPipeline.
// Metadata holds arbitrary extra key/value attributes.
type LogEntry struct {
    Timestamp               time.Time
    Level, Message, Source  string
    Metadata                map[string]interface{}
}

// 实现各个处理阶段...

2. 图像处理 Pipeline #

// ImageProcessingPipeline sketches an image pipeline with five stages: load,
// resize, filter, compress, and save. The stage types are placeholders that
// this listing does not define.
type ImageProcessingPipeline struct {
    loader     *ImageLoaderStage
    resizer    *ImageResizerStage
    filter     *ImageFilterStage
    compressor *ImageCompressorStage
    saver      *ImageSaverStage
}

// 实现各个处理阶段...

性能优化技巧 #

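1. 动态调整缓冲区大小 #

注意:Go 的 channel 容量在 make 时就已确定,创建后无法修改。下面的函数只是一个示意骨架;真正"调整"缓冲区需要新建一个容量不同的 channel,并让上下游切换到新的 channel 上。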

// adjustBufferSize is a placeholder for resizing a stage's buffer according
// to how full its queue is. Above 80 queued items it would grow the buffer;
// below 20 it would shrink it. Neither branch is implemented, so the method
// currently has no effect.
//
// Go channels cannot be resized after make. A real implementation would have
// to create a new channel and switch the stage over to it.
//
// NOTE(review): the 80/20 thresholds are absolute item counts, not fractions
// of bufferSize. Confirm that this is intended.
func (bp *BufferedPipeline) adjustBufferSize(stageName string, queueLength int) {
    switch {
    case queueLength > 80:
        // TODO: grow the buffer.
    case queueLength < 20:
        // TODO: shrink the buffer.
    }
}

2. 背压处理 #

// handleBackpressure copies every item from input to output. It first tries
// a non-blocking send. If output's buffer is full, it logs that the stage is
// under backpressure and then waits until there is room. Waiting in turn
// stalls the upstream sender, which is what backpressure means: a full buffer
// delays items instead of silently dropping them. Waiting on either channel
// is abandoned when bp.ctx is cancelled.
//
// It returns when input is closed or the pipeline is cancelled. It does not
// close output; that remains the caller's responsibility.
func (bp *BufferedPipeline) handleBackpressure(stage BufferedStage, input <-chan interface{}, output chan<- interface{}) {
    for {
        var data interface{}
        select {
        case <-bp.ctx.Done():
            return
        case item, ok := <-input:
            if !ok {
                return
            }
            data = item
        }

        // Fast path: there is room in the output buffer.
        select {
        case output <- data:
            continue
        default:
        }

        // Output buffer is full: report it, then block until downstream
        // catches up (or the pipeline is cancelled).
        fmt.Printf("阶段 %s 出现背压\n", stage.name)
        select {
        case output <- data:
        case <-bp.ctx.Done():
            return
        }
    }
}

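Pipeline 模式是构建高效数据处理系统的重要工具,通过合理的设计和优化,可以充分利用系统资源,提高处理效率。在实际应用中,要根据数据特点和处理需求选择合适的 Pipeline 架构,并注意性能监控和优化。此外,每个阶段的 goroutine 都必须有明确的退出路径:在从上游接收和向下游发送数据时同时监听 ctx.Done(),否则一旦下游停止读取或调用了 Cancel,阻塞在 channel 上的 goroutine 将永远无法退出,造成 goroutine 泄漏。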