2.4.2 Fan-in/Fan-out 模式

2.4.2 Fan-in/Fan-out 模式 #

Fan-in/Fan-out 是并发编程中的重要模式,用于处理数据流的分发和聚合。Fan-out 将一个数据源分发给多个处理者,而 Fan-in 则将多个数据源的结果聚合到一个输出中。这种模式在数据处理、负载均衡和并行计算中应用广泛。

基本概念 #

Fan-out(扇出) #

Fan-out 模式将单个输入源的数据分发给多个处理者并行处理,提高处理效率。

Fan-in(扇入) #

Fan-in 模式将多个输入源的数据聚合到单个输出中,便于统一处理结果。

Fan-out 模式实现 #

基础 Fan-out 实现 #

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 数据生成器
func dataGenerator(count int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for i := 1; i <= count; i++ {
            out <- i
            time.Sleep(time.Millisecond * 100) // 模拟数据生成间隔
        }
    }()

    return out
}

// 数据处理器
func processor(id int, input <-chan int) <-chan string {
    out := make(chan string)

    go func() {
        defer close(out)
        for data := range input {
            // 模拟处理时间
            processingTime := time.Duration(rand.Intn(500)) * time.Millisecond
            time.Sleep(processingTime)

            result := fmt.Sprintf("处理器%d处理数据%d", id, data)
            out <- result
            fmt.Printf("处理器%d完成数据%d的处理\n", id, data)
        }
    }()

    return out
}

// Fan-out:将输入分发给多个处理器
func fanOut(input <-chan int, numProcessors int) []<-chan string {
    outputs := make([]<-chan string, numProcessors)

    for i := 0; i < numProcessors; i++ {
        outputs[i] = processor(i+1, input)
    }

    return outputs
}

func main() {
    rand.Seed(time.Now().UnixNano())

    // 生成数据
    data := dataGenerator(10)

    // Fan-out:分发给3个处理器
    processors := fanOut(data, 3)

    // 收集所有处理器的结果
    var wg sync.WaitGroup
    for i, proc := range processors {
        wg.Add(1)
        go func(id int, processor <-chan string) {
            defer wg.Done()
            for result := range processor {
                fmt.Printf("从处理器%d收到结果: %s\n", id+1, result)
            }
        }(i, proc)
    }

    wg.Wait()
    fmt.Println("所有处理完成")
}

负载均衡的 Fan-out #

package main

import (
    "fmt"
    "sync"
    "time"
)

// Task 任务结构
type Task struct {
    ID   int
    Data string
}

// Result 结果结构
type Result struct {
    TaskID int
    Output string
    Worker int
}

// LoadBalancer 负载均衡器
type LoadBalancer struct {
    workers   []chan Task
    results   chan Result
    wg        sync.WaitGroup
    workerWg  sync.WaitGroup
}

// NewLoadBalancer 创建负载均衡器
func NewLoadBalancer(numWorkers int) *LoadBalancer {
    lb := &LoadBalancer{
        workers: make([]chan Task, numWorkers),
        results: make(chan Result, 100),
    }

    // 创建worker
    for i := 0; i < numWorkers; i++ {
        lb.workers[i] = make(chan Task, 10)
        lb.workerWg.Add(1)
        go lb.worker(i+1, lb.workers[i])
    }

    return lb
}

// worker 工作者
func (lb *LoadBalancer) worker(id int, tasks <-chan Task) {
    defer lb.workerWg.Done()

    for task := range tasks {
        // 模拟处理时间
        time.Sleep(time.Millisecond * 200)

        result := Result{
            TaskID: task.ID,
            Output: fmt.Sprintf("Worker%d处理了任务%d: %s", id, task.ID, task.Data),
            Worker: id,
        }

        lb.results <- result
    }
}

// Distribute 分发任务(轮询方式)
func (lb *LoadBalancer) Distribute(tasks <-chan Task) {
    go func() {
        workerIndex := 0
        for task := range tasks {
            // 轮询分发
            lb.workers[workerIndex] <- task
            workerIndex = (workerIndex + 1) % len(lb.workers)
        }

        // 关闭所有worker channel
        for _, worker := range lb.workers {
            close(worker)
        }
    }()
}

