2.3.4 Cond 条件变量

2.3.4 Cond 条件变量 #

Cond 概述 #

Cond(条件变量)是 Go 语言中用于实现复杂等待和通知机制的同步原语。它允许 Goroutine 在某个条件满足之前进入等待状态,当条件满足时,其他 Goroutine 可以通知等待的 Goroutine 继续执行。

Cond 的特点 #

  • 条件等待:Goroutine 可以等待特定条件满足
  • 通知机制:支持单个通知和广播通知
  • 与锁结合:必须与 Mutex 或 RWMutex 结合使用
  • 循环检查条件:Go 的 Wait 只会被 Signal/Broadcast 唤醒(不存在其他系统中的虚假唤醒),但 Wait 返回时条件可能已被其他 Goroutine 再次改变,因此仍然需要在循环中重新检查条件

基本方法 #

  • Wait():调用前必须已持有锁;它会原子地释放锁并挂起当前 Goroutine,被唤醒后重新获取锁再返回
  • Signal():唤醒一个等待的 Goroutine
  • Broadcast():唤醒所有等待的 Goroutine

Cond 的基本使用 #

基本语法 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// Queue is an unbounded FIFO of strings. Get blocks on the condition
// variable for as long as the queue holds no items.
type Queue struct {
    mu    sync.Mutex
    cond  *sync.Cond // bound to mu; signalled by Put
    items []string
}

// NewQueue builds an empty Queue whose condition variable shares the
// queue's own mutex.
func NewQueue() *Queue {
    q := new(Queue)
    q.items = make([]string, 0)
    q.cond = sync.NewCond(&q.mu)
    return q
}

// Put appends item to the tail of the queue and wakes at most one
// goroutine blocked in Get.
func (q *Queue) Put(item string) {
    q.mu.Lock()
    defer q.mu.Unlock()

    q.items = append(q.items, item)
    size := len(q.items)
    fmt.Printf("Put item: %s (queue size: %d)\n", item, size)

    // One new item can satisfy exactly one consumer.
    q.cond.Signal()
}

// Get removes and returns the head of the queue, blocking while the queue
// is empty.
func (q *Queue) Get() string {
    q.mu.Lock()
    defer q.mu.Unlock()

    // Re-test after every wake-up: another consumer may have taken the
    // item before this goroutine re-acquired the lock.
    for {
        if len(q.items) > 0 {
            break
        }
        fmt.Println("Queue is empty, waiting...")
        q.cond.Wait() // releases mu while parked, re-locks before returning
    }

    head, rest := q.items[0], q.items[1:]
    q.items = rest
    fmt.Printf("Got item: %s (queue size: %d)\n", head, len(rest))

    return head
}

// Size reports how many items are currently queued.
func (q *Queue) Size() int {
    q.mu.Lock()
    n := len(q.items)
    q.mu.Unlock()
    return n
}

// basicCondExample runs three consumers (three Gets each) against one
// producer (nine Puts) on a shared Queue and prints the final queue size.
func basicCondExample() {
    var wg sync.WaitGroup
    queue := NewQueue()

    // consume takes three items, pausing 100ms after each one.
    consume := func(id int) {
        defer wg.Done()
        for n := 0; n < 3; n++ {
            item := queue.Get()
            fmt.Printf("Consumer %d got: %s\n", id, item)
            time.Sleep(100 * time.Millisecond)
        }
    }

    // produce enqueues item-1 .. item-9, pausing 200ms after each one.
    produce := func() {
        defer wg.Done()
        for seq := 1; seq <= 9; seq++ {
            queue.Put(fmt.Sprintf("item-%d", seq))
            time.Sleep(200 * time.Millisecond)
        }
    }

    for id := 1; id <= 3; id++ {
        wg.Add(1)
        go consume(id)
    }

    wg.Add(1)
    go produce()

    wg.Wait()
    fmt.Printf("Final queue size: %d\n", queue.Size())
}

// main runs the basic producer/consumer demonstration.
func main() {
    basicCondExample()
}

广播通知 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// Barrier is a reusable rendezvous point: Wait blocks each caller until
// capacity callers have arrived, then releases all of them together.
type Barrier struct {
    mu       sync.Mutex
    cond     *sync.Cond
    count    int // completed rounds; also the generation that waiters watch
    waiting  int // arrivals in the current round
    capacity int // arrivals required to release a round
}

// NewBarrier returns a Barrier that releases once capacity goroutines have
// called Wait.
func NewBarrier(capacity int) *Barrier {
    b := &Barrier{
        capacity: capacity,
    }
    b.cond = sync.NewCond(&b.mu)
    return b
}

