2.2.1 Channel 基础与使用

2.2.1 Channel 基础与使用 #

Channel 概述 #

Channel 是 Go 语言中用于 Goroutine 间通信的核心机制。它就像一个管道,允许一个 Goroutine 向另一个 Goroutine 发送数据。Channel 的设计遵循了 CSP(Communicating Sequential Processes)理论,提供了一种安全、优雅的并发编程方式。

Channel 的基本概念 #

Channel 可以被看作是一个有类型的消息队列,具有以下特点:

  • 类型安全:每个 Channel 只能传输特定类型的数据
  • 同步机制:Channel 操作提供了天然的同步点
  • 方向性:Channel 可以是双向的,也可以限制为单向
  • 阻塞语义:在特定条件下,发送和接收操作会阻塞

Channel 的声明和创建 #

声明 Channel #

Channel 的声明语法如下:

var ch chan int        // 声明一个传输 int 类型的 Channel
var ch chan string     // 声明一个传输 string 类型的 Channel
var ch chan []byte     // 声明一个传输 []byte 类型的 Channel

创建 Channel #

使用 make 函数创建 Channel:

package main

import "fmt"

func main() {
    // 创建无缓冲 Channel
    ch1 := make(chan int)
    ch2 := make(chan string)

    // 创建有缓冲 Channel
    ch3 := make(chan int, 10)    // 缓冲区大小为 10
    ch4 := make(chan string, 5)  // 缓冲区大小为 5

    fmt.Printf("ch1 type: %T\n", ch1)
    fmt.Printf("ch2 type: %T\n", ch2)
    fmt.Printf("ch3 type: %T\n", ch3)
    fmt.Printf("ch4 type: %T\n", ch4)
}

Channel 的零值 #

Channel 的零值是 nil,nil Channel 有特殊的行为:

package main

import "fmt"

func main() {
    var ch chan int
    fmt.Printf("ch == nil: %v\n", ch == nil)

    // 对 nil Channel 的操作会永远阻塞
    // ch <- 1    // 这会导致死锁
    // <-ch       // 这也会导致死锁

    // 必须使用 make 初始化
    ch = make(chan int)
    fmt.Printf("After make, ch == nil: %v\n", ch == nil)
}

Channel 的基本操作 #

发送操作 #

使用 <- 操作符向 Channel 发送数据:

ch <- value  // 向 Channel ch 发送 value

接收操作 #

从 Channel 接收数据也使用 <- 操作符:

value := <-ch           // 从 Channel ch 接收数据并赋值给 value
value, ok := <-ch       // 接收数据,ok 表示 Channel 是否已关闭

基本使用示例 #

package main

import (
    "fmt"
    "time"
)

func sender(ch chan string) {
    messages := []string{"Hello", "World", "From", "Goroutine"}

    for _, msg := range messages {
        fmt.Printf("Sending: %s\n", msg)
        ch <- msg  // 发送消息
        time.Sleep(500 * time.Millisecond)
    }

    close(ch)  // 关闭 Channel
}

func receiver(ch chan string) {
    for {
        msg, ok := <-ch  // 接收消息
        if !ok {
            fmt.Println("Channel closed, receiver stopping")
            break
        }
        fmt.Printf("Received: %s\n", msg)
    }
}

func main() {
    ch := make(chan string)

    go sender(ch)
    go receiver(ch)

    // 等待足够的时间让 Goroutine 完成
    time.Sleep(3 * time.Second)
}

无缓冲 Channel #

无缓冲 Channel(也称为同步 Channel)是最基本的 Channel 类型,它具有以下特点:

  • 同步通信:发送和接收操作必须同时发生
  • 阻塞语义:发送操作会阻塞直到有接收者,接收操作会阻塞直到有发送者
  • 零容量:不能存储任何数据

无缓冲 Channel 示例 #

package main

import (
    "fmt"
    "time"
)

func worker(id int, ch chan int) {
    for {
        task, ok := <-ch
        if !ok {
            fmt.Printf("Worker %d: Channel closed\n", id)
            return
        }

        fmt.Printf("Worker %d: Processing task %d\n", id, task)
        time.Sleep(time.Second) // 模拟工作
        fmt.Printf("Worker %d: Task %d completed\n", id, task)
    }
}

func main() {
    ch := make(chan int) // 无缓冲 Channel

    // 启动两个工作者
    go worker(1, ch)
    go worker(2, ch)

    // 发送任务
    for i := 1; i <= 5; i++ {
        fmt.Printf("Sending task %d\n", i)
        ch <- i  // 这里会阻塞直到有工作者接收
        fmt.Printf("Task %d sent\n", i)
    }

    close(ch)
    time.Sleep(3 * time.Second) // 等待工作者完成
}