// Results 获取结果channel
func (lb *LoadBalancer) Results() <-chan Result {
    return lb.results
}

// Close 关闭负载均衡器
func (lb *LoadBalancer) Close() {
    lb.workerWg.Wait()
    close(lb.results)
}

func main() {
    // 创建任务
    tasks := make(chan Task, 20)
    go func() {
        defer close(tasks)
        for i := 1; i <= 15; i++ {
            tasks <- Task{
                ID:   i,
                Data: fmt.Sprintf("任务数据%d", i),
            }
        }
    }()

    // 创建负载均衡器
    lb := NewLoadBalancer(3)

    // 分发任务
    lb.Distribute(tasks)

    // 收集结果
    go func() {
        time.Sleep(time.Second * 5) // 5秒后关闭
        lb.Close()
    }()

    workerStats := make(map[int]int)
    for result := range lb.Results() {
        fmt.Println(result.Output)
        workerStats[result.Worker]++
    }

    // 显示工作负载统计
    fmt.Println("\n工作负载统计:")
    for worker, count := range workerStats {
        fmt.Printf("Worker%d 处理了 %d 个任务\n", worker, count)
    }
}

Fan-in 模式实现 #

基础 Fan-in 实现 #

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 数据生产者
func producer(id int, count int) <-chan string {
    out := make(chan string)

    go func() {
        defer close(out)
        for i := 1; i <= count; i++ {
            // 模拟不同的生产速度
            delay := time.Duration(rand.Intn(300)) * time.Millisecond
            time.Sleep(delay)

            data := fmt.Sprintf("生产者%d-数据%d", id, i)
            out <- data
            fmt.Printf("生产者%d生产了: %s\n", id, data)
        }
        fmt.Printf("生产者%d完成生产\n", id)
    }()

    return out
}

// Fan-in:合并多个输入到单个输出
func fanIn(inputs ...<-chan string) <-chan string {
    out := make(chan string)
    var wg sync.WaitGroup

    // 为每个输入启动一个goroutine
    for i, input := range inputs {
        wg.Add(1)
        go func(id int, ch <-chan string) {
            defer wg.Done()
            for data := range ch {
                out <- data
            }
            fmt.Printf("输入通道%d已关闭\n", id+1)
        }(i, input)
    }

    // 等待所有输入完成后关闭输出
    go func() {
        wg.Wait()
        close(out)
        fmt.Println("所有输入通道已关闭,输出通道关闭")
    }()

    return out
}

func main() {
    rand.Seed(time.Now().UnixNano())

    // 创建多个生产者
    producer1 := producer(1, 5)
    producer2 := producer(2, 4)
    producer3 := producer(3, 6)

    // Fan-in:合并所有生产者的输出
    merged := fanIn(producer1, producer2, producer3)

    // 消费合并后的数据
    fmt.Println("开始消费合并后的数据:")
    for data := range merged {
        fmt.Printf("消费者收到: %s\n", data)
    }

    fmt.Println("所有数据消费完成")
}

带优先级的 Fan-in #

package main

import (
    "container/heap"
    "fmt"
    "sync"
    "time"
)

// PriorityData 带优先级的数据
type PriorityData struct {
    Data     string
    Priority int
    Source   int
}

// PriorityQueue 优先级队列
type PriorityQueue []*PriorityData

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
    // 优先级高的排在前面
    return pq[i].Priority > pq[j].Priority
}

func (pq PriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
}

func (pq *PriorityQueue) Push(x interface{}) {
    *pq = append(*pq, x.(*PriorityData))
}

func (pq *PriorityQueue) Pop() interface{} {
    old := *pq
    n := len(old)
    item := old[n-1]
    *pq = old[0 : n-1]
    return item
}

// PriorityFanIn 带优先级的扇入
type PriorityFanIn struct {
    inputs   []<-chan *PriorityData
    output   chan *PriorityData
    pq       *PriorityQueue
    mu       sync.Mutex
    wg       sync.WaitGroup
    done     chan struct{}
}