// Wait blocks until capacity goroutines (this one included) have reached
// the barrier in the current round.
//
// Waiters watch the round counter rather than the arrival counter. The
// arrival counter is reset and may be incremented again by a goroutine that
// has already started the next round before the released waiters re-acquire
// the lock; watching it would send those waiters back to sleep and deadlock
// a reused barrier.
func (b *Barrier) Wait() {
    b.mu.Lock()
    defer b.mu.Unlock()

    round := b.count // the generation this caller belongs to
    b.waiting++
    fmt.Printf("Goroutine waiting at barrier (%d/%d)\n", b.waiting, b.capacity)

    // >= rather than == so that a non-positive capacity cannot block forever.
    if b.waiting >= b.capacity {
        // Last arrival: close this round and release everyone in it.
        fmt.Println("All goroutines reached barrier, broadcasting...")
        b.count++
        b.waiting = 0
        b.cond.Broadcast()
    } else {
        // Sleep until this round has been closed by the last arrival.
        for round == b.count {
            b.cond.Wait()
        }
    }

    fmt.Printf("Goroutine passed barrier (round %d)\n", b.count)
}

// broadcastExample starts five goroutines that each work for a different
// duration, meet at a shared Barrier, and continue once all have arrived.
func broadcastExample() {
    const numGoroutines = 5

    var wg sync.WaitGroup
    barrier := NewBarrier(numGoroutines)

    // participant simulates id*200ms of work before reaching the barrier.
    participant := func(id int) {
        defer wg.Done()

        workTime := time.Duration(id) * 200 * time.Millisecond
        fmt.Printf("Goroutine %d working for %v\n", id, workTime)
        time.Sleep(workTime)

        fmt.Printf("Goroutine %d finished work, waiting at barrier\n", id)
        barrier.Wait()

        fmt.Printf("Goroutine %d continuing after barrier\n", id)
    }

    wg.Add(numGoroutines)
    for id := 1; id <= numGoroutines; id++ {
        go participant(id)
    }

    wg.Wait()
    fmt.Println("All goroutines completed")
}

// main runs the barrier/broadcast demonstration.
func main() {
    broadcastExample()
}

Cond 的高级用法 #

1. 读写者问题 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// ReadWriteResource guards a string with a hand-rolled readers/writer
// protocol built from one mutex and one condition variable: any number of
// readers may be active together, a writer is exclusive.
type ReadWriteResource struct {
    mu      sync.Mutex
    cond    *sync.Cond
    readers int  // readers currently between StartRead and EndRead
    writers int  // writers between StartWrite and EndWrite (waiting or active); tracked only, never consulted
    writing bool // true while a writer holds the resource
    data    string
}

// NewReadWriteResource returns a resource holding "initial data".
func NewReadWriteResource() *ReadWriteResource {
    rw := new(ReadWriteResource)
    rw.data = "initial data"
    rw.cond = sync.NewCond(&rw.mu)
    return rw
}

// StartRead blocks while a writer is active, then registers the caller as
// an active reader.
func (rw *ReadWriteResource) StartRead() {
    rw.mu.Lock()
    defer rw.mu.Unlock()

    for {
        if !rw.writing {
            break
        }
        fmt.Println("Reader waiting (writer is active)")
        rw.cond.Wait()
    }

    rw.readers++
    fmt.Printf("Reader started (active readers: %d)\n", rw.readers)
}

// EndRead unregisters the caller as a reader and, when it was the last
// one, wakes every goroutine waiting on the condition variable.
func (rw *ReadWriteResource) EndRead() {
    rw.mu.Lock()
    defer rw.mu.Unlock()

    rw.readers--
    fmt.Printf("Reader finished (active readers: %d)\n", rw.readers)

    if rw.readers != 0 {
        return
    }
    // Last reader out: a waiting writer may now be able to proceed.
    rw.cond.Broadcast()
}

// Read returns the current data after a simulated 100ms read performed
// between StartRead and EndRead.
func (rw *ReadWriteResource) Read() string {
    rw.StartRead()

    time.Sleep(100 * time.Millisecond) // simulated read latency
    snapshot := rw.data

    rw.EndRead()
    return snapshot
}

// StartWrite blocks until there are no active readers and no active
// writer, then marks the resource as being written.
func (rw *ReadWriteResource) StartWrite() {
    rw.mu.Lock()
    defer rw.mu.Unlock()

    rw.writers++

    for rw.writing || rw.readers > 0 {
        fmt.Printf("Writer waiting (readers: %d, writing: %v)\n", rw.readers, rw.writing)
        rw.cond.Wait()
    }

    rw.writing = true
    fmt.Println("Writer started")
}

