2.2.2 Channel 缓冲与关闭 #
有缓冲 Channel #
有缓冲 Channel 是 Go 语言中另一种重要的 Channel 类型,它在发送者和接收者之间提供了一个缓冲区,允许异步通信。理解有缓冲 Channel 的行为对于设计高效的并发程序至关重要。
有缓冲 Channel 的特点 #
- 异步通信:发送操作不需要立即有接收者
- 容量限制:可以存储指定数量的元素
- 阻塞条件:只有在缓冲区满时发送才阻塞,缓冲区空时接收才阻塞
- 性能优势:减少 Goroutine 间的同步开销
创建有缓冲 Channel #
package main
import "fmt"
func main() {
// 创建容量为3的有缓冲 Channel
ch := make(chan int, 3)
// 可以连续发送3个值而不阻塞
ch <- 1
ch <- 2
ch <- 3
fmt.Printf("Channel length: %d\n", len(ch))
fmt.Printf("Channel capacity: %d\n", cap(ch))
// 接收值
fmt.Printf("Received: %d\n", <-ch)
fmt.Printf("Received: %d\n", <-ch)
fmt.Printf("Received: %d\n", <-ch)
}
有缓冲 Channel 的行为分析 #
package main
import (
"fmt"
"time"
)
func bufferBehaviorDemo() {
ch := make(chan string, 2) // 容量为2的缓冲 Channel
// 发送方
go func() {
messages := []string{"Hello", "World", "Go", "Channel"}
for i, msg := range messages {
fmt.Printf("Sending message %d: %s\n", i+1, msg)
ch <- msg
fmt.Printf("Message %d sent (buffer len: %d)\n", i+1, len(ch))
time.Sleep(500 * time.Millisecond)
}
close(ch)
}()
// 接收方(延迟启动)
time.Sleep(1 * time.Second)
fmt.Println("Starting to receive...")
for msg := range ch {
fmt.Printf("Received: %s (buffer len: %d)\n", msg, len(ch))
time.Sleep(800 * time.Millisecond)
}
}
func main() {
bufferBehaviorDemo()
}
无缓冲 vs 有缓冲 Channel 对比 #
性能对比示例 #
package main
import (
"fmt"
"sync"
"time"
)
func benchmarkUnbuffered() time.Duration {
start := time.Now()
ch := make(chan int) // 无缓冲
var wg sync.WaitGroup
const numMessages = 10000
// 发送者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numMessages; i++ {
ch <- i
}
close(ch)
}()
// 接收者
wg.Add(1)
go func() {
defer wg.Done()
for range ch {
// 接收数据
}
}()
wg.Wait()
return time.Since(start)
}
func benchmarkBuffered(bufferSize int) time.Duration {
start := time.Now()
ch := make(chan int, bufferSize) // 有缓冲
var wg sync.WaitGroup
const numMessages = 10000
// 发送者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numMessages; i++ {
ch <- i
}
close(ch)
}()
// 接收者
wg.Add(1)
go func() {
defer wg.Done()
for range ch {
// 接收数据
}
}()
wg.Wait()
return time.Since(start)
}
func main() {
fmt.Println("Channel Performance Comparison:")
unbufferedTime := benchmarkUnbuffered()
fmt.Printf("Unbuffered channel: %v\n", unbufferedTime)
buffered10Time := benchmarkBuffered(10)
fmt.Printf("Buffered channel (10): %v\n", buffered10Time)
buffered100Time := benchmarkBuffered(100)
fmt.Printf("Buffered channel (100): %v\n", buffered100Time)
buffered1000Time := benchmarkBuffered(1000)
fmt.Printf("Buffered channel (1000): %v\n", buffered1000Time)
}
使用场景对比 #
package main
import (
"fmt"
"sync"
"time"
)
// 场景1:严格同步 - 使用无缓冲 Channel
func strictSynchronization() {
fmt.Println("=== Strict Synchronization (Unbuffered) ===")
ch := make(chan string)
go func() {
fmt.Println("Goroutine: Preparing data...")
time.Sleep(1 * time.Second)
fmt.Println("Goroutine: Sending data")
ch <- "Important Data"
fmt.Println("Goroutine: Data sent, receiver must be ready")
}()
time.Sleep(500 * time.Millisecond)
fmt.Println("Main: Ready to receive")
data := <-ch
fmt.Printf("Main: Received %s\n", data)
}
// 场景2:批量处理 - 使用有缓冲 Channel
func batchProcessing() {
fmt.Println("\n=== Batch Processing (Buffered) ===")
ch := make(chan int, 5) // 批量大小为5
// 生产者:快速生成数据
go func() {
for i := 1; i <= 10; i++ {
fmt.Printf("Producing: %d\n", i)
ch <- i
if i%5 == 0 {
fmt.Printf("Batch of 5 produced, buffer len: %d\n", len(ch))
}
}
close(ch)
}()
// 消费者:批量处理
time.Sleep(2 * time.Second) // 模拟消费者延迟
fmt.Println("Consumer starting...")
for num := range ch {
fmt.Printf("Processing: %d\n", num)
time.Sleep(200 * time.Millisecond)
}
}
func main() {
strictSynchronization()
batchProcessing()
}
Channel 的关闭机制 #
关闭 Channel 的基本概念 #
关闭 Channel 是一个重要的操作,它向接收者发出信号表示不会再有更多的数据发送。
package main
import "fmt"
func basicCloseExample() {
ch := make(chan int, 3)
// 发送一些数据
ch <- 1
ch <- 2
ch <- 3
// 关闭 Channel
close(ch)
// 从已关闭的 Channel 接收数据
for {
value, ok := <-ch
if !ok {
fmt.Println("Channel is closed")
break
}
fmt.Printf("Received: %d\n", value)
}
// 从已关闭且空的 Channel 接收会得到零值
value, ok := <-ch
fmt.Printf("After close: value=%d, ok=%v\n", value, ok)
}
func main() {
basicCloseExample()
}
使用 range 遍历 Channel #
range
语句提供了一种优雅的方式来接收 Channel 中的所有数据:
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
defer close(ch) // 确保关闭 Channel
for i := 1; i <= 5; i++ {
fmt.Printf("Producing: %d\n", i)
ch <- i
time.Sleep(500 * time.Millisecond)
}
fmt.Println("Producer finished")
}
func consumer(ch <-chan int) {
fmt.Println("Consumer starting...")
// 使用 range 自动处理 Channel 关闭
for value := range ch {
fmt.Printf("Consuming: %d\n", value)
time.Sleep(300 * time.Millisecond)
}
fmt.Println("Consumer finished")
}
func main() {
ch := make(chan int, 2)
go producer(ch)
consumer(ch) // 在主 Goroutine 中运行
}
关闭 Channel 的最佳实践 #
1. 谁负责关闭 Channel #
package main
import (
"fmt"
"sync"
"time"
)
// 原则:发送者关闭 Channel
func senderClosesChannel() {
fmt.Println("=== Sender Closes Channel ===")
ch := make(chan string, 2)
// 发送者
go func() {
defer close(ch) // 发送者负责关闭
messages := []string{"Hello", "World", "Goodbye"}
for _, msg := range messages {
ch <- msg
time.Sleep(500 * time.Millisecond)
}
}()
// 接收者
for msg := range ch {
fmt.Printf("Received: %s\n", msg)
}
}
// 多个发送者的情况:使用额外的信号 Channel
func multipleSenders() {
fmt.Println("\n=== Multiple Senders ===")
ch := make(chan int, 10)
done := make(chan bool)
var wg sync.WaitGroup
// 多个发送者
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 1; j <= 3; j++ {
value := id*10 + j
ch <- value
fmt.Printf("Sender %d: sent %d\n", id, value)
time.Sleep(200 * time.Millisecond)
}
}(i)
}
// 等待所有发送者完成,然后关闭 Channel
go func() {
wg.Wait()
close(ch)
done <- true
}()
// 接收者
go func() {
for value := range ch {
fmt.Printf("Received: %d\n", value)
}
}()
<-done
time.Sleep(100 * time.Millisecond)
}
func main() {
senderClosesChannel()
multipleSenders()
}
2. 安全关闭 Channel #
package main
import (
"fmt"
"sync"
)
// 安全关闭 Channel 的工具函数
type SafeChannel struct {
ch chan int
closed bool
mu sync.Mutex
}
func NewSafeChannel(buffer int) *SafeChannel {
return &SafeChannel{
ch: make(chan int, buffer),
}
}
func (sc *SafeChannel) Send(value int) bool {
sc.mu.Lock()
defer sc.mu.Unlock()
if sc.closed {
return false
}
select {
case sc.ch <- value:
return true
default:
return false // Channel 满了
}
}
func (sc *SafeChannel) Close() {
sc.mu.Lock()
defer sc.mu.Unlock()
if !sc.closed {
close(sc.ch)
sc.closed = true
}
}
func (sc *SafeChannel) Receive() (int, bool) {
value, ok := <-sc.ch
return value, ok
}
func (sc *SafeChannel) Range() <-chan int {
return sc.ch
}
func safeChannelExample() {
sc := NewSafeChannel(5)
var wg sync.WaitGroup
// 多个发送者
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 1; j <= 5; j++ {
value := id*10 + j
if sc.Send(value) {
fmt.Printf("Sender %d: sent %d\n", id, value)
} else {
fmt.Printf("Sender %d: failed to send %d\n", id, value)
}
}
}(i)
}
// 接收者
go func() {
for value := range sc.Range() {
fmt.Printf("Received: %d\n", value)
}
}()
wg.Wait()
sc.Close()
// 尝试再次发送(应该失败)
if !sc.Send(999) {
fmt.Println("Cannot send to closed channel")
}
}
func main() {
safeChannelExample()
}
Channel 的高级用法 #
1. Channel 作为信号量 #
package main
import (
"fmt"
"sync"
"time"
)
// 使用 Channel 实现信号量
type Semaphore struct {
ch chan struct{}
}
func NewSemaphore(capacity int) *Semaphore {
return &Semaphore{
ch: make(chan struct{}, capacity),
}
}
func (s *Semaphore) Acquire() {
s.ch <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.ch
}
func worker(id int, sem *Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d: Waiting for semaphore\n", id)
sem.Acquire()
defer sem.Release()
fmt.Printf("Worker %d: Acquired semaphore, working...\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Worker %d: Work completed\n", id)
}
func semaphoreExample() {
const maxConcurrency = 3
sem := NewSemaphore(maxConcurrency)
var wg sync.WaitGroup
// 启动10个工作者,但最多只有3个并发
for i := 1; i <= 10; i++ {
wg.Add(1)
go worker(i, sem, &wg)
}
wg.Wait()
fmt.Println("All workers completed")
}
func main() {
semaphoreExample()
}
2. Channel 实现互斥锁 #
package main
import (
"fmt"
"sync"
"time"
)
// 使用 Channel 实现互斥锁
type ChannelMutex struct {
ch chan struct{}
}
func NewChannelMutex() *ChannelMutex {
ch := make(chan struct{}, 1)
ch <- struct{}{} // 初始状态为可用
return &ChannelMutex{ch: ch}
}
func (m *ChannelMutex) Lock() {
<-m.ch // 获取锁
}
func (m *ChannelMutex) Unlock() {
select {
case m.ch <- struct{}{}: // 释放锁
default:
panic("unlock of unlocked mutex")
}
}
var counter int
var channelMutex = NewChannelMutex()
func increment(id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 1000; i++ {
channelMutex.Lock()
counter++
channelMutex.Unlock()
}
fmt.Printf("Goroutine %d completed\n", id)
}
func channelMutexExample() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go increment(i, &wg)
}
wg.Wait()
fmt.Printf("Final counter value: %d\n", counter)
}
func main() {
channelMutexExample()
}
3. 超时和取消模式 #
package main
import (
"context"
"fmt"
"time"
)
// 使用 Channel 实现超时
func timeoutWithChannel() {
fmt.Println("=== Timeout with Channel ===")
result := make(chan string, 1)
go func() {
time.Sleep(3 * time.Second)
result <- "Operation completed"
}()
select {
case res := <-result:
fmt.Printf("Success: %s\n", res)
case <-time.After(2 * time.Second):
fmt.Println("Timeout: Operation took too long")
}
}
// 使用 Context 和 Channel 实现取消
func cancellationWithContext() {
fmt.Println("\n=== Cancellation with Context ===")
ctx, cancel := context.WithCancel(context.Background())
result := make(chan string, 1)
go func() {
select {
case <-time.After(3 * time.Second):
result <- "Long operation completed"
case <-ctx.Done():
result <- "Operation cancelled"
}
}()
// 1秒后取消操作
go func() {
time.Sleep(1 * time.Second)
cancel()
}()
res := <-result
fmt.Printf("Result: %s\n", res)
}
func main() {
timeoutWithChannel()
cancellationWithContext()
}
常见陷阱和解决方案 #
1. 向已关闭的 Channel 发送数据 #
package main
import (
"fmt"
"sync"
)
func avoidPanicOnClosedChannel() {
ch := make(chan int, 2)
var wg sync.WaitGroup
// 发送者
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
fmt.Printf("Recovered from panic: %v\n", r)
}
}()
ch <- 1
close(ch)
// 这会导致 panic
// ch <- 2
// 安全的方式:检查 Channel 是否已关闭
select {
case ch <- 2:
fmt.Println("Sent 2")
default:
fmt.Println("Channel is closed or full")
}
}()
wg.Wait()
// 接收剩余数据
for value := range ch {
fmt.Printf("Received: %d\n", value)
}
}
func main() {
avoidPanicOnClosedChannel()
}
2. Goroutine 泄漏 #
package main
import (
"context"
"fmt"
"runtime"
"time"
)
// 错误示例:可能导致 Goroutine 泄漏
func badExample() {
fmt.Printf("Before: %d goroutines\n", runtime.NumGoroutine())
ch := make(chan int)
// 这个 Goroutine 可能永远阻塞
go func() {
value := <-ch // 如果没有发送者,这里会永远阻塞
fmt.Printf("Received: %d\n", value)
}()
time.Sleep(100 * time.Millisecond)
fmt.Printf("After: %d goroutines\n", runtime.NumGoroutine())
}
// 正确示例:使用 Context 避免泄漏
func goodExample() {
fmt.Printf("Before: %d goroutines\n", runtime.NumGoroutine())
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ch := make(chan int)
go func() {
select {
case value := <-ch:
fmt.Printf("Received: %d\n", value)
case <-ctx.Done():
fmt.Println("Goroutine cancelled")
}
}()
time.Sleep(1500 * time.Millisecond) // 超过超时时间
fmt.Printf("After: %d goroutines\n", runtime.NumGoroutine())
}
func main() {
fmt.Println("=== Bad Example ===")
badExample()
time.Sleep(2 * time.Second)
fmt.Println("\n=== Good Example ===")
goodExample()
}
小结 #
在本节中,我们深入学习了:
- 有缓冲 Channel:异步通信、性能优势和使用场景
- Channel 关闭:关闭机制、最佳实践和安全关闭方法
- 高级用法:信号量、互斥锁、超时和取消模式
- 常见陷阱:避免 panic 和 Goroutine 泄漏的方法
理解 Channel 的缓冲机制和关闭语义对于编写健壮的并发程序至关重要。在下一节中,我们将学习 select 语句,它提供了强大的多路复用能力。
练习题 #
- 实现一个带缓冲的消息队列,支持多个生产者和消费者,并能优雅地关闭
- 使用 Channel 实现一个简单的连接池,限制最大连接数
- 设计一个可取消的文件下载器,支持超时和手动取消功能