Channel 的方向性 #

Channel 可以被限制为只读或只写,这在函数参数中特别有用:

单向 Channel 声明 #

var sendOnly chan<- int    // 只写 Channel
var recvOnly <-chan int    // 只读 Channel

单向 Channel 示例 #

package main

import (
    "fmt"
    "time"
)

// 只能向 Channel 发送数据
func producer(ch chan<- string) {
    messages := []string{"Apple", "Banana", "Cherry", "Date"}

    for _, msg := range messages {
        fmt.Printf("Producing: %s\n", msg)
        ch <- msg
        time.Sleep(500 * time.Millisecond)
    }

    close(ch)
}

// 只能从 Channel 接收数据
func consumer(ch <-chan string) {
    for msg := range ch {
        fmt.Printf("Consuming: %s\n", msg)
        time.Sleep(300 * time.Millisecond)
    }
    fmt.Println("Consumer finished")
}

func main() {
    ch := make(chan string, 2) // 有缓冲 Channel

    go producer(ch)
    go consumer(ch)

    time.Sleep(4 * time.Second)
}

Channel 与 Goroutine 的协作模式 #

1. 简单的请求-响应模式 #

package main

import (
    "fmt"
    "math/rand"
    "time"
)

type Request struct {
    ID       int
    Data     string
    Response chan string
}

func server(requests chan Request) {
    for req := range requests {
        // 模拟处理时间
        processingTime := time.Duration(rand.Intn(1000)) * time.Millisecond
        time.Sleep(processingTime)

        // 发送响应
        response := fmt.Sprintf("Processed request %d: %s", req.ID, req.Data)
        req.Response <- response
        close(req.Response)
    }
}

func client(id int, requests chan Request) {
    req := Request{
        ID:       id,
        Data:     fmt.Sprintf("Data from client %d", id),
        Response: make(chan string),
    }

    fmt.Printf("Client %d: Sending request\n", id)
    requests <- req

    // 等待响应
    response := <-req.Response
    fmt.Printf("Client %d: Received response: %s\n", id, response)
}

func main() {
    requests := make(chan Request, 10)

    // 启动服务器
    go server(requests)

    // 启动多个客户端
    for i := 1; i <= 5; i++ {
        go client(i, requests)
    }

    time.Sleep(3 * time.Second)
    close(requests)
}

2. 数据流水线模式 #

package main

import (
    "fmt"
    "time"
)

// 阶段1:生成数据
func generator(out chan<- int) {
    defer close(out)
    for i := 1; i <= 10; i++ {
        fmt.Printf("Generating: %d\n", i)
        out <- i
        time.Sleep(100 * time.Millisecond)
    }
}

// 阶段2:处理数据
func processor(in <-chan int, out chan<- int) {
    defer close(out)
    for num := range in {
        processed := num * num
        fmt.Printf("Processing: %d -> %d\n", num, processed)
        out <- processed
        time.Sleep(150 * time.Millisecond)
    }
}

// 阶段3:输出数据
func printer(in <-chan int) {
    for result := range in {
        fmt.Printf("Final result: %d\n", result)
    }
}

func main() {
    // 创建管道
    numbers := make(chan int)
    processed := make(chan int)

    // 启动流水线各阶段
    go generator(numbers)
    go processor(numbers, processed)
    go printer(processed)

    // 等待完成
    time.Sleep(5 * time.Second)
}

Channel 的常见用法模式 #

1. 信号通知 #

package main

import (
    "fmt"
    "time"
)

func worker(id int, done chan bool) {
    fmt.Printf("Worker %d: Starting work\n", id)

    // 模拟工作
    time.Sleep(time.Duration(id) * time.Second)

    fmt.Printf("Worker %d: Work completed\n", id)
    done <- true // 发送完成信号
}

func main() {
    done := make(chan bool)

    // 启动多个工作者
    for i := 1; i <= 3; i++ {
        go worker(i, done)
    }

    // 等待所有工作者完成
    for i := 1; i <= 3; i++ {
        <-done // 接收完成信号
        fmt.Printf("Received completion signal %d\n", i)
    }

    fmt.Println("All workers completed")
}

2. 限制并发数量 #

package main

import (
    "fmt"
    "sync"
    "time"
)