// EndWrite clears the writing flag and wakes all waiting readers and
// writers.
func (rw *ReadWriteResource) EndWrite() {
    rw.mu.Lock()
    defer rw.mu.Unlock()

    rw.writers--
    rw.writing = false
    fmt.Println("Writer finished")

    rw.cond.Broadcast()
}

// Write stores data after a simulated 200ms write performed between
// StartWrite and EndWrite.
func (rw *ReadWriteResource) Write(data string) {
    rw.StartWrite()
    defer rw.EndWrite()

    time.Sleep(200 * time.Millisecond) // simulated write latency
    rw.data = data
    fmt.Printf("Data written: %s\n", data)
}

// readWriteExample runs five readers (three reads each) and two writers
// (two writes each) against one ReadWriteResource and waits for all of
// them to finish.
func readWriteExample() {
    var wg sync.WaitGroup
    resource := NewReadWriteResource()

    // reader performs three reads, 50ms apart.
    reader := func(id int) {
        defer wg.Done()
        for n := 0; n < 3; n++ {
            fmt.Printf("Reader %d read: %s\n", id, resource.Read())
            time.Sleep(50 * time.Millisecond)
        }
    }

    // writer performs two writes, 300ms apart.
    writer := func(id int) {
        defer wg.Done()
        for n := 0; n < 2; n++ {
            resource.Write(fmt.Sprintf("data-from-writer-%d-%d", id, n))
            time.Sleep(300 * time.Millisecond)
        }
    }

    for id := 1; id <= 5; id++ {
        wg.Add(1)
        go reader(id)
    }

    for id := 1; id <= 2; id++ {
        wg.Add(1)
        go writer(id)
    }

    wg.Wait()
}

// main runs the readers/writers demonstration.
func main() {
    readWriteExample()
}

2. 生产者-消费者模式(有界缓冲区) #

package main

import (
    "fmt"
    "sync"
    "time"
)

// BoundedBuffer is a fixed-capacity ring buffer. Put blocks while the
// buffer is full and Get blocks while it is empty; each direction has its
// own condition variable, both bound to the same mutex.
type BoundedBuffer struct {
    mu       sync.Mutex
    notFull  *sync.Cond // producers wait here
    notEmpty *sync.Cond // consumers wait here
    buffer   []interface{}
    capacity int
    size     int // number of stored elements
    in       int // index of the next write
    out      int // index of the next read
}

// NewBoundedBuffer allocates a buffer with room for capacity elements.
func NewBoundedBuffer(capacity int) *BoundedBuffer {
    bb := new(BoundedBuffer)
    bb.capacity = capacity
    bb.buffer = make([]interface{}, capacity)
    bb.notFull = sync.NewCond(&bb.mu)
    bb.notEmpty = sync.NewCond(&bb.mu)
    return bb
}

// Put stores item, blocking while the buffer is full, and then wakes one
// waiting consumer.
func (bb *BoundedBuffer) Put(item interface{}) {
    bb.mu.Lock()
    defer bb.mu.Unlock()

    for bb.size == bb.capacity {
        fmt.Printf("Buffer full, producer waiting... (size: %d/%d)\n", bb.size, bb.capacity)
        bb.notFull.Wait()
    }

    slot := bb.in
    bb.buffer[slot] = item
    bb.in = (slot + 1) % bb.capacity // advance with wrap-around
    bb.size++

    fmt.Printf("Produced: %v (size: %d/%d)\n", item, bb.size, bb.capacity)

    bb.notEmpty.Signal()
}

// Get removes and returns the oldest element, blocking while the buffer is
// empty, and then wakes one waiting producer.
func (bb *BoundedBuffer) Get() interface{} {
    bb.mu.Lock()
    defer bb.mu.Unlock()

    for bb.size == 0 {
        fmt.Printf("Buffer empty, consumer waiting... (size: %d/%d)\n", bb.size, bb.capacity)
        bb.notEmpty.Wait()
    }

    slot := bb.out
    taken := bb.buffer[slot]
    bb.buffer[slot] = nil // drop the reference held by the ring
    bb.out = (slot + 1) % bb.capacity
    bb.size--

    fmt.Printf("Consumed: %v (size: %d/%d)\n", taken, bb.size, bb.capacity)

    bb.notFull.Signal()

    return taken
}

