2.2.2 Channel 缓冲与关闭

2.2.2 Channel 缓冲与关闭 #

有缓冲 Channel #

有缓冲 Channel 是 Go 语言中另一种重要的 Channel 类型,它在发送者和接收者之间提供了一个缓冲区,允许异步通信。理解有缓冲 Channel 的行为对于设计高效的并发程序至关重要。

有缓冲 Channel 的特点 #

  • 异步通信:发送操作不需要立即有接收者
  • 容量限制:可以存储指定数量的元素
  • 阻塞条件:只有在缓冲区满时发送才阻塞,缓冲区空时接收才阻塞
  • 性能优势:减少 Goroutine 间的同步开销

创建有缓冲 Channel #

package main

import "fmt"

func main() {
    // 创建容量为3的有缓冲 Channel
    ch := make(chan int, 3)

    // 可以连续发送3个值而不阻塞
    ch <- 1
    ch <- 2
    ch <- 3

    fmt.Printf("Channel length: %d\n", len(ch))
    fmt.Printf("Channel capacity: %d\n", cap(ch))

    // 接收值
    fmt.Printf("Received: %d\n", <-ch)
    fmt.Printf("Received: %d\n", <-ch)
    fmt.Printf("Received: %d\n", <-ch)
}

有缓冲 Channel 的行为分析 #

package main

import (
    "fmt"
    "time"
)

func bufferBehaviorDemo() {
    ch := make(chan string, 2) // 容量为2的缓冲 Channel

    // 发送方
    go func() {
        messages := []string{"Hello", "World", "Go", "Channel"}

        for i, msg := range messages {
            fmt.Printf("Sending message %d: %s\n", i+1, msg)
            ch <- msg
            fmt.Printf("Message %d sent (buffer len: %d)\n", i+1, len(ch))
            time.Sleep(500 * time.Millisecond)
        }
        close(ch)
    }()

    // 接收方(延迟启动)
    time.Sleep(1 * time.Second)
    fmt.Println("Starting to receive...")

    for msg := range ch {
        fmt.Printf("Received: %s (buffer len: %d)\n", msg, len(ch))
        time.Sleep(800 * time.Millisecond)
    }
}

func main() {
    bufferBehaviorDemo()
}

无缓冲 vs 有缓冲 Channel 对比 #

性能对比示例 #

package main

import (
    "fmt"
    "sync"
    "time"
)

func benchmarkUnbuffered() time.Duration {
    start := time.Now()
    ch := make(chan int) // 无缓冲
    var wg sync.WaitGroup

    const numMessages = 10000

    // 发送者
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < numMessages; i++ {
            ch <- i
        }
        close(ch)
    }()

    // 接收者
    wg.Add(1)
    go func() {
        defer wg.Done()
        for range ch {
            // 接收数据
        }
    }()

    wg.Wait()
    return time.Since(start)
}

func benchmarkBuffered(bufferSize int) time.Duration {
    start := time.Now()
    ch := make(chan int, bufferSize) // 有缓冲
    var wg sync.WaitGroup

    const numMessages = 10000

    // 发送者
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < numMessages; i++ {
            ch <- i
        }
        close(ch)
    }()

    // 接收者
    wg.Add(1)
    go func() {
        defer wg.Done()
        for range ch {
            // 接收数据
        }
    }()

    wg.Wait()
    return time.Since(start)
}

func main() {
    fmt.Println("Channel Performance Comparison:")

    unbufferedTime := benchmarkUnbuffered()
    fmt.Printf("Unbuffered channel: %v\n", unbufferedTime)

    buffered10Time := benchmarkBuffered(10)
    fmt.Printf("Buffered channel (10): %v\n", buffered10Time)

    buffered100Time := benchmarkBuffered(100)
    fmt.Printf("Buffered channel (100): %v\n", buffered100Time)

    buffered1000Time := benchmarkBuffered(1000)
    fmt.Printf("Buffered channel (1000): %v\n", buffered1000Time)
}