// NewPriorityFanIn 创建优先级扇入
func NewPriorityFanIn(inputs ...<-chan *PriorityData) *PriorityFanIn {
    pfi := &PriorityFanIn{
        inputs: inputs,
        output: make(chan *PriorityData),
        pq:     &PriorityQueue{},
        done:   make(chan struct{}),
    }

    heap.Init(pfi.pq)
    return pfi
}

// Start 启动优先级扇入
func (pfi *PriorityFanIn) Start() <-chan *PriorityData {
    // 为每个输入启动goroutine
    for i, input := range pfi.inputs {
        pfi.wg.Add(1)
        go pfi.readInput(i, input)
    }

    // 启动输出goroutine
    go pfi.writeOutput()

    // 等待所有输入完成
    go func() {
        pfi.wg.Wait()
        close(pfi.done)
    }()

    return pfi.output
}

// readInput 读取输入数据
func (pfi *PriorityFanIn) readInput(source int, input <-chan *PriorityData) {
    defer pfi.wg.Done()

    for data := range input {
        data.Source = source
        pfi.mu.Lock()
        heap.Push(pfi.pq, data)
        pfi.mu.Unlock()
    }
}

// writeOutput 输出数据
func (pfi *PriorityFanIn) writeOutput() {
    defer close(pfi.output)

    ticker := time.NewTicker(time.Millisecond * 100)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            pfi.mu.Lock()
            if pfi.pq.Len() > 0 {
                data := heap.Pop(pfi.pq).(*PriorityData)
                pfi.mu.Unlock()
                pfi.output <- data
            } else {
                pfi.mu.Unlock()
            }

        case <-pfi.done:
            // 输出剩余数据
            pfi.mu.Lock()
            for pfi.pq.Len() > 0 {
                data := heap.Pop(pfi.pq).(*PriorityData)
                pfi.mu.Unlock()
                pfi.output <- data
                pfi.mu.Lock()
            }
            pfi.mu.Unlock()
            return
        }
    }
}

// 优先级数据生产者
func priorityProducer(id int, priorities []int) <-chan *PriorityData {
    out := make(chan *PriorityData)

    go func() {
        defer close(out)
        for i, priority := range priorities {
            time.Sleep(time.Millisecond * 200)
            data := &PriorityData{
                Data:     fmt.Sprintf("生产者%d-数据%d", id, i+1),
                Priority: priority,
            }
            out <- data
            fmt.Printf("生产者%d生产了优先级%d的数据: %s\n", id, priority, data.Data)
        }
    }()

    return out
}

func main() {
    // 创建不同优先级的生产者
    producer1 := priorityProducer(1, []int{1, 3, 5, 2})
    producer2 := priorityProducer(2, []int{4, 2, 6, 1})
    producer3 := priorityProducer(3, []int{3, 5, 1, 4})

    // 创建优先级扇入
    pfi := NewPriorityFanIn(producer1, producer2, producer3)
    output := pfi.Start()

    // 消费按优先级排序的数据
    fmt.Println("\n按优先级消费数据:")
    for data := range output {
        fmt.Printf("消费者收到: %s (优先级: %d, 来源: %d)\n",
            data.Data, data.Priority, data.Source)
    }

    fmt.Println("所有数据消费完成")
}

组合 Fan-out 和 Fan-in #

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Pipeline 数据处理管道
type Pipeline struct {
    input       <-chan int
    fanOutCount int
    fanInOutput chan string
}

// NewPipeline 创建新的管道
func NewPipeline(input <-chan int, fanOutCount int) *Pipeline {
    return &Pipeline{
        input:       input,
        fanOutCount: fanOutCount,
        fanInOutput: make(chan string, 100),
    }
}

// processor 数据处理器
func (p *Pipeline) processor(id int, input <-chan int, output chan<- string) {
    defer func() {
        fmt.Printf("处理器%d退出\n", id)
    }()

    for data := range input {
        // 模拟处理时间
        processingTime := time.Duration(rand.Intn(500)) * time.Millisecond
        time.Sleep(processingTime)

        result := fmt.Sprintf("处理器%d处理数据%d得到结果%d", id, data, data*data)
        output <- result
    }
}