// Size reports the number of elements currently stored.
func (bb *BoundedBuffer) Size() int {
    bb.mu.Lock()
    n := bb.size
    bb.mu.Unlock()
    return n
}

// boundedBufferExample runs two producers (five items each) and three
// consumers (three items each) over a buffer of capacity 3, then prints
// how many items remain.
func boundedBufferExample() {
    var wg sync.WaitGroup
    buffer := NewBoundedBuffer(3)

    // producer emits P<id>-Item1 .. P<id>-Item5, 200ms apart.
    producer := func(id int) {
        defer wg.Done()
        for seq := 1; seq <= 5; seq++ {
            buffer.Put(fmt.Sprintf("P%d-Item%d", id, seq))
            time.Sleep(200 * time.Millisecond)
        }
        fmt.Printf("Producer %d finished\n", id)
    }

    // consumer takes three items, 300ms apart.
    consumer := func(id int) {
        defer wg.Done()
        for seq := 1; seq <= 3; seq++ {
            fmt.Printf("Consumer %d got: %v\n", id, buffer.Get())
            time.Sleep(300 * time.Millisecond)
        }
        fmt.Printf("Consumer %d finished\n", id)
    }

    for id := 1; id <= 2; id++ {
        wg.Add(1)
        go producer(id)
    }

    for id := 1; id <= 3; id++ {
        wg.Add(1)
        go consumer(id)
    }

    wg.Wait()
    fmt.Printf("Final buffer size: %d\n", buffer.Size())
}

// main runs the bounded-buffer demonstration.
func main() {
    boundedBufferExample()
}

3. 工作池模式 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// Task is a unit of work handed to a WorkerPool.
type Task struct {
    ID   int
    Data string
}

// WorkerPool runs a fixed set of worker goroutines that pull tasks from a
// shared FIFO queue.
//
// Two condition variables share the pool mutex. Workers sleep on cond and
// completion waiters sleep on idle. Keeping them apart guarantees that the
// single wake-up issued by Submit always reaches a worker; with one shared
// condition variable it could be consumed by a WaitForCompletion caller,
// leaving an idle worker asleep while a task sits in the queue.
type WorkerPool struct {
    mu            sync.Mutex
    cond          *sync.Cond // workers wait here for a task or for shutdown
    idle          *sync.Cond // WaitForCompletion waits here for the pool to drain
    tasks         []Task
    workers       int
    activeWorkers int // workers currently executing a task
    shutdown      bool
}

// NewWorkerPool creates a pool and immediately starts numWorkers worker
// goroutines. The workers run until Shutdown has been called and the queue
// is empty.
func NewWorkerPool(numWorkers int) *WorkerPool {
    wp := &WorkerPool{
        workers: numWorkers,
    }
    wp.cond = sync.NewCond(&wp.mu)
    wp.idle = sync.NewCond(&wp.mu)

    for i := 0; i < numWorkers; i++ {
        go wp.worker(i + 1)
    }

    return wp
}

// worker is the loop run by each pool goroutine: wait for a task, execute
// it outside the lock, and repeat until the pool is shut down and drained.
func (wp *WorkerPool) worker(id int) {
    fmt.Printf("Worker %d started\n", id)

    for {
        wp.mu.Lock()

        // Sleep until there is work to do or the pool is closing.
        for len(wp.tasks) == 0 && !wp.shutdown {
            fmt.Printf("Worker %d waiting for tasks\n", id)
            wp.cond.Wait()
        }

        // Exit only once the queue is empty, so tasks accepted before
        // Shutdown are still processed.
        if wp.shutdown && len(wp.tasks) == 0 {
            wp.mu.Unlock()
            fmt.Printf("Worker %d shutting down\n", id)
            return
        }

        task := wp.tasks[0]
        wp.tasks = wp.tasks[1:]
        wp.activeWorkers++

        wp.mu.Unlock()

        // Run the task without holding the lock; the sleep simulates a
        // duration of 200ms, 400ms or 600ms derived from the task ID.
        fmt.Printf("Worker %d processing task %d: %s\n", id, task.ID, task.Data)
        time.Sleep(time.Duration(task.ID%3+1) * 200 * time.Millisecond)
        fmt.Printf("Worker %d completed task %d\n", id, task.ID)

        wp.mu.Lock()
        wp.activeWorkers--
        // The pool has drained: release anyone blocked in WaitForCompletion.
        if wp.activeWorkers == 0 && len(wp.tasks) == 0 {
            wp.idle.Broadcast()
        }
        wp.mu.Unlock()
    }
}

