2.2.1 Channel 基础与使用 #
Channel 概述 #
Channel 是 Go 语言中用于 Goroutine 间通信的核心机制。它就像一个管道,允许一个 Goroutine 向另一个 Goroutine 发送数据。Channel 的设计遵循了 CSP(Communicating Sequential Processes)理论,提供了一种安全、优雅的并发编程方式。
Channel 的基本概念 #
Channel 可以被看作是一个有类型的消息队列,具有以下特点:
- 类型安全:每个 Channel 只能传输特定类型的数据
- 同步机制:Channel 操作提供了天然的同步点
- 方向性:Channel 可以是双向的,也可以限制为单向
- 阻塞语义:在特定条件下,发送和接收操作会阻塞
Channel 的声明和创建 #
声明 Channel #
Channel 的声明语法如下:
var ch chan int // 声明一个传输 int 类型的 Channel
var ch chan string // 声明一个传输 string 类型的 Channel
var ch chan []byte // 声明一个传输 []byte 类型的 Channel
创建 Channel #
使用 make
函数创建 Channel:
package main
import "fmt"
func main() {
// 创建无缓冲 Channel
ch1 := make(chan int)
ch2 := make(chan string)
// 创建有缓冲 Channel
ch3 := make(chan int, 10) // 缓冲区大小为 10
ch4 := make(chan string, 5) // 缓冲区大小为 5
fmt.Printf("ch1 type: %T\n", ch1)
fmt.Printf("ch2 type: %T\n", ch2)
fmt.Printf("ch3 type: %T\n", ch3)
fmt.Printf("ch4 type: %T\n", ch4)
}
Channel 的零值 #
Channel 的零值是 nil
,nil Channel 有特殊的行为:
package main
import "fmt"
func main() {
var ch chan int
fmt.Printf("ch == nil: %v\n", ch == nil)
// 对 nil Channel 的操作会永远阻塞
// ch <- 1 // 这会导致死锁
// <-ch // 这也会导致死锁
// 必须使用 make 初始化
ch = make(chan int)
fmt.Printf("After make, ch == nil: %v\n", ch == nil)
}
Channel 的基本操作 #
发送操作 #
使用 <-
操作符向 Channel 发送数据:
ch <- value // 向 Channel ch 发送 value
接收操作 #
从 Channel 接收数据也使用 <-
操作符:
value := <-ch // 从 Channel ch 接收数据并赋值给 value
value, ok := <-ch // 接收数据,ok 表示 Channel 是否已关闭
基本使用示例 #
package main
import (
"fmt"
"time"
)
func sender(ch chan string) {
messages := []string{"Hello", "World", "From", "Goroutine"}
for _, msg := range messages {
fmt.Printf("Sending: %s\n", msg)
ch <- msg // 发送消息
time.Sleep(500 * time.Millisecond)
}
close(ch) // 关闭 Channel
}
func receiver(ch chan string) {
for {
msg, ok := <-ch // 接收消息
if !ok {
fmt.Println("Channel closed, receiver stopping")
break
}
fmt.Printf("Received: %s\n", msg)
}
}
func main() {
ch := make(chan string)
go sender(ch)
go receiver(ch)
// 等待足够的时间让 Goroutine 完成
time.Sleep(3 * time.Second)
}
无缓冲 Channel #
无缓冲 Channel(也称为同步 Channel)是最基本的 Channel 类型,它具有以下特点:
- 同步通信:发送和接收操作必须同时发生
- 阻塞语义:发送操作会阻塞直到有接收者,接收操作会阻塞直到有发送者
- 零容量:不能存储任何数据
无缓冲 Channel 示例 #
package main
import (
"fmt"
"time"
)
func worker(id int, ch chan int) {
for {
task, ok := <-ch
if !ok {
fmt.Printf("Worker %d: Channel closed\n", id)
return
}
fmt.Printf("Worker %d: Processing task %d\n", id, task)
time.Sleep(time.Second) // 模拟工作
fmt.Printf("Worker %d: Task %d completed\n", id, task)
}
}
func main() {
ch := make(chan int) // 无缓冲 Channel
// 启动两个工作者
go worker(1, ch)
go worker(2, ch)
// 发送任务
for i := 1; i <= 5; i++ {
fmt.Printf("Sending task %d\n", i)
ch <- i // 这里会阻塞直到有工作者接收
fmt.Printf("Task %d sent\n", i)
}
close(ch)
time.Sleep(3 * time.Second) // 等待工作者完成
}
Channel 的方向性 #
Channel 可以被限制为只读或只写,这在函数参数中特别有用:
单向 Channel 声明 #
var sendOnly chan<- int // 只写 Channel
var recvOnly <-chan int // 只读 Channel
单向 Channel 示例 #
package main
import (
"fmt"
"time"
)
// 只能向 Channel 发送数据
func producer(ch chan<- string) {
messages := []string{"Apple", "Banana", "Cherry", "Date"}
for _, msg := range messages {
fmt.Printf("Producing: %s\n", msg)
ch <- msg
time.Sleep(500 * time.Millisecond)
}
close(ch)
}
// 只能从 Channel 接收数据
func consumer(ch <-chan string) {
for msg := range ch {
fmt.Printf("Consuming: %s\n", msg)
time.Sleep(300 * time.Millisecond)
}
fmt.Println("Consumer finished")
}
func main() {
ch := make(chan string, 2) // 有缓冲 Channel
go producer(ch)
go consumer(ch)
time.Sleep(4 * time.Second)
}
Channel 与 Goroutine 的协作模式 #
1. 简单的请求-响应模式 #
package main
import (
"fmt"
"math/rand"
"time"
)
type Request struct {
ID int
Data string
Response chan string
}
func server(requests chan Request) {
for req := range requests {
// 模拟处理时间
processingTime := time.Duration(rand.Intn(1000)) * time.Millisecond
time.Sleep(processingTime)
// 发送响应
response := fmt.Sprintf("Processed request %d: %s", req.ID, req.Data)
req.Response <- response
close(req.Response)
}
}
func client(id int, requests chan Request) {
req := Request{
ID: id,
Data: fmt.Sprintf("Data from client %d", id),
Response: make(chan string),
}
fmt.Printf("Client %d: Sending request\n", id)
requests <- req
// 等待响应
response := <-req.Response
fmt.Printf("Client %d: Received response: %s\n", id, response)
}
func main() {
requests := make(chan Request, 10)
// 启动服务器
go server(requests)
// 启动多个客户端
for i := 1; i <= 5; i++ {
go client(i, requests)
}
time.Sleep(3 * time.Second)
close(requests)
}
2. 数据流水线模式 #
package main
import (
"fmt"
"time"
)
// 阶段1:生成数据
func generator(out chan<- int) {
defer close(out)
for i := 1; i <= 10; i++ {
fmt.Printf("Generating: %d\n", i)
out <- i
time.Sleep(100 * time.Millisecond)
}
}
// 阶段2:处理数据
func processor(in <-chan int, out chan<- int) {
defer close(out)
for num := range in {
processed := num * num
fmt.Printf("Processing: %d -> %d\n", num, processed)
out <- processed
time.Sleep(150 * time.Millisecond)
}
}
// 阶段3:输出数据
func printer(in <-chan int) {
for result := range in {
fmt.Printf("Final result: %d\n", result)
}
}
func main() {
// 创建管道
numbers := make(chan int)
processed := make(chan int)
// 启动流水线各阶段
go generator(numbers)
go processor(numbers, processed)
go printer(processed)
// 等待完成
time.Sleep(5 * time.Second)
}
Channel 的常见用法模式 #
1. 信号通知 #
package main
import (
"fmt"
"time"
)
func worker(id int, done chan bool) {
fmt.Printf("Worker %d: Starting work\n", id)
// 模拟工作
time.Sleep(time.Duration(id) * time.Second)
fmt.Printf("Worker %d: Work completed\n", id)
done <- true // 发送完成信号
}
func main() {
done := make(chan bool)
// 启动多个工作者
for i := 1; i <= 3; i++ {
go worker(i, done)
}
// 等待所有工作者完成
for i := 1; i <= 3; i++ {
<-done // 接收完成信号
fmt.Printf("Received completion signal %d\n", i)
}
fmt.Println("All workers completed")
}
2. 限制并发数量 #
package main
import (
"fmt"
"sync"
"time"
)
func limitedWorker(id int, semaphore chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
// 获取信号量
semaphore <- struct{}{}
defer func() { <-semaphore }() // 释放信号量
fmt.Printf("Worker %d: Starting (limited concurrency)\n", id)
time.Sleep(2 * time.Second) // 模拟工作
fmt.Printf("Worker %d: Completed\n", id)
}
func main() {
const maxConcurrency = 3
semaphore := make(chan struct{}, maxConcurrency)
var wg sync.WaitGroup
// 启动10个工作者,但最多只有3个并发执行
for i := 1; i <= 10; i++ {
wg.Add(1)
go limitedWorker(i, semaphore, &wg)
}
wg.Wait()
fmt.Println("All limited workers completed")
}
3. 超时控制 #
package main
import (
"fmt"
"time"
)
func slowOperation(result chan string) {
// 模拟慢操作
time.Sleep(3 * time.Second)
result <- "Operation completed"
}
func main() {
result := make(chan string)
go slowOperation(result)
select {
case res := <-result:
fmt.Printf("Success: %s\n", res)
case <-time.After(2 * time.Second):
fmt.Println("Timeout: Operation took too long")
}
}
Channel 的内存模型 #
理解 Channel 的内存模型对于正确使用 Channel 非常重要:
Happens-Before 关系 #
package main
import (
"fmt"
"time"
)
var sharedData int
func sender(ch chan bool) {
sharedData = 42 // 写操作1
ch <- true // 发送操作
}
func receiver(ch chan bool) {
<-ch // 接收操作
fmt.Printf("Shared data: %d\n", sharedData) // 读操作2
}
func main() {
ch := make(chan bool)
go sender(ch)
go receiver(ch)
time.Sleep(100 * time.Millisecond)
}
在这个例子中,由于 Channel 的 happens-before 保证:
- 写操作 1 happens-before 发送操作
- 发送操作 happens-before 接收操作
- 接收操作 happens-before 读操作 2
因此,读操作 2 一定能看到写操作 1 的结果。
常见错误和最佳实践 #
1. 避免在同一个 Goroutine 中发送和接收 #
// 错误示例:会导致死锁
func badExample() {
ch := make(chan int)
ch <- 1 // 这里会永远阻塞
<-ch
}
// 正确示例:使用不同的 Goroutine
func goodExample() {
ch := make(chan int)
go func() {
ch <- 1
}()
value := <-ch
fmt.Printf("Received: %d\n", value)
}
2. 正确处理 Channel 关闭 #
package main
import "fmt"
func main() {
ch := make(chan int, 3)
// 发送一些数据
ch <- 1
ch <- 2
ch <- 3
close(ch)
// 正确的接收方式
for {
value, ok := <-ch
if !ok {
fmt.Println("Channel closed")
break
}
fmt.Printf("Received: %d\n", value)
}
// 或者使用 range
ch2 := make(chan int, 2)
ch2 <- 10
ch2 <- 20
close(ch2)
for value := range ch2 {
fmt.Printf("Range received: %d\n", value)
}
}
3. 避免关闭已关闭的 Channel #
package main
import (
"fmt"
"sync"
)
func safeClose(ch chan int, once *sync.Once) {
once.Do(func() {
close(ch)
fmt.Println("Channel closed safely")
})
}
func main() {
ch := make(chan int)
var once sync.Once
// 多个 Goroutine 尝试关闭同一个 Channel
go safeClose(ch, &once)
go safeClose(ch, &once)
go safeClose(ch, &once)
// 等待一下
<-ch // 这会阻塞,因为 Channel 已关闭且为空
}
小结 #
在本节中,我们学习了:
- Channel 基础:声明、创建和基本操作
- 无缓冲 Channel:同步通信的特点和使用场景
- 单向 Channel:限制 Channel 的方向性
- 协作模式:请求-响应、流水线等常见模式
- 实用技巧:信号通知、并发限制、超时控制
- 内存模型:happens-before 关系的保证
- 最佳实践:避免常见错误和陷阱
Channel 是 Go 语言并发编程的核心,掌握其基本使用方法是编写高效并发程序的基础。在下一节中,我们将学习有缓冲 Channel 和 Channel 的关闭机制。
练习题 #
- 编写一个程序,使用 Channel 实现生产者-消费者模式,生产者生成 1-100 的数字,消费者计算这些数字的平方
- 实现一个简单的工作池,限制同时运行的 Goroutine 数量不超过 5 个
- 使用 Channel 实现一个简单的发布-订阅系统,支持多个订阅者接收消息