2.4.2 Fan-in/Fan-out 模式
Fan-in/Fan-out 是并发编程中的重要模式，用于处理数据流的分发与聚合：Fan-out 将一个数据源分发给多个处理者，Fan-in 则将多个数据源的结果聚合到一个输出中。这种模式在数据处理、负载均衡和并行计算中应用广泛。
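基本概念
Fan-out（扇出）
Fan-out 模式将单个输入源的数据分发给多个处理者并行处理，以提高处理效率。在 Go 中，常见做法是让多个 goroutine 从同一个 channel 接收数据，每条数据只会被其中一个 goroutine 接收。
Fan-in（扇入）
Fan-in 模式将多个输入源的数据聚合到单个输出中，便于统一处理结果。常见做法是为每个输入启动一个 goroutine，把数据转发到同一个输出 channel，并在所有输入都关闭后再关闭输出 channel。
Fan-out 模式实现
基础 Fan-out 实现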
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// dataGenerator returns a channel that yields the integers 1..count in
// ascending order, pausing 100ms after each send to simulate a slow source.
// The channel is closed once every value has been sent.
func dataGenerator(count int) <-chan int {
	values := make(chan int)
	produce := func() {
		defer close(values)
		for next := 1; next <= count; next++ {
			values <- next
			time.Sleep(100 * time.Millisecond) // simulated generation interval
		}
	}
	go produce()
	return values
}
// processor starts a worker identified by id. The worker consumes values from
// input, waits a random 0-499ms per value to simulate work, and emits a
// description of each processed value. Several processors may share one input
// channel; each value is received by exactly one of them. The returned channel
// is closed once input is closed and drained.
func processor(id int, input <-chan int) <-chan string {
	results := make(chan string)
	go func() {
		defer close(results)
		for item := range input {
			delay := time.Millisecond * time.Duration(rand.Intn(500))
			time.Sleep(delay)
			results <- fmt.Sprintf("处理器%d处理数据%d", id, item)
			// Logged only after the consumer has accepted the result.
			fmt.Printf("处理器%d完成数据%d的处理\n", id, item)
		}
	}()
	return results
}
// fanOut starts numProcessors processor goroutines, all reading from the same
// input channel, so each value is handled by exactly one of them. It returns
// one result channel per processor, in order; processor IDs start at 1.
func fanOut(input <-chan int, numProcessors int) []<-chan string {
	outputs := make([]<-chan string, 0, numProcessors)
	for id := 1; id <= numProcessors; id++ {
		outputs = append(outputs, processor(id, input))
	}
	return outputs
}
func main() {
rand.Seed(time.Now().UnixNano())
// 生成数据
data := dataGenerator(10)
// Fan-out:分发给3个处理器
processors := fanOut(data, 3)
// 收集所有处理器的结果
var wg sync.WaitGroup
for i, proc := range processors {
wg.Add(1)
go func(id int, processor <-chan string) {
defer wg.Done()
for result := range processor {
fmt.Printf("从处理器%d收到结果: %s\n", id+1, result)
}
}(i, proc)
}
wg.Wait()
fmt.Println("所有处理完成")
}
负载均衡的 Fan-out
package main
import (
"fmt"
"sync"
"time"
)
// Task is a unit of work handed to the load balancer. Result is what a worker
// reports back after handling one Task.
type (
	// Task carries an identifier and a payload string.
	Task struct {
		ID   int
		Data string
	}

	// Result records the worker ID that produced Output for the task TaskID.
	Result struct {
		TaskID int
		Output string
		Worker int
	}
)
// LoadBalancer distributes Tasks round-robin across a fixed set of worker
// goroutines and collects their Results on a single shared channel.
//
// The previous version also declared a `wg sync.WaitGroup` field that nothing
// used. It was removed because it was easy to confuse with workerWg.
type LoadBalancer struct {
	workers  []chan Task    // one buffered task queue per worker
	results  chan Result    // shared result channel; closed by Close
	workerWg sync.WaitGroup // counts running worker goroutines
}
// NewLoadBalancer builds a LoadBalancer with numWorkers workers and starts them
// immediately. Each worker gets its own task queue buffered to 10 entries, and
// worker IDs are 1-based. The shared result channel is buffered to 100 entries.
func NewLoadBalancer(numWorkers int) *LoadBalancer {
	queues := make([]chan Task, numWorkers)
	lb := &LoadBalancer{workers: queues, results: make(chan Result, 100)}
	for idx := range queues {
		queue := make(chan Task, 10)
		queues[idx] = queue
		lb.workerWg.Add(1)
		go lb.worker(idx+1, queue)
	}
	return lb
}
// worker is the loop run by the worker numbered id. For every task received
// from tasks it sleeps 200ms to simulate work, then publishes a Result on
// lb.results. Once tasks is closed and drained, it marks itself done on
// lb.workerWg.
func (lb *LoadBalancer) worker(id int, tasks <-chan Task) {
	defer lb.workerWg.Done()
	for t := range tasks {
		time.Sleep(200 * time.Millisecond)
		lb.results <- Result{
			TaskID: t.ID,
			Output: fmt.Sprintf("Worker%d处理了任务%d: %s", id, t.ID, t.Data),
			Worker: id,
		}
	}
}
// Distribute returns immediately. In the background it forwards every task
// from tasks to the workers in round-robin order. Once tasks is closed and
// drained, it closes each worker's queue so the workers can finish.
func (lb *LoadBalancer) Distribute(tasks <-chan Task) {
	go func() {
		n := len(lb.workers)
		next := 0
		for t := range tasks {
			lb.workers[next] <- t
			next = (next + 1) % n
		}
		// Signal end-of-input to every worker.
		for i := range lb.workers {
			close(lb.workers[i])
		}
	}()
}
// Results exposes the receive-only side of the shared result channel. The
// channel is closed by Close after all workers have finished.
func (lb *LoadBalancer) Results() <-chan Result {
	return (<-chan Result)(lb.results)
}
// Close blocks until every worker goroutine has exited, then closes the result
// channel so range loops over Results terminate. Workers only exit after
// Distribute has closed their queues. Calling Close twice panics, because the
// result channel would be closed twice.
func (lb *LoadBalancer) Close() {
	defer close(lb.results)
	lb.workerWg.Wait()
}
func main() {
	// Produce 15 tasks on a buffered channel and close it when done.
	tasks := make(chan Task, 20)
	go func() {
		defer close(tasks)
		for i := 1; i <= 15; i++ {
			tasks <- Task{
				ID:   i,
				Data: fmt.Sprintf("任务数据%d", i),
			}
		}
	}()

	// Start 3 workers and feed them round-robin.
	lb := NewLoadBalancer(3)
	lb.Distribute(tasks)

	// Close blocks until every worker has exited, which happens once
	// Distribute has drained tasks and closed the worker queues. Only then does
	// it close the result channel. Running Close in the background therefore
	// ends the range loop below right after the last result arrives. A fixed
	// sleep before Close (the previous version waited 5s) only delays shutdown.
	go lb.Close()

	workerStats := make(map[int]int)
	for result := range lb.Results() {
		fmt.Println(result.Output)
		workerStats[result.Worker]++
	}

	// Report per-worker counts in ID order. Worker IDs are 1..len(lb.workers);
	// ranging over the map would print them in random order.
	fmt.Println("\n工作负载统计:")
	for worker := 1; worker <= len(lb.workers); worker++ {
		fmt.Printf("Worker%d 处理了 %d 个任务\n", worker, workerStats[worker])
	}
}
Fan-in 模式实现
基础 Fan-in 实现
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// producer starts a goroutine that emits count labelled strings of the form
// "生产者<id>-数据<i>", with i starting at 1. Each item is preceded by a random
// 0-299ms delay that simulates producers running at different speeds. The
// returned channel is closed after the last item.
func producer(id int, count int) <-chan string {
	items := make(chan string)
	go func() {
		defer close(items)
		for seq := 1; seq <= count; seq++ {
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
			item := fmt.Sprintf("生产者%d-数据%d", id, seq)
			items <- item
			fmt.Printf("生产者%d生产了: %s\n", id, item)
		}
		fmt.Printf("生产者%d完成生产\n", id)
	}()
	return items
}
// fanIn merges any number of string channels into one. Each input is drained
// by its own goroutine. The merged channel is closed once every input has been
// closed and drained. Ordering across different inputs is not preserved.
func fanIn(inputs ...<-chan string) <-chan string {
	merged := make(chan string)
	var pending sync.WaitGroup
	pending.Add(len(inputs))

	// forward copies one input into merged and reports when it is exhausted.
	forward := func(n int, src <-chan string) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
		fmt.Printf("输入通道%d已关闭\n", n+1)
	}
	for n, src := range inputs {
		go forward(n, src)
	}

	// Close merged only after all forwarders have finished sending.
	go func() {
		pending.Wait()
		close(merged)
		fmt.Println("所有输入通道已关闭,输出通道关闭")
	}()
	return merged
}
func main() {
rand.Seed(time.Now().UnixNano())
// 创建多个生产者
producer1 := producer(1, 5)
producer2 := producer(2, 4)
producer3 := producer(3, 6)
// Fan-in:合并所有生产者的输出
merged := fanIn(producer1, producer2, producer3)
// 消费合并后的数据
fmt.Println("开始消费合并后的数据:")
for data := range merged {
fmt.Printf("消费者收到: %s\n", data)
}
fmt.Println("所有数据消费完成")
}
带优先级的 Fan-in
package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
// PriorityData is one item flowing through the priority fan-in. It holds a
// payload, its priority (larger values are emitted first), and the index of
// the input channel it arrived on (filled in by the fan-in's reader).
type PriorityData struct {
	Data     string
	Priority int
	Source   int
}

// PriorityQueue implements heap.Interface over *PriorityData as a max-heap:
// heap.Pop returns the item with the highest Priority. It performs no locking,
// so concurrent callers must synchronize access themselves.
type PriorityQueue []*PriorityData

// Len reports the number of queued items.
func (pq PriorityQueue) Len() int { return len(pq) }

// Less orders higher priorities first, which turns container/heap's min-heap
// into a max-heap.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Priority > pq[j].Priority
}

// Swap exchanges the items at positions i and j.
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

// Push appends x, which must be a *PriorityData. Use heap.Push rather than
// calling this directly, so the heap invariant is maintained.
func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*PriorityData))
}

// Pop removes and returns the last element. heap.Pop first swaps the top item
// into that position, so callers should use heap.Pop.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot so the backing array no longer references the
	// popped item and it can be garbage-collected.
	old[n-1] = nil
	*pq = old[:n-1]
	return item
}
// PriorityFanIn merges several *PriorityData channels into one output channel.
// Pending items are held in a max-heap, so higher priorities are emitted
// first.
type PriorityFanIn struct {
	inputs []<-chan *PriorityData // channels being merged
	output chan *PriorityData     // merged output, sent by writeOutput
	pq     *PriorityQueue         // pending items; guarded by mu
	mu     sync.Mutex             // protects pq
	wg     sync.WaitGroup         // counts running input readers
	done   chan struct{}          // closed once every input reader has exited
}
// NewPriorityFanIn prepares a priority fan-in over the given inputs but does
// not start it; call Start to launch the goroutines. The output channel is
// unbuffered, and the internal queue starts empty.
func NewPriorityFanIn(inputs ...<-chan *PriorityData) *PriorityFanIn {
	queue := &PriorityQueue{}
	heap.Init(queue)
	return &PriorityFanIn{
		inputs: inputs,
		output: make(chan *PriorityData),
		pq:     queue,
		done:   make(chan struct{}),
	}
}
// Start launches one reader goroutine per input, the single output writer, and
// a watcher that closes pfi.done once every reader has finished. It returns
// the output channel, which the writer closes after draining the queue.
func (pfi *PriorityFanIn) Start() <-chan *PriorityData {
	pfi.wg.Add(len(pfi.inputs))
	for src, ch := range pfi.inputs {
		go pfi.readInput(src, ch)
	}

	go pfi.writeOutput()

	go func() {
		defer close(pfi.done)
		pfi.wg.Wait()
	}()
	return pfi.output
}
// readInput drains input. It tags each item with its source index and pushes
// it onto the shared heap while holding pfi.mu. The received *PriorityData
// values are modified in place (their Source field is overwritten).
func (pfi *PriorityFanIn) readInput(source int, input <-chan *PriorityData) {
	defer pfi.wg.Done()

	push := func(d *PriorityData) {
		pfi.mu.Lock()
		defer pfi.mu.Unlock()
		heap.Push(pfi.pq, d)
	}

	for item := range input {
		item.Source = source
		push(item)
	}
}
// writeOutput is the only sender on pfi.output. On each 100ms tick it forwards
// the highest-priority queued item, if any. Items arriving faster than that
// pile up in the heap, presumably so they can be reordered by priority. Once
// pfi.done is closed it forwards everything still queued, highest priority
// first, and then closes pfi.output. The mutex is never held while sending.
func (pfi *PriorityFanIn) writeOutput() {
	defer close(pfi.output)
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()

	// popTop removes the highest-priority item under the lock; ok is false
	// when the queue is empty.
	popTop := func() (*PriorityData, bool) {
		pfi.mu.Lock()
		defer pfi.mu.Unlock()
		if pfi.pq.Len() == 0 {
			return nil, false
		}
		return heap.Pop(pfi.pq).(*PriorityData), true
	}

	for {
		select {
		case <-tick.C:
			if item, ok := popTop(); ok {
				pfi.output <- item
			}
		case <-pfi.done:
			// All readers have exited: flush whatever remains.
			for {
				item, ok := popTop()
				if !ok {
					return
				}
				pfi.output <- item
			}
		}
	}
}
// priorityProducer emits one *PriorityData per entry of priorities, 200ms
// apart. Each item is labelled "生产者<id>-数据<n>" (n starting at 1) and
// carries the matching priority; Source is left at zero. The channel is closed
// after the last item.
func priorityProducer(id int, priorities []int) <-chan *PriorityData {
	out := make(chan *PriorityData)
	go func() {
		defer close(out)
		for idx := range priorities {
			p := priorities[idx]
			time.Sleep(200 * time.Millisecond)
			item := &PriorityData{
				Data:     fmt.Sprintf("生产者%d-数据%d", id, idx+1),
				Priority: p,
			}
			out <- item
			fmt.Printf("生产者%d生产了优先级%d的数据: %s\n", id, p, item.Data)
		}
	}()
	return out
}
func main() {
// 创建不同优先级的生产者
producer1 := priorityProducer(1, []int{1, 3, 5, 2})
producer2 := priorityProducer(2, []int{4, 2, 6, 1})
producer3 := priorityProducer(3, []int{3, 5, 1, 4})
// 创建优先级扇入
pfi := NewPriorityFanIn(producer1, producer2, producer3)
output := pfi.Start()
// 消费按优先级排序的数据
fmt.Println("\n按优先级消费数据:")
for data := range output {
fmt.Printf("消费者收到: %s (优先级: %d, 来源: %d)\n",
data.Data, data.Priority, data.Source)
}
fmt.Println("所有数据消费完成")
}
组合 Fan-out 和 Fan-in
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Pipeline wires a fan-out stage (several processor goroutines fed
// round-robin from input) into a fan-in stage that merges their results into
// fanInOutput.
type Pipeline struct {
	input       <-chan int  // source of values to process
	fanOutCount int         // number of processor goroutines
	fanInOutput chan string // merged results
}

// NewPipeline creates a pipeline that reads from input and spreads the work
// across fanOutCount processors. The merged output channel is buffered to 100.
func NewPipeline(input <-chan int, fanOutCount int) *Pipeline {
	return &Pipeline{
		input:       input,
		fanOutCount: fanOutCount,
		fanInOutput: make(chan string, 100),
	}
}

// processor handles every value from input. For each value it sleeps a random
// 0-499ms to simulate work, then sends a string describing the value and its
// square to output.
//
// processor is the only sender on output, so it closes output once input is
// exhausted. Without that close, any goroutine ranging over output (such as
// the fan-in forwarders in Start) never exits, the merged channel is never
// closed, and the program deadlocks after the last result.
func (p *Pipeline) processor(id int, input <-chan int, output chan<- string) {
	defer func() {
		close(output)
		fmt.Printf("处理器%d退出\n", id)
	}()
	for data := range input {
		processingTime := time.Duration(rand.Intn(500)) * time.Millisecond
		time.Sleep(processingTime)
		output <- fmt.Sprintf("处理器%d处理数据%d得到结果%d", id, data, data*data)
	}
}
// Start launches the pipeline and returns the merged result channel. It
// starts three groups of goroutines:
//   - fan-out: fanOutCount processors, with 1-based IDs, each with its own
//     input queue and output channel (both buffered to 10);
//   - a distributor that feeds p.input to the queues round-robin and closes
//     every queue once p.input is exhausted;
//   - fan-in: one forwarder per processor output, plus a closer that closes
//     p.fanInOutput after all forwarders finish. Forwarders finish only when
//     the processors close their output channels.
func (p *Pipeline) Start() <-chan string {
	queues := make([]chan int, p.fanOutCount)
	results := make([]<-chan string, p.fanOutCount)
	for i := range queues {
		queue := make(chan int, 10)
		result := make(chan string, 10)
		queues[i], results[i] = queue, result
		go p.processor(i+1, queue, result)
	}

	// Distributor: round-robin over the processor queues.
	go func() {
		defer func() {
			for _, q := range queues {
				close(q)
			}
		}()
		next := 0
		for v := range p.input {
			queues[next] <- v
			next = (next + 1) % len(queues)
		}
	}()

	// Fan-in: forward every processor's results into p.fanInOutput.
	go func() {
		defer close(p.fanInOutput)
		var forwarders sync.WaitGroup
		forwarders.Add(len(results))
		for _, ch := range results {
			go func(ch <-chan string) {
				defer forwarders.Done()
				for r := range ch {
					p.fanInOutput <- r
				}
			}(ch)
		}
		forwarders.Wait()
	}()

	return p.fanInOutput
}
// dataGenerator emits 1..count in order, pausing 50ms after each send. It then
// prints a completion message and closes the returned channel.
func dataGenerator(count int) <-chan int {
	nums := make(chan int)
	go func() {
		defer close(nums)
		v := 0
		for v < count {
			v++
			nums <- v
			time.Sleep(50 * time.Millisecond) // pace the source
		}
		fmt.Println("数据生成完成")
	}()
	return nums
}
func main() {
rand.Seed(time.Now().UnixNano())
// 生成数据
data := dataGenerator(20)
// 创建处理管道
pipeline := NewPipeline(data, 3)
results := pipeline.Start()
// 消费结果
fmt.Println("开始消费处理结果:")
resultCount := 0
for result := range results {
resultCount++
fmt.Printf("结果%d: %s\n", resultCount, result)
}
fmt.Printf("总共处理了%d个结果\n", resultCount)
}
实际应用场景
1. 日志处理系统
// Type sketch for a log-processing system: log parsing is fanned out and the
// analysis results are fanned in. Only the types are shown. Presumably each
// parser channel carries raw log lines and each analyzer channel carries parsed
// entries; confirm this against the eventual implementation.
type (
	// LogProcessor holds the per-stage channels of the system.
	LogProcessor struct {
		parsers   []chan string
		analyzers []chan LogEntry
		output    chan AnalysisResult
	}

	// LogEntry is presumably one parsed log record.
	LogEntry struct {
		Timestamp time.Time
		Level     string
		Message   string
		Source    string
	}

	// AnalysisResult is one aggregated analysis output: a result type, a count
	// and free-form details.
	AnalysisResult struct {
		Type    string
		Count   int
		Details map[string]interface{}
	}
)

// TODO: implement fan-out parsing and fan-in analysis of the logs.
2. 图像处理管道
// ImageProcessor sketches an image-processing pipeline: a stage of resize
// workers, a stage of compression workers, and one merged output channel.
// ImageTask and ProcessedImage are not defined in this snippet.
type ImageProcessor struct {
	resizers    []chan ImageTask    // presumably one input queue per resizer
	compressors []chan ImageTask    // presumably one input queue per compressor
	output      chan ProcessedImage // merged results
}

// TODO: implement parallel image processing and merge the results.
性能优化建议
1. 缓冲区大小优化
// optimizedFanOut illustrates giving each output a different buffer size
// (for example, tuned to each processor's speed). Output i, counting from 0,
// gets a buffer of 10*(i+1).
//
// NOTE(review): this is only a skeleton. input is never read, and each
// goroutine closes its output immediately, so every returned channel is empty.
// Fill in the processing logic (e.g. ranging over input) before using it.
func optimizedFanOut(input <-chan int, numProcessors int) []<-chan string {
	outputs := make([]<-chan string, 0, numProcessors)
	for idx := 0; idx < numProcessors; idx++ {
		ch := make(chan string, 10*(idx+1))
		outputs = append(outputs, ch)
		go func(id int, out chan<- string) {
			defer close(out)
			// Processing logic goes here.
		}(idx, ch)
	}
	return outputs
}
2. 动态负载均衡
// dynamicLoadBalance sends task to the worker whose queue currently holds the
// fewest buffered tasks; on a tie the lowest index wins. The queue lengths are
// a snapshot that other goroutines may change concurrently, and the send
// blocks if the chosen queue is full. workers must be non-empty, because
// workers[0] is indexed unconditionally.
func dynamicLoadBalance(task Task, workers []chan Task) {
	best, bestLen := 0, len(workers[0])
	for i, q := range workers {
		if n := len(q); n < bestLen {
			best, bestLen = i, n
		}
	}
	workers[best] <- task
}
Fan-in/Fan-out 模式是构建高效并发系统的重要工具。通过合理的设计和实现，可以显著提高系统的处理能力和资源利用率。在实际应用中，要根据具体的业务需求选择合适的实现策略，并注意性能优化和资源管理，例如确保每个 goroutine 都有明确的退出条件，并由唯一的发送方负责关闭 channel。