// Submit enqueues task and wakes one idle worker. After Shutdown the task
// is rejected with a printed message.
func (wp *WorkerPool) Submit(task Task) {
    wp.mu.Lock()
    defer wp.mu.Unlock()

    if wp.shutdown {
        fmt.Printf("Cannot submit task %d: pool is shutdown\n", task.ID)
        return
    }

    wp.tasks = append(wp.tasks, task)
    fmt.Printf("Task %d submitted (queue size: %d)\n", task.ID, len(wp.tasks))

    // Only workers wait on cond, so this wake-up cannot be lost to a
    // completion waiter.
    wp.cond.Signal()
}

// Shutdown marks the pool as closed and wakes every idle worker so it can
// exit. It does not wait for the workers to finish.
func (wp *WorkerPool) Shutdown() {
    wp.mu.Lock()
    defer wp.mu.Unlock()

    fmt.Println("Shutting down worker pool...")
    wp.shutdown = true

    wp.cond.Broadcast()
}

// WaitForCompletion blocks until the queue is empty and no worker is
// executing a task.
func (wp *WorkerPool) WaitForCompletion() {
    wp.mu.Lock()
    defer wp.mu.Unlock()

    for len(wp.tasks) > 0 || wp.activeWorkers > 0 {
        fmt.Printf("Waiting for completion (tasks: %d, active workers: %d)\n",
            len(wp.tasks), wp.activeWorkers)
        wp.idle.Wait()
    }

    fmt.Println("All tasks completed")
}

// workerPoolExample feeds ten tasks to a three-worker pool from a
// background goroutine, waits for the pool to drain, and shuts it down.
func workerPoolExample() {
    pool := NewWorkerPool(3)

    // submitAll enqueues tasks 1..10, one every 100ms.
    submitAll := func() {
        for id := 1; id <= 10; id++ {
            pool.Submit(Task{
                ID:   id,
                Data: fmt.Sprintf("Task data %d", id),
            })
            time.Sleep(100 * time.Millisecond)
        }
    }
    go submitAll()

    // Give the submitter time to enqueue everything before waiting.
    time.Sleep(2 * time.Second)

    pool.WaitForCompletion()
    pool.Shutdown()

    // Leave the workers a moment to print their shutdown messages.
    time.Sleep(500 * time.Millisecond)
    fmt.Println("Worker pool example completed")
}

// main runs the worker-pool demonstration.
func main() {
    workerPoolExample()
}

4. 事件通知系统 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// Event is a single published occurrence.
type Event struct {
    Type string
    Data interface{}
    Time time.Time
}

// EventBus records every published event, forwards events to subscribed
// listeners over buffered channels, and lets goroutines block until an
// event of a given type exists.
type EventBus struct {
    mu        sync.Mutex
    cond      *sync.Cond // broadcast on every Publish and on WaitForEvent timeouts
    events    []Event    // full history, oldest first
    listeners map[string][]*Listener
}

// Listener is a subscription to one event type; matching events are
// delivered on Channel.
type Listener struct {
    ID        string
    EventType string
    Channel   chan Event
}

// NewEventBus returns an EventBus with no events and no listeners.
func NewEventBus() *EventBus {
    eb := &EventBus{
        listeners: make(map[string][]*Listener),
    }
    eb.cond = sync.NewCond(&eb.mu)
    return eb
}

// Subscribe registers a listener for eventType and returns it. The
// listener's channel buffers up to 10 events.
func (eb *EventBus) Subscribe(listenerID, eventType string) *Listener {
    eb.mu.Lock()
    defer eb.mu.Unlock()

    listener := &Listener{
        ID:        listenerID,
        EventType: eventType,
        Channel:   make(chan Event, 10),
    }

    eb.listeners[eventType] = append(eb.listeners[eventType], listener)
    fmt.Printf("Listener %s subscribed to %s events\n", listenerID, eventType)

    return listener
}

// Publish appends a new event to the history, offers it to every listener
// subscribed to eventType, and wakes all goroutines blocked in
// WaitForEvent. Delivery never blocks: an event is dropped for a listener
// whose channel is full.
func (eb *EventBus) Publish(eventType string, data interface{}) {
    eb.mu.Lock()
    defer eb.mu.Unlock()

    event := Event{
        Type: eventType,
        Data: data,
        Time: time.Now(),
    }

    eb.events = append(eb.events, event)
    fmt.Printf("Published %s event: %v\n", eventType, data)

    // Ranging over a missing map entry iterates zero times.
    for _, listener := range eb.listeners[eventType] {
        select {
        case listener.Channel <- event:
            fmt.Printf("Event sent to listener %s\n", listener.ID)
        default:
            fmt.Printf("Listener %s channel full, dropping event\n", listener.ID)
        }
    }

    eb.cond.Broadcast()
}