// Start 启动管道
func (p *Pipeline) Start() <-chan string {
    // Fan-out: 创建多个处理器
    processors := make([]chan int, p.fanOutCount)
    outputs := make([]<-chan string, p.fanOutCount)

    for i := 0; i < p.fanOutCount; i++ {
        processors[i] = make(chan int, 10)
        output := make(chan string, 10)
        outputs[i] = output

        go p.processor(i+1, processors[i], output)
    }

    // 分发输入数据到处理器(轮询)
    go func() {
        defer func() {
            for _, proc := range processors {
                close(proc)
            }
        }()

        index := 0
        for data := range p.input {
            processors[index] <- data
            index = (index + 1) % len(processors)
        }
    }()

    // Fan-in: 合并所有处理器的输出
    go func() {
        defer close(p.fanInOutput)

        var wg sync.WaitGroup
        for i, output := range outputs {
            wg.Add(1)
            go func(id int, ch <-chan string) {
                defer wg.Done()
                for result := range ch {
                    p.fanInOutput <- result
                }
            }(i+1, output)
        }

        wg.Wait()
    }()

    return p.fanInOutput
}

// 数据生成器
func dataGenerator(count int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for i := 1; i <= count; i++ {
            out <- i
            time.Sleep(time.Millisecond * 50)
        }
        fmt.Println("数据生成完成")
    }()

    return out
}

func main() {
    rand.Seed(time.Now().UnixNano())

    // 生成数据
    data := dataGenerator(20)

    // 创建处理管道
    pipeline := NewPipeline(data, 3)
    results := pipeline.Start()

    // 消费结果
    fmt.Println("开始消费处理结果:")
    resultCount := 0
    for result := range results {
        resultCount++
        fmt.Printf("结果%d: %s\n", resultCount, result)
    }

    fmt.Printf("总共处理了%d个结果\n", resultCount)
}

实际应用场景 #

1. 日志处理系统 #

// LogProcessor 日志处理系统
type LogProcessor struct {
    parsers   []chan string
    analyzers []chan LogEntry
    output    chan AnalysisResult
}

type LogEntry struct {
    Timestamp time.Time
    Level     string
    Message   string
    Source    string
}

type AnalysisResult struct {
    Type    string
    Count   int
    Details map[string]interface{}
}

// 实现日志的Fan-out解析和Fan-in分析

2. 图像处理管道 #

// ImageProcessor 图像处理管道
type ImageProcessor struct {
    resizers    []chan ImageTask
    compressors []chan ImageTask
    output      chan ProcessedImage
}

// 实现图像的并行处理和结果合并

性能优化建议 #

1. 缓冲区大小优化 #

// 根据处理速度差异调整缓冲区大小
func optimizedFanOut(input <-chan int, numProcessors int) []<-chan string {
    outputs := make([]<-chan string, numProcessors)

    for i := 0; i < numProcessors; i++ {
        // 根据处理器性能调整缓冲区大小
        bufferSize := 10 * (i + 1) // 示例:不同处理器不同缓冲区
        output := make(chan string, bufferSize)
        outputs[i] = output

        go func(id int, out chan<- string) {
            defer close(out)
            // 处理逻辑...
        }(i, output)
    }

    return outputs
}

2. 动态负载均衡 #

// 基于队列长度的动态负载均衡
func dynamicLoadBalance(task Task, workers []chan Task) {
    minQueueLen := len(workers[0])
    selectedWorker := 0

    for i, worker := range workers {
        if len(worker) < minQueueLen {
            minQueueLen = len(worker)
            selectedWorker = i
        }
    }

    workers[selectedWorker] <- task
}

Fan-in/Fan-out 模式是构建高效并发系统的重要工具,通过合理的设计和实现,可以显著提高系统的处理能力和资源利用率。在实际应用中,要根据具体的业务需求选择合适的实现策略,并注意性能优化和资源管理。