2.4.3 The Pipeline Pattern #
The Pipeline pattern is a concurrency design pattern that breaks a complex data-processing task into multiple stages. Each stage focuses on one piece of processing logic, and data flows from stage to stage to form a processing chain. The pattern is widely used in data processing, image processing, compiler design, and similar domains.
Basic Concepts #
Core characteristics of the Pipeline pattern (a minimal stage sketch follows this list):
- Staged processing: a complex task is decomposed into several simple processing stages
- Data flow: data is passed between stages through channels
- Parallel execution: the stages can work on different pieces of data at the same time
- Decoupling: stages are independent of one another, which makes them easy to maintain and extend
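In Go, a stage is conventionally a function that receives a read-only channel, returns a new read-only channel, and closes its output when it is done. A minimal sketch of that shape (the `double` stage is purely illustrative and not part of the examples below):
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out) // closing the output tells the next stage there is no more data
        for v := range in {
            out <- v * 2
        }
    }()
    return out
}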
Basic Pipeline Implementation #
A Simple Data-Processing Pipeline #
package main
import (
"fmt"
"time"
)
// Stage 1: data generation
func generateNumbers(count int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= count; i++ {
out <- i
time.Sleep(time.Millisecond * 100) // simulate the interval between generated items
}
fmt.Println("data generation finished")
}()
return out
}
// Stage 2: transform the data (square it)
func square(input <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for num := range input {
result := num * num
fmt.Printf("平方处理: %d -> %d\n", num, result)
out <- result
time.Sleep(time.Millisecond * 50) // 模拟处理时间
}
fmt.Println("平方处理完成")
}()
return out
}
// Stage 3: filter the data (keep only even numbers)
func filterEven(input <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for num := range input {
if num%2 == 0 {
fmt.Printf("过滤保留: %d\n", num)
out <- num
} else {
fmt.Printf("过滤丢弃: %d\n", num)
}
time.Sleep(time.Millisecond * 30) // 模拟处理时间
}
fmt.Println("过滤处理完成")
}()
return out
}
// Stage 4: format the output
func formatOutput(input <-chan int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for num := range input {
result := fmt.Sprintf("result: %d", num)
fmt.Printf("format: %d -> %s\n", num, result)
out <- result
time.Sleep(time.Millisecond * 20) // simulate processing time
}
fmt.Println("format stage finished")
}()
return out
}
func main() {
// build the pipeline
numbers := generateNumbers(10)
squared := square(numbers)
filtered := filterEven(squared)
formatted := formatOutput(filtered)
// consume the final results
fmt.Println("=== final results ===")
for result := range formatted {
fmt.Println("最终输出:", result)
}
fmt.Println("Pipeline处理完成")
}
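// Note: if the consumer of `formatted` stopped reading early, the stages above would
// block forever on their sends and leak goroutines. A common remedy (a sketch only,
// assuming the "context" package is imported; not part of the original example) is to
// thread a context through each stage and make every send cancelable:
func squareCtx(ctx context.Context, input <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for num := range input {
            select {
            case out <- num * num:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}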
A Generic Pipeline Framework #
package main
import (
"context"
"fmt"
"time"
)
// Stage represents a single stage in the pipeline
type Stage interface {
Process(ctx context.Context, input interface{}) (interface{}, error)
Name() string
}
// Pipeline is the pipeline structure
type Pipeline struct {
stages []Stage
ctx context.Context
cancel context.CancelFunc
}
// NewPipeline creates a new Pipeline
func NewPipeline(stages ...Stage) *Pipeline {
ctx, cancel := context.WithCancel(context.Background())
return &Pipeline{
stages: stages,
ctx: ctx,
cancel: cancel,
}
}
// Execute runs the pipeline
func (p *Pipeline) Execute(input <-chan interface{}) <-chan PipelineResult {
output := make(chan PipelineResult)
go func() {
defer close(output)
current := input
for i, stage := range p.stages {
next := make(chan interface{})
go p.runStage(stage, current, next, i)
current = next
}
// collect the final results
for result := range current {
output <- PipelineResult{
Data: result,
Error: nil,
}
}
}()
return output
}
// PipelineResult is the result of a pipeline run
type PipelineResult struct {
Data interface{}
Error error
}
// runStage runs a single stage
func (p *Pipeline) runStage(stage Stage, input <-chan interface{}, output chan<- interface{}, stageIndex int) {
defer close(output)
for data := range input {
select {
case <-p.ctx.Done():
return
default:
result, err := stage.Process(p.ctx, data)
if err != nil {
fmt.Printf("阶段 %s 处理错误: %v\n", stage.Name(), err)
continue
}
if result != nil {
output <- result
}
}
}
fmt.Printf("阶段 %s 完成\n", stage.Name())
}
// Cancel stops the pipeline
func (p *Pipeline) Cancel() {
p.cancel()
}
// Example stage implementations
// NumberGeneratorStage is a placeholder source stage (not used in main below)
type NumberGeneratorStage struct {
count int
}
func (ngs *NumberGeneratorStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
// a real source stage would produce its own data; this placeholder simply passes the input through
return input, nil
}
func (ngs *NumberGeneratorStage) Name() string {
return "NumberGenerator"
}
// MultiplyStage multiplies its input by a fixed factor
type MultiplyStage struct {
factor int
}
func (ms *MultiplyStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
num, ok := input.(int)
if !ok {
return nil, fmt.Errorf("期望int类型,得到%T", input)
}
result := num * ms.factor
fmt.Printf("乘法阶段: %d * %d = %d\n", num, ms.factor, result)
// 模拟处理时间
time.Sleep(time.Millisecond * 50)
return result, nil
}
func (ms *MultiplyStage) Name() string {
return fmt.Sprintf("Multiply(%d)", ms.factor)
}
// FilterStage keeps only values that satisfy the predicate
type FilterStage struct {
predicate func(interface{}) bool
}
func (fs *FilterStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
if fs.predicate(input) {
fmt.Printf("过滤阶段: 保留 %v\n", input)
return input, nil
}
fmt.Printf("过滤阶段: 丢弃 %v\n", input)
return nil, nil // 返回nil表示过滤掉
}
func (fs *FilterStage) Name() string {
return "Filter"
}
// StringFormatStage formats its input as a string
type StringFormatStage struct {
format string
}
func (sfs *StringFormatStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
result := fmt.Sprintf(sfs.format, input)
fmt.Printf("格式化阶段: %v -> %s\n", input, result)
time.Sleep(time.Millisecond * 30)
return result, nil
}
func (sfs *StringFormatStage) Name() string {
return "StringFormat"
}
func main() {
// create the input data
input := make(chan interface{}, 10)
go func() {
defer close(input)
for i := 1; i <= 10; i++ {
input <- i
}
}()
// create the pipeline stages
stages := []Stage{
&MultiplyStage{factor: 2},
&FilterStage{predicate: func(v interface{}) bool {
if num, ok := v.(int); ok {
return num > 10
}
return false
}},
&StringFormatStage{format: "result: %v"},
}
// create and run the pipeline
pipeline := NewPipeline(stages...)
results := pipeline.Execute(input)
// consume the results
fmt.Println("=== pipeline results ===")
for result := range results {
if result.Error != nil {
fmt.Printf("错误: %v\n", result.Error)
} else {
fmt.Printf("最终结果: %v\n", result.Data)
}
}
}
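// One limitation of runStage above: cancellation is only observed between inputs, and
// the send into the unbuffered output channel can block indefinitely if the consumer
// goes away. A sketch of a variant whose sends also respect cancellation (an
// illustrative adjustment, not part of the original framework):
func (p *Pipeline) runStageCtx(stage Stage, input <-chan interface{}, output chan<- interface{}) {
    defer close(output)
    for data := range input {
        result, err := stage.Process(p.ctx, data)
        if err != nil || result == nil {
            continue
        }
        select {
        case output <- result:
        case <-p.ctx.Done():
            return
        }
    }
}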
Parallel Pipeline #
Parallel Processing Within Each Stage #
package main
import (
"context"
"fmt"
"sync"
"time"
)
// ParallelStage processes items with a pool of workers
type ParallelStage struct {
name string
workerCount int
processor func(interface{}) (interface{}, error)
}
func NewParallelStage(name string, workerCount int, processor func(interface{}) (interface{}, error)) *ParallelStage {
return &ParallelStage{
name: name,
workerCount: workerCount,
processor: processor,
}
}
// Process runs the stage with multiple workers in parallel
func (ps *ParallelStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
output := make(chan interface{})
var wg sync.WaitGroup
// start the workers
for i := 0; i < ps.workerCount; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for data := range input {
select {
case <-ctx.Done():
return
default:
result, err := ps.processor(data)
if err != nil {
fmt.Printf("Worker %d 处理错误: %v\n", workerID, err)
continue
}
if result != nil {
fmt.Printf("Worker %d 处理: %v -> %v\n", workerID, data, result)
output <- result
}
}
}
}(i)
}
// close the output once all workers are done
go func() {
wg.Wait()
close(output)
fmt.Printf("阶段 %s 完成\n", ps.name)
}()
return output
}
func (ps *ParallelStage) Name() string {
return ps.name
}
// ParallelPipeline chains parallel stages together
type ParallelPipeline struct {
stages []ParallelStage
ctx context.Context
cancel context.CancelFunc
}
func NewParallelPipeline() *ParallelPipeline {
ctx, cancel := context.WithCancel(context.Background())
return &ParallelPipeline{
ctx: ctx,
cancel: cancel,
}
}
// AddStage appends a stage to the pipeline
func (pp *ParallelPipeline) AddStage(stage ParallelStage) {
pp.stages = append(pp.stages, stage)
}
// Execute runs the parallel pipeline
func (pp *ParallelPipeline) Execute(input <-chan interface{}) <-chan interface{} {
current := input
for _, stage := range pp.stages {
current = stage.Process(pp.ctx, current)
}
return current
}
// Cancel stops the pipeline
func (pp *ParallelPipeline) Cancel() {
pp.cancel()
}
func main() {
// create the input data
input := make(chan interface{}, 20)
go func() {
defer close(input)
for i := 1; i <= 20; i++ {
input <- i
time.Sleep(time.Millisecond * 50)
}
}()
// create the parallel pipeline
pipeline := NewParallelPipeline()
// stage 1: squaring (3 workers)
pipeline.AddStage(*NewParallelStage("Square", 3, func(data interface{}) (interface{}, error) {
if num, ok := data.(int); ok {
time.Sleep(time.Millisecond * 100) // simulate computation time
return num * num, nil
}
return nil, fmt.Errorf("无效数据类型")
}))
// stage 2: filtering (2 workers)
pipeline.AddStage(*NewParallelStage("Filter", 2, func(data interface{}) (interface{}, error) {
if num, ok := data.(int); ok {
time.Sleep(time.Millisecond * 50) // simulate processing time
if num > 50 {
return num, nil
}
return nil, nil // filtered out
}
return nil, fmt.Errorf("invalid data type")
}))
// stage 3: formatting (4 workers)
pipeline.AddStage(*NewParallelStage("Format", 4, func(data interface{}) (interface{}, error) {
if num, ok := data.(int); ok {
time.Sleep(time.Millisecond * 30) // simulate processing time
return fmt.Sprintf("result: %d", num), nil
}
return nil, fmt.Errorf("invalid data type")
}))
// run the pipeline
results := pipeline.Execute(input)
// consume the results
fmt.Println("=== parallel pipeline results ===")
resultCount := 0
for result := range results {
resultCount++
fmt.Printf("最终结果 %d: %v\n", resultCount, result)
}
fmt.Printf("总共处理了 %d 个结果\n", resultCount)
}
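// Because each stage runs several workers, results no longer arrive in input order;
// reorder downstream if ordering matters. The fan-out/fan-in that ParallelStage does
// internally can also be written by hand. A sketch of a generic fan-in (merge) helper,
// assuming Go 1.18+ generics; it is not used by the example above:
func merge[T any](inputs ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup
    for _, in := range inputs {
        wg.Add(1)
        go func(in <-chan T) {
            defer wg.Done()
            for v := range in {
                out <- v
            }
        }(in)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}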
Buffered Pipeline #
package main
import (
"context"
"fmt"
"sync"
"time"
)
// BufferedPipeline is a pipeline with buffered channels between stages
type BufferedPipeline struct {
stages []BufferedStage
bufferSize int
ctx context.Context
cancel context.CancelFunc
}
// BufferedStage is a stage with a buffered output channel
type BufferedStage struct {
name string
processor func(interface{}) (interface{}, error)
bufferSize int
}
func NewBufferedPipeline(bufferSize int) *BufferedPipeline {
ctx, cancel := context.WithCancel(context.Background())
return &BufferedPipeline{
bufferSize: bufferSize,
ctx: ctx,
cancel: cancel,
}
}
func (bp *BufferedPipeline) AddStage(name string, processor func(interface{}) (interface{}, error)) {
stage := BufferedStage{
name: name,
processor: processor,
bufferSize: bp.bufferSize,
}
bp.stages = append(bp.stages, stage)
}
// Execute runs the buffered pipeline
func (bp *BufferedPipeline) Execute(input <-chan interface{}) <-chan interface{} {
current := input
for i, stage := range bp.stages {
next := make(chan interface{}, stage.bufferSize)
go bp.runBufferedStage(stage, current, next, i)
current = next
}
return current
}
// runBufferedStage runs a single buffered stage
func (bp *BufferedPipeline) runBufferedStage(stage BufferedStage, input <-chan interface{}, output chan<- interface{}, stageIndex int) {
defer close(output)
processed := 0
startTime := time.Now()
for data := range input {
select {
case <-bp.ctx.Done():
return
default:
result, err := stage.processor(data)
if err != nil {
fmt.Printf("阶段 %s 处理错误: %v\n", stage.name, err)
continue
}
if result != nil {
output <- result
processed++
// print statistics every 100 processed items
if processed%100 == 0 {
elapsed := time.Since(startTime)
rate := float64(processed) / elapsed.Seconds()
fmt.Printf("阶段 %s: 已处理 %d 个数据,处理速率: %.2f/秒\n",
stage.name, processed, rate)
}
}
}
}
elapsed := time.Since(startTime)
rate := float64(processed) / elapsed.Seconds()
fmt.Printf("阶段 %s 完成: 总共处理 %d 个数据,平均速率: %.2f/秒\n",
stage.name, processed, rate)
}
// Cancel stops the pipeline
func (bp *BufferedPipeline) Cancel() {
bp.cancel()
}
// performance monitoring
type PipelineMonitor struct {
pipeline *BufferedPipeline
mu sync.RWMutex
stageStats map[string]*StageStats
}
type StageStats struct {
Processed int64
Errors int64
StartTime time.Time
LastUpdate time.Time
}
func NewPipelineMonitor(pipeline *BufferedPipeline) *PipelineMonitor {
return &PipelineMonitor{
pipeline: pipeline,
stageStats: make(map[string]*StageStats),
}
}
func (pm *PipelineMonitor) StartMonitoring() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pm.printStats()
case <-pm.pipeline.ctx.Done():
return
}
}
}
func (pm *PipelineMonitor) printStats() {
pm.mu.RLock()
defer pm.mu.RUnlock()
fmt.Println("=== Pipeline性能统计 ===")
for stageName, stats := range pm.stageStats {
elapsed := time.Since(stats.StartTime)
rate := float64(stats.Processed) / elapsed.Seconds()
fmt.Printf("阶段 %s: 处理 %d, 错误 %d, 速率 %.2f/秒\n",
stageName, stats.Processed, stats.Errors, rate)
}
fmt.Println()
}
func main() {
// generate a large amount of input data
input := make(chan interface{}, 1000)
go func() {
defer close(input)
for i := 1; i <= 1000; i++ {
input <- i
}
}()
// create the buffered pipeline
pipeline := NewBufferedPipeline(100) // buffer size of 100
// add the processing stages
pipeline.AddStage("Multiply", func(data interface{}) (interface{}, error) {
if num, ok := data.(int); ok {
time.Sleep(time.Microsecond * 100) // simulate computation time
return num * 2, nil
}
return nil, fmt.Errorf("invalid data type")
})
pipeline.AddStage("Filter", func(data interface{}) (interface{}, error) {
if num, ok := data.(int); ok {
time.Sleep(time.Microsecond * 50) // simulate processing time
if num%10 == 0 {
return num, nil
}
return nil, nil // filtered out
}
return nil, fmt.Errorf("invalid data type")
})
pipeline.AddStage("Format", func(data interface{}) (interface{}, error) {
if num, ok := data.(int); ok {
time.Sleep(time.Microsecond * 200) // simulate formatting time
return fmt.Sprintf("result: %d", num), nil
}
return nil, fmt.Errorf("invalid data type")
})
// run the pipeline
results := pipeline.Execute(input)
// consume the results
fmt.Println("starting to process data...")
startTime := time.Now()
resultCount := 0
for result := range results {
resultCount++
if resultCount%20 == 0 {
fmt.Printf("已收到 %d 个结果: %v\n", resultCount, result)
}
}
elapsed := time.Since(startTime)
fmt.Printf("Pipeline完成: 总共处理 %d 个结果,总耗时: %v\n", resultCount, elapsed)
}
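// PipelineMonitor above is defined but never fed any data in this example. A sketch
// of how a stage could report into it (a hypothetical helper, assuming the monitor is
// shared with the stages):
func (pm *PipelineMonitor) record(stageName string, failed bool) {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    stats, ok := pm.stageStats[stageName]
    if !ok {
        stats = &StageStats{StartTime: time.Now()}
        pm.stageStats[stageName] = stats
    }
    if failed {
        stats.Errors++
    } else {
        stats.Processed++
    }
    stats.LastUpdate = time.Now()
}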
Practical Application Scenarios #
1. Log-Processing Pipeline #
// LogProcessingPipeline is a log-processing pipeline
type LogProcessingPipeline struct {
parser *LogParserStage
filter *LogFilterStage
enricher *LogEnricherStage
formatter *LogFormatterStage
}
type LogEntry struct {
Timestamp time.Time
Level string
Message string
Source string
Metadata map[string]interface{}
}
// implement the individual stages...
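// A sketch of what one of these stages could look like, reusing the Stage interface
// from the generic framework earlier (assumes "context", "fmt", "strings", and "time"
// are imported); the "LEVEL message" line format is an assumption made purely for
// illustration:
type LogParserStage struct{}

func (s *LogParserStage) Name() string { return "LogParser" }

func (s *LogParserStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
    line, ok := input.(string)
    if !ok {
        return nil, fmt.Errorf("expected string, got %T", input)
    }
    parts := strings.SplitN(line, " ", 2) // assumed format, e.g. "ERROR disk full"
    if len(parts) != 2 {
        return nil, fmt.Errorf("malformed log line: %q", line)
    }
    return &LogEntry{
        Timestamp: time.Now(),
        Level:     parts[0],
        Message:   parts[1],
        Metadata:  map[string]interface{}{},
    }, nil
}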
2. Image-Processing Pipeline #
// ImageProcessingPipeline is an image-processing pipeline
type ImageProcessingPipeline struct {
loader *ImageLoaderStage
resizer *ImageResizerStage
filter *ImageFilterStage
compressor *ImageCompressorStage
saver *ImageSaverStage
}
// implement the individual stages...
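// A sketch of one image stage written in the channel style of the earlier examples;
// the ImageTask type is an assumption, and the resize itself is left as a placeholder
// because the standard library does not ship an image resizer:
type ImageTask struct {
    Name string
    Img  image.Image // standard library "image" package
}

func resizeStage(in <-chan *ImageTask) <-chan *ImageTask {
    out := make(chan *ImageTask)
    go func() {
        defer close(out)
        for task := range in {
            // resize task.Img here, e.g. with a third-party imaging library
            out <- task
        }
    }()
    return out
}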
Performance Tuning Tips #
1. Adjusting Buffer Sizes Dynamically #
// Note: a Go channel's capacity is fixed once it is created, so "adjusting" a buffer
// here means recording a new size to use the next time the stage's channel is rebuilt,
// not resizing a live channel.
func (bp *BufferedPipeline) adjustBufferSize(stageName string, queueLength int) {
// choose a new buffer size based on how full the stage's queue is
if queueLength > 80 {
// queue is nearly full: use a larger buffer for the next rebuild
} else if queueLength < 20 {
// queue is mostly empty: a smaller buffer would be enough
}
}
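// The queueLength parameter above can be measured directly on a buffered channel:
// len() reports how many items are currently queued and cap() its capacity. A sketch
// of a hypothetical sampler:
func sampleOccupancy(name string, ch chan interface{}) {
    percent := 0
    if cap(ch) > 0 {
        percent = len(ch) * 100 / cap(ch)
    }
    fmt.Printf("stage %s: %d/%d buffered (%d%%)\n", name, len(ch), cap(ch), percent)
}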
2. Handling Backpressure #
func (bp *BufferedPipeline) handleBackpressure(stage BufferedStage, input <-chan interface{}, output chan<- interface{}) {
for data := range input {
select {
case output <- data:
// sent normally
default:
// the output buffer is full: apply a backpressure strategy
fmt.Printf("stage %s is experiencing backpressure\n", stage.name)
// options include dropping the item (which this sketch effectively does),
// blocking until space frees up, or signalling upstream to slow down
}
}
}
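// A sketch of one concrete strategy: block for a bounded time before giving up, so
// short bursts are absorbed but a stalled consumer cannot wedge the stage (the
// timeout value is the caller's choice; this helper is illustrative only):
func sendWithTimeout(output chan<- interface{}, data interface{}, timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    select {
    case output <- data:
        return true
    case <-timer.C:
        return false // caller decides whether to drop or retry the item
    }
}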
The Pipeline pattern is an important tool for building efficient data-processing systems. With sound design and tuning it makes full use of system resources and raises processing throughput. In practice, choose a pipeline architecture that fits the characteristics of your data and your processing requirements, and pay attention to performance monitoring and ongoing optimization.