// WaitForEvent returns a copy of the most recent event of eventType. An
// event already in the history is returned immediately; otherwise the call
// blocks until one is published or timeout elapses, in which case it
// returns (nil, false).
func (eb *EventBus) WaitForEvent(eventType string, timeout time.Duration) (*Event, bool) {
    eb.mu.Lock()
    defer eb.mu.Unlock()

    deadline := time.Now().Add(timeout)

    // sync.Cond has no timed wait, so a timer wakes the waiters at the
    // deadline. The callback takes the mutex before broadcasting so it can
    // only run while this goroutine is parked inside Wait (or has already
    // returned); the wake-up therefore cannot slip in between the deadline
    // check and the call to Wait.
    timer := time.AfterFunc(timeout, func() {
        eb.mu.Lock()
        defer eb.mu.Unlock()
        eb.cond.Broadcast()
    })
    defer timer.Stop()

    for {
        // Scan newest-first so the latest matching event wins.
        for i := len(eb.events) - 1; i >= 0; i-- {
            if eb.events[i].Type == eventType {
                event := eb.events[i]
                return &event, true
            }
        }

        if !time.Now().Before(deadline) {
            return nil, false
        }

        eb.cond.Wait()
    }
}

// eventBusExample subscribes three listeners, starts one goroutine that
// waits up to three seconds for a system_event, and then publishes five
// events at 500ms intervals.
func eventBusExample() {
    var wg sync.WaitGroup
    bus := NewEventBus()

    listener1 := bus.Subscribe("listener1", "user_action")
    listener2 := bus.Subscribe("listener2", "system_event")
    listener3 := bus.Subscribe("listener3", "user_action")

    // consume reads n events from l and reports each one using format.
    consume := func(l *Listener, n int, format string) {
        defer wg.Done()
        for ; n > 0; n-- {
            ev := <-l.Channel
            fmt.Printf(format, ev.Type, ev.Data)
        }
    }

    wg.Add(3)
    go consume(listener1, 3, "Listener1 processed: %s - %v\n")
    go consume(listener2, 2, "Listener2 processed: %s - %v\n")
    go consume(listener3, 3, "Listener3 processed: %s - %v\n")

    // A goroutine that blocks on the bus itself rather than on a channel.
    wg.Add(1)
    go func() {
        defer wg.Done()

        fmt.Println("Waiting for system_event...")
        event, ok := bus.WaitForEvent("system_event", 3*time.Second)
        if !ok {
            fmt.Println("Timeout waiting for system_event")
            return
        }
        fmt.Printf("Got awaited system_event: %v\n", event.Data)
    }()

    // Events to publish, in order, each preceded by a 500ms pause.
    schedule := []struct {
        kind string
        data string
    }{
        {"user_action", "user clicked button"},
        {"system_event", "system started"},
        {"user_action", "user logged in"},
        {"user_action", "user logged out"},
        {"system_event", "system shutdown"},
    }
    for _, step := range schedule {
        time.Sleep(500 * time.Millisecond)
        bus.Publish(step.kind, step.data)
    }

    wg.Wait()
}

// main runs the event-bus demonstration.
func main() {
    eventBusExample()
}

常见陷阱和最佳实践 #

1. 避免虚假唤醒 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter is a mutex-guarded counter whose waiters are released once
// the value reaches target.
type SafeCounter struct {
    mu     sync.Mutex
    cond   *sync.Cond
    value  int
    target int
}

// NewSafeCounter returns a counter starting at zero with the given target.
func NewSafeCounter(target int) *SafeCounter {
    sc := new(SafeCounter)
    sc.target = target
    sc.cond = sync.NewCond(&sc.mu)
    return sc
}

// BadWaitForTarget is the deliberately WRONG pattern: it tests the
// condition once with `if` and calls Wait at most once, so it proceeds
// after a single wake-up without confirming that value has reached target.
func (sc *SafeCounter) BadWaitForTarget() {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    belowTarget := sc.value < sc.target
    if belowTarget {
        fmt.Printf("Bad wait: value=%d, target=%d\n", sc.value, sc.target)
        sc.cond.Wait() // the condition is not re-checked after this returns
    }

    fmt.Printf("Bad wait completed: value=%d\n", sc.value)
}