使用场景对比 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// 场景1:严格同步 - 使用无缓冲 Channel
func strictSynchronization() {
    fmt.Println("=== Strict Synchronization (Unbuffered) ===")
    ch := make(chan string)

    go func() {
        fmt.Println("Goroutine: Preparing data...")
        time.Sleep(1 * time.Second)
        fmt.Println("Goroutine: Sending data")
        ch <- "Important Data"
        fmt.Println("Goroutine: Data sent, receiver must be ready")
    }()

    time.Sleep(500 * time.Millisecond)
    fmt.Println("Main: Ready to receive")
    data := <-ch
    fmt.Printf("Main: Received %s\n", data)
}

// 场景2:批量处理 - 使用有缓冲 Channel
func batchProcessing() {
    fmt.Println("\n=== Batch Processing (Buffered) ===")
    ch := make(chan int, 5) // 批量大小为5

    // 生产者:快速生成数据
    go func() {
        for i := 1; i <= 10; i++ {
            fmt.Printf("Producing: %d\n", i)
            ch <- i
            if i%5 == 0 {
                fmt.Printf("Batch of 5 produced, buffer len: %d\n", len(ch))
            }
        }
        close(ch)
    }()

    // 消费者:批量处理
    time.Sleep(2 * time.Second) // 模拟消费者延迟
    fmt.Println("Consumer starting...")

    for num := range ch {
        fmt.Printf("Processing: %d\n", num)
        time.Sleep(200 * time.Millisecond)
    }
}

func main() {
    strictSynchronization()
    batchProcessing()
}

Channel 的关闭机制 #

关闭 Channel 的基本概念 #

关闭 Channel 是一个重要的操作,它向接收者发出信号表示不会再有更多的数据发送。

package main

import "fmt"

func basicCloseExample() {
    ch := make(chan int, 3)

    // 发送一些数据
    ch <- 1
    ch <- 2
    ch <- 3

    // 关闭 Channel
    close(ch)

    // 从已关闭的 Channel 接收数据
    for {
        value, ok := <-ch
        if !ok {
            fmt.Println("Channel is closed")
            break
        }
        fmt.Printf("Received: %d\n", value)
    }

    // 从已关闭且空的 Channel 接收会得到零值
    value, ok := <-ch
    fmt.Printf("After close: value=%d, ok=%v\n", value, ok)
}

func main() {
    basicCloseExample()
}

使用 range 遍历 Channel #

range 语句提供了一种优雅的方式来接收 Channel 中的所有数据:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    defer close(ch) // 确保关闭 Channel

    for i := 1; i <= 5; i++ {
        fmt.Printf("Producing: %d\n", i)
        ch <- i
        time.Sleep(500 * time.Millisecond)
    }
    fmt.Println("Producer finished")
}

func consumer(ch <-chan int) {
    fmt.Println("Consumer starting...")

    // 使用 range 自动处理 Channel 关闭
    for value := range ch {
        fmt.Printf("Consuming: %d\n", value)
        time.Sleep(300 * time.Millisecond)
    }

    fmt.Println("Consumer finished")
}

func main() {
    ch := make(chan int, 2)

    go producer(ch)
    consumer(ch) // 在主 Goroutine 中运行
}

关闭 Channel 的最佳实践 #

1. 谁负责关闭 Channel #

package main

import (
    "fmt"
    "sync"
    "time"
)

// 原则:发送者关闭 Channel
func senderClosesChannel() {
    fmt.Println("=== Sender Closes Channel ===")
    ch := make(chan string, 2)

    // 发送者
    go func() {
        defer close(ch) // 发送者负责关闭

        messages := []string{"Hello", "World", "Goodbye"}
        for _, msg := range messages {
            ch <- msg
            time.Sleep(500 * time.Millisecond)
        }
    }()

    // 接收者
    for msg := range ch {
        fmt.Printf("Received: %s\n", msg)
    }
}

// 多个发送者的情况:使用额外的信号 Channel
func multipleSenders() {
    fmt.Println("\n=== Multiple Senders ===")
    ch := make(chan int, 10)
    done := make(chan bool)
    var wg sync.WaitGroup

    // 多个发送者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 1; j <= 3; j++ {
                value := id*10 + j
                ch <- value
                fmt.Printf("Sender %d: sent %d\n", id, value)
                time.Sleep(200 * time.Millisecond)
            }
        }(i)
    }

    // 等待所有发送者完成,然后关闭 Channel
    go func() {
        wg.Wait()
        close(ch)
        done <- true
    }()

    // 接收者
    go func() {
        for value := range ch {
            fmt.Printf("Received: %d\n", value)
        }
    }()

    <-done
    time.Sleep(100 * time.Millisecond)
}