func limitedWorker(id int, semaphore chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    // 获取信号量
    semaphore <- struct{}{}
    defer func() { <-semaphore }() // 释放信号量

    fmt.Printf("Worker %d: Starting (limited concurrency)\n", id)
    time.Sleep(2 * time.Second) // 模拟工作
    fmt.Printf("Worker %d: Completed\n", id)
}

func main() {
    const maxConcurrency = 3
    semaphore := make(chan struct{}, maxConcurrency)

    var wg sync.WaitGroup

    // 启动10个工作者,但最多只有3个并发执行
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go limitedWorker(i, semaphore, &wg)
    }

    wg.Wait()
    fmt.Println("All limited workers completed")
}

3. 超时控制 #

package main

import (
    "fmt"
    "time"
)

func slowOperation(result chan string) {
    // 模拟慢操作
    time.Sleep(3 * time.Second)
    result <- "Operation completed"
}

func main() {
    result := make(chan string)

    go slowOperation(result)

    select {
    case res := <-result:
        fmt.Printf("Success: %s\n", res)
    case <-time.After(2 * time.Second):
        fmt.Println("Timeout: Operation took too long")
    }
}

Channel 的内存模型 #

理解 Channel 的内存模型对于正确使用 Channel 非常重要:

Happens-Before 关系 #

package main

import (
    "fmt"
    "time"
)

var sharedData int

func sender(ch chan bool) {
    sharedData = 42  // 写操作1
    ch <- true       // 发送操作
}

func receiver(ch chan bool) {
    <-ch                                    // 接收操作
    fmt.Printf("Shared data: %d\n", sharedData) // 读操作2
}

func main() {
    ch := make(chan bool)

    go sender(ch)
    go receiver(ch)

    time.Sleep(100 * time.Millisecond)
}

在这个例子中,由于 Channel 的 happens-before 保证:

  • 写操作 1 happens-before 发送操作
  • 发送操作 happens-before 接收操作
  • 接收操作 happens-before 读操作 2

因此,读操作 2 一定能看到写操作 1 的结果。

常见错误和最佳实践 #

1. 避免在同一个 Goroutine 中发送和接收 #

// 错误示例:会导致死锁
func badExample() {
    ch := make(chan int)
    ch <- 1  // 这里会永远阻塞
    <-ch
}

// 正确示例:使用不同的 Goroutine
func goodExample() {
    ch := make(chan int)

    go func() {
        ch <- 1
    }()

    value := <-ch
    fmt.Printf("Received: %d\n", value)
}

2. 正确处理 Channel 关闭 #

package main

import "fmt"

func main() {
    ch := make(chan int, 3)

    // 发送一些数据
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)

    // 正确的接收方式
    for {
        value, ok := <-ch
        if !ok {
            fmt.Println("Channel closed")
            break
        }
        fmt.Printf("Received: %d\n", value)
    }

    // 或者使用 range
    ch2 := make(chan int, 2)
    ch2 <- 10
    ch2 <- 20
    close(ch2)

    for value := range ch2 {
        fmt.Printf("Range received: %d\n", value)
    }
}

3. 避免关闭已关闭的 Channel #

package main

import (
    "fmt"
    "sync"
)

func safeClose(ch chan int, once *sync.Once) {
    once.Do(func() {
        close(ch)
        fmt.Println("Channel closed safely")
    })
}

func main() {
    ch := make(chan int)
    var once sync.Once

    // 多个 Goroutine 尝试关闭同一个 Channel
    go safeClose(ch, &once)
    go safeClose(ch, &once)
    go safeClose(ch, &once)

    // 等待一下
    <-ch // 这会阻塞,因为 Channel 已关闭且为空
}

小结 #

在本节中,我们学习了:

  1. Channel 基础:声明、创建和基本操作
  2. 无缓冲 Channel:同步通信的特点和使用场景
  3. 单向 Channel:限制 Channel 的方向性
  4. 协作模式:请求-响应、流水线等常见模式
  5. 实用技巧:信号通知、并发限制、超时控制
  6. 内存模型:happens-before 关系的保证
  7. 最佳实践:避免常见错误和陷阱

Channel 是 Go 语言并发编程的核心,掌握其基本使用方法是编写高效并发程序的基础。在下一节中,我们将学习有缓冲 Channel 和 Channel 的关闭机制。

练习题 #

  1. 编写一个程序,使用 Channel 实现生产者-消费者模式,生产者生成 1-100 的数字,消费者计算这些数字的平方
  2. 实现一个简单的工作池,限制同时运行的 Goroutine 数量不超过 5 个
  3. 使用 Channel 实现一个简单的发布-订阅系统,支持多个订阅者接收消息