// GoodWaitForTarget is the correct pattern: the condition is re-tested in
// a loop after every wake-up, so the method returns only once value has
// reached target.
func (sc *SafeCounter) GoodWaitForTarget() {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    for {
        if sc.value >= sc.target {
            break
        }
        fmt.Printf("Good wait: value=%d, target=%d\n", sc.value, sc.target)
        sc.cond.Wait()
    }

    fmt.Printf("Good wait completed: value=%d\n", sc.value)
}

// Increment adds one to the counter and wakes every waiter once the value
// has reached target.
func (sc *SafeCounter) Increment() {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    sc.value++
    fmt.Printf("Incremented to: %d\n", sc.value)

    if sc.value < sc.target {
        return
    }
    sc.cond.Broadcast()
}

// spuriousWakeupExample starts one goroutine using the correct wait
// pattern, one using the incorrect pattern, and one that increments the
// counter six times at 200ms intervals, then waits for all three.
//
// The incrementer is tracked by the WaitGroup as well: otherwise the
// function could return, and the program exit, before the final increment
// had run.
func spuriousWakeupExample() {
    counter := NewSafeCounter(5)
    var wg sync.WaitGroup

    wg.Add(3)

    // Waiter using the loop-based (correct) pattern.
    go func() {
        defer wg.Done()
        counter.GoodWaitForTarget()
    }()

    // Waiter using the single-check (incorrect) pattern.
    go func() {
        defer wg.Done()
        counter.BadWaitForTarget()
    }()

    // Incrementer: six steps, one more than the target of five.
    go func() {
        defer wg.Done()
        for i := 0; i < 6; i++ {
            time.Sleep(200 * time.Millisecond)
            counter.Increment()
        }
    }()

    wg.Wait()
}

// main runs the wait-pattern comparison.
func main() {
    spuriousWakeupExample()
}

2. 正确的锁管理 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// Resource models a single exclusive resource that callers acquire and
// release; waiters block on the condition variable while it is taken.
type Resource struct {
    mu        sync.Mutex
    cond      *sync.Cond
    available bool
    users     int // number of current holders
}

// NewResource returns a Resource that is initially available.
func NewResource() *Resource {
    r := new(Resource)
    r.available = true
    r.cond = sync.NewCond(&r.mu)
    return r
}

// BadAcquire is the deliberately WRONG pattern: after a single Wait it
// takes the resource without re-checking availability, so it can take a
// resource that another goroutine claimed in the meantime.
func (r *Resource) BadAcquire() {
    r.mu.Lock()
    defer r.mu.Unlock()

    busy := !r.available
    if busy {
        r.cond.Wait()
        // Mistake: availability is not checked again here.
    }

    r.users++
    r.available = false
    fmt.Printf("Bad acquire: users=%d\n", r.users)
}

// GoodAcquire blocks until the resource is available, re-checking after
// every wake-up, and then takes it.
func (r *Resource) GoodAcquire() {
    r.mu.Lock()
    defer r.mu.Unlock()

    for {
        if r.available {
            break
        }
        r.cond.Wait()
    }

    r.users++
    r.available = false
    fmt.Printf("Good acquire: users=%d\n", r.users)
}

// Release marks the resource as available again and wakes one waiter.
func (r *Resource) Release() {
    r.mu.Lock()
    defer r.mu.Unlock()

    r.users--
    r.available = true
    fmt.Printf("Released: users=%d\n", r.users)

    r.cond.Signal()
}

// lockManagementExample has three goroutines take turns holding a single
// Resource for 500ms each, using the correct acquire pattern.
func lockManagementExample() {
    var wg sync.WaitGroup
    resource := NewResource()

    // use acquires the resource, holds it for 500ms, and releases it.
    use := func(id int) {
        defer wg.Done()

        fmt.Printf("User %d trying to acquire resource\n", id)
        resource.GoodAcquire()

        fmt.Printf("User %d using resource\n", id)
        time.Sleep(500 * time.Millisecond)

        resource.Release()
        fmt.Printf("User %d released resource\n", id)
    }

    for id := 1; id <= 3; id++ {
        wg.Add(1)
        go use(id)
    }

    wg.Wait()
}

// main runs the acquire/release demonstration.
func main() {
    lockManagementExample()
}

3. 避免死锁 #

package main

import (
    "fmt"
    "sync"
    "time"
)

// DeadlockDemo holds two mutexes, each with its own condition variable,
// to contrast inconsistent and consistent lock-acquisition order.
type DeadlockDemo struct {
    mu1   sync.Mutex
    mu2   sync.Mutex
    cond1 *sync.Cond
    cond2 *sync.Cond
    flag1 bool
    flag2 bool
}

// NewDeadlockDemo returns a DeadlockDemo with cond1 bound to mu1 and cond2
// bound to mu2.
func NewDeadlockDemo() *DeadlockDemo {
    dd := &DeadlockDemo{}
    dd.cond1 = sync.NewCond(&dd.mu1)
    dd.cond2 = sync.NewCond(&dd.mu2)
    return dd
}

// BadMethod1 is the WRONG pattern: it locks mu1 and then mu2, the opposite
// order to BadMethod2, so running the two concurrently can deadlock.
func (dd *DeadlockDemo) BadMethod1() {
    dd.mu1.Lock()
    defer dd.mu1.Unlock()

    fmt.Println("Method1: acquired mu1")

    // Try to take mu2 while still holding mu1.
    dd.mu2.Lock()
    defer dd.mu2.Unlock()

    fmt.Println("Method1: acquired mu2")
    dd.flag1 = true
    dd.cond2.Signal()
}

// BadMethod2 is the WRONG pattern: it locks mu2 and then mu1, the opposite
// order to BadMethod1, so running the two concurrently can deadlock.
func (dd *DeadlockDemo) BadMethod2() {
    dd.mu2.Lock()
    defer dd.mu2.Unlock()

    fmt.Println("Method2: acquired mu2")

    // Try to take mu1 while still holding mu2.
    dd.mu1.Lock()
    defer dd.mu1.Unlock()

    fmt.Println("Method2: acquired mu1")
    dd.flag2 = true
    dd.cond1.Signal()
}

// GoodMethod1 is the correct pattern: locks are always taken in the same
// fixed order. It sets flag1 and signals cond2 while holding both locks.
func (dd *DeadlockDemo) GoodMethod1() {
    // Always acquire mu1 first, then mu2.
    dd.mu1.Lock()
    defer dd.mu1.Unlock()

    dd.mu2.Lock()
    defer dd.mu2.Unlock()

    fmt.Println("GoodMethod1: acquired both locks")
    dd.flag1 = true
    dd.cond2.Signal()
}

// GoodMethod2 uses the same mu1-then-mu2 order as GoodMethod1. It sets
// flag2 and signals cond1 while holding both locks.
func (dd *DeadlockDemo) GoodMethod2() {
    // Always acquire mu1 first, then mu2.
    dd.mu1.Lock()
    defer dd.mu1.Unlock()

    dd.mu2.Lock()
    defer dd.mu2.Unlock()

    fmt.Println("GoodMethod2: acquired both locks")
    dd.flag2 = true
    dd.cond1.Signal()
}

// deadlockAvoidanceExample runs the two consistently-ordered methods
// concurrently and waits for both to finish.
func deadlockAvoidanceExample() {
    var wg sync.WaitGroup
    demo := NewDeadlockDemo()

    fmt.Println("=== Good Example (No Deadlock) ===")

    // Both methods take mu1 before mu2, so they cannot block each other
    // indefinitely.
    steps := []func(){demo.GoodMethod1, demo.GoodMethod2}

    wg.Add(len(steps))
    for _, step := range steps {
        go func(run func()) {
            defer wg.Done()
            run()
        }(step)
    }

    wg.Wait()
    fmt.Println("Good example completed successfully")
}

// main runs the lock-ordering demonstration.
func main() {
    deadlockAvoidanceExample()
}

小结 #

在本节中,我们深入学习了:

  1. Cond 基础:条件变量的基本概念和使用方法
  2. 基本操作:Wait、Signal、Broadcast 的使用
  3. 高级应用
    • 读写者问题
    • 生产者-消费者模式(有界缓冲区)
    • 工作池模式
    • 事件通知系统
  4. 最佳实践
    • 在循环中检查条件,确保被唤醒后条件确实成立
    • 正确的锁管理
    • 避免死锁

Cond 是一个强大但复杂的同步原语,它适用于需要复杂等待和通知机制的场景。虽然在很多情况下 Channel 可能是更好的选择,但 Cond 在某些特定场景下仍然是不可替代的。

通过本章的学习,您已经掌握了 Go 语言中所有主要的同步原语:Mutex、RWMutex、WaitGroup、Once 和 Cond。这些工具为您提供了构建复杂并发系统的基础。

练习题 #

  1. 实现一个支持优先级的任务队列,使用 Cond 实现等待和通知机制
  2. 设计一个资源池,支持不同类型的资源,使用 Cond 管理资源的分配和释放
  3. 创建一个多阶段的数据处理管道,使用 Cond 协调各个阶段之间的同步