func main() {
    senderClosesChannel()
    multipleSenders()
}

2. 安全关闭 Channel #

package main

import (
    "fmt"
    "sync"
)

// 安全关闭 Channel 的工具函数
type SafeChannel struct {
    ch     chan int
    closed bool
    mu     sync.Mutex
}

func NewSafeChannel(buffer int) *SafeChannel {
    return &SafeChannel{
        ch: make(chan int, buffer),
    }
}

func (sc *SafeChannel) Send(value int) bool {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    if sc.closed {
        return false
    }

    select {
    case sc.ch <- value:
        return true
    default:
        return false // Channel 满了
    }
}

func (sc *SafeChannel) Close() {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    if !sc.closed {
        close(sc.ch)
        sc.closed = true
    }
}

func (sc *SafeChannel) Receive() (int, bool) {
    value, ok := <-sc.ch
    return value, ok
}

func (sc *SafeChannel) Range() <-chan int {
    return sc.ch
}

func safeChannelExample() {
    sc := NewSafeChannel(5)
    var wg sync.WaitGroup

    // 多个发送者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 1; j <= 5; j++ {
                value := id*10 + j
                if sc.Send(value) {
                    fmt.Printf("Sender %d: sent %d\n", id, value)
                } else {
                    fmt.Printf("Sender %d: failed to send %d\n", id, value)
                }
            }
        }(i)
    }

    // 接收者
    go func() {
        for value := range sc.Range() {
            fmt.Printf("Received: %d\n", value)
        }
    }()

    wg.Wait()
    sc.Close()

    // 尝试再次发送(应该失败)
    if !sc.Send(999) {
        fmt.Println("Cannot send to closed channel")
    }
}

func main() {
    safeChannelExample()
}

Channel 的高级用法 #

1. Channel 作为信号量 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// 使用 Channel 实现信号量
type Semaphore struct {
    ch chan struct{}
}

func NewSemaphore(capacity int) *Semaphore {
    return &Semaphore{
        ch: make(chan struct{}, capacity),
    }
}

func (s *Semaphore) Acquire() {
    s.ch <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.ch
}

func worker(id int, sem *Semaphore, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("Worker %d: Waiting for semaphore\n", id)
    sem.Acquire()
    defer sem.Release()

    fmt.Printf("Worker %d: Acquired semaphore, working...\n", id)
    time.Sleep(2 * time.Second)
    fmt.Printf("Worker %d: Work completed\n", id)
}

func semaphoreExample() {
    const maxConcurrency = 3
    sem := NewSemaphore(maxConcurrency)
    var wg sync.WaitGroup

    // 启动10个工作者,但最多只有3个并发
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go worker(i, sem, &wg)
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

func main() {
    semaphoreExample()
}

2. Channel 实现互斥锁 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// 使用 Channel 实现互斥锁
type ChannelMutex struct {
    ch chan struct{}
}

func NewChannelMutex() *ChannelMutex {
    ch := make(chan struct{}, 1)
    ch <- struct{}{} // 初始状态为可用
    return &ChannelMutex{ch: ch}
}

func (m *ChannelMutex) Lock() {
    <-m.ch // 获取锁
}

func (m *ChannelMutex) Unlock() {
    select {
    case m.ch <- struct{}{}: // 释放锁
    default:
        panic("unlock of unlocked mutex")
    }
}

var counter int
var channelMutex = NewChannelMutex()

func increment(id int, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i < 1000; i++ {
        channelMutex.Lock()
        counter++
        channelMutex.Unlock()
    }

    fmt.Printf("Goroutine %d completed\n", id)
}

func channelMutexExample() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go increment(i, &wg)
    }

    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter)
}

func main() {
    channelMutexExample()
}

3. 超时和取消模式 #

package main

import (
    "context"
    "fmt"
    "time"
)

// 使用 Channel 实现超时
func timeoutWithChannel() {
    fmt.Println("=== Timeout with Channel ===")

    result := make(chan string, 1)

    go func() {
        time.Sleep(3 * time.Second)
        result <- "Operation completed"
    }()

    select {
    case res := <-result:
        fmt.Printf("Success: %s\n", res)
    case <-time.After(2 * time.Second):
        fmt.Println("Timeout: Operation took too long")
    }
}

// 使用 Context 和 Channel 实现取消
func cancellationWithContext() {
    fmt.Println("\n=== Cancellation with Context ===")

    ctx, cancel := context.WithCancel(context.Background())
    result := make(chan string, 1)

    go func() {
        select {
        case <-time.After(3 * time.Second):
            result <- "Long operation completed"
        case <-ctx.Done():
            result <- "Operation cancelled"
        }
    }()

    // 1秒后取消操作
    go func() {
        time.Sleep(1 * time.Second)
        cancel()
    }()

    res := <-result
    fmt.Printf("Result: %s\n", res)
}

func main() {
    timeoutWithChannel()
    cancellationWithContext()
}

常见陷阱和解决方案 #

1. 向已关闭的 Channel 发送数据 #

package main

import (
    "fmt"
    "sync"
)

func avoidPanicOnClosedChannel() {
    ch := make(chan int, 2)
    var wg sync.WaitGroup

    // 发送者
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("Recovered from panic: %v\n", r)
            }
        }()

        ch <- 1
        close(ch)

        // 这会导致 panic
        // ch <- 2

        // 安全的方式:检查 Channel 是否已关闭
        select {
        case ch <- 2:
            fmt.Println("Sent 2")
        default:
            fmt.Println("Channel is closed or full")
        }
    }()

    wg.Wait()

    // 接收剩余数据
    for value := range ch {
        fmt.Printf("Received: %d\n", value)
    }
}

func main() {
    avoidPanicOnClosedChannel()
}

2. Goroutine 泄漏 #

package main

import (
    "context"
    "fmt"
    "runtime"
    "time"
)

// 错误示例:可能导致 Goroutine 泄漏
func badExample() {
    fmt.Printf("Before: %d goroutines\n", runtime.NumGoroutine())

    ch := make(chan int)

    // 这个 Goroutine 可能永远阻塞
    go func() {
        value := <-ch // 如果没有发送者,这里会永远阻塞
        fmt.Printf("Received: %d\n", value)
    }()

    time.Sleep(100 * time.Millisecond)
    fmt.Printf("After: %d goroutines\n", runtime.NumGoroutine())
}

// 正确示例:使用 Context 避免泄漏
func goodExample() {
    fmt.Printf("Before: %d goroutines\n", runtime.NumGoroutine())

    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    ch := make(chan int)

    go func() {
        select {
        case value := <-ch:
            fmt.Printf("Received: %d\n", value)
        case <-ctx.Done():
            fmt.Println("Goroutine cancelled")
        }
    }()

    time.Sleep(1500 * time.Millisecond) // 超过超时时间
    fmt.Printf("After: %d goroutines\n", runtime.NumGoroutine())
}

func main() {
    fmt.Println("=== Bad Example ===")
    badExample()

    time.Sleep(2 * time.Second)

    fmt.Println("\n=== Good Example ===")
    goodExample()
}

小结 #

在本节中,我们深入学习了:

  1. 有缓冲 Channel:异步通信、性能优势和使用场景
  2. Channel 关闭:关闭机制、最佳实践和安全关闭方法
  3. 高级用法:信号量、互斥锁、超时和取消模式
  4. 常见陷阱:避免 panic 和 Goroutine 泄漏的方法

理解 Channel 的缓冲机制和关闭语义对于编写健壮的并发程序至关重要。在下一节中,我们将学习 select 语句,它提供了强大的多路复用能力。

练习题 #

  1. 实现一个带缓冲的消息队列,支持多个生产者和消费者,并能优雅地关闭
  2. 使用 Channel 实现一个简单的连接池,限制最大连接数
  3. 设计一个可取消的文件下载器,支持超时和手动取消功能