2.1.2 Goroutine 调度机制

2.1.2 Goroutine 调度机制 #

Go 调度器概述 #

Go 语言的调度器是其并发编程能力的核心,它负责管理和调度数以万计的 Goroutine。与传统的操作系统线程调度不同,Go 调度器完全运行在用户态,这使得 Goroutine 的创建和切换成本极低。

调度器的设计目标 #

Go 调度器的设计有以下几个主要目标:

  1. 高效性:最小化调度开销,最大化 CPU 利用率
  2. 公平性:确保所有 Goroutine 都有机会被执行
  3. 可扩展性:支持大量 Goroutine 的并发执行
  4. 低延迟:快速响应 Goroutine 的调度需求

GMP 调度模型 #

Go 调度器采用 GMP 模型,这是理解 Go 并发机制的关键:

G(Goroutine) #

G 代表 Goroutine,是 Go 语言中的轻量级线程。每个 G 包含:

  • 栈信息:Goroutine 的执行栈
  • 程序计数器:当前执行位置
  • 状态信息:运行状态(运行中、就绪、阻塞等)
  • 关联的 M:当前运行在哪个 M 上
// Goroutine 的简化结构(实际结构更复杂)
type g struct {
    stack       stack     // 栈信息
    stackguard0 uintptr   // 栈保护
    m           *m        // 当前关联的 M
    sched       gobuf     // 调度信息
    atomicstatus uint32   // 状态
    goid        int64     // Goroutine ID
}

M(Machine) #

M 代表操作系统线程(Machine),是 Goroutine 运行的载体。每个 M 包含:

  • 关联的 G:当前正在执行的 Goroutine
  • 关联的 P:当前绑定的 P
  • 系统调用信息:处理系统调用时的状态
// M 的简化结构
type m struct {
    g0      *g        // 用于调度的特殊 Goroutine
    curg    *g        // 当前运行的 Goroutine
    p       puintptr  // 关联的 P
    nextp   puintptr  // 下一个要关联的 P
    spinning bool     // 是否在寻找工作
}

P(Processor) #

P 代表逻辑处理器(Processor),是 G 和 M 之间的桥梁。每个 P 包含:

  • 本地运行队列:存储待执行的 Goroutine
  • 关联的 M:当前绑定的 M
  • 调度器状态:各种调度相关的信息
// P 的简化结构
type p struct {
    m           muintptr   // 关联的 M
    runqhead    uint32     // 本地队列头部
    runqtail    uint32     // 本地队列尾部
    runq        [256]guintptr // 本地运行队列
    runnext     guintptr   // 下一个要运行的 G
}

调度器的工作原理 #

初始化过程 #

程序启动时,Go 运行时会进行以下初始化:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // 获取当前的 P 数量(通常等于 CPU 核心数)
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))

    // 获取当前 Goroutine 数量
    fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())

    // 创建一些 Goroutine
    for i := 0; i < 5; i++ {
        go func(id int) {
            fmt.Printf("Goroutine %d running on P\n", id)
        }(i)
    }

    // 让主 Goroutine 等待一下
    runtime.Gosched() // 主动让出 CPU
    fmt.Printf("NumGoroutine after creation: %d\n", runtime.NumGoroutine())
}

调度时机 #

Goroutine 的调度发生在以下时机:

  1. 主动让出:调用 runtime.Gosched()
  2. 系统调用:进行系统调用时
  3. Channel 操作:Channel 发送或接收阻塞时
  4. 网络 I/O:网络操作阻塞时
  5. 垃圾回收:GC 期间的协作点
  6. 函数调用:在函数序言中检查是否需要调度
package main

import (
    "fmt"
    "runtime"
    "time"
)

func cpuBoundTask(id int) {
    fmt.Printf("Task %d started\n", id)

    // CPU 密集型任务
    count := 0
    for i := 0; i < 1000000000; i++ {
        count++
        // 在长时间运行的循环中主动让出 CPU
        if i%100000000 == 0 {
            runtime.Gosched()
        }
    }

    fmt.Printf("Task %d completed with count: %d\n", id, count)
}

func main() {
    // 设置使用单个 P,更容易观察调度行为
    runtime.GOMAXPROCS(1)

    // 启动多个 CPU 密集型任务
    for i := 0; i < 3; i++ {
        go cpuBoundTask(i)
    }

    time.Sleep(2 * time.Second)
}

工作窃取算法 #

当一个 P 的本地队列为空时,它会尝试从其他 P 的队列中"窃取"工作:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()

    // 获取当前 Goroutine 运行的 P 的 ID
    // 注意:这个函数在实际的 Go 版本中可能不存在,这里仅作演示
    fmt.Printf("Worker %d starting\n", id)

    // 模拟不同长度的工作
    workTime := time.Duration(id*100) * time.Millisecond
    time.Sleep(workTime)

    fmt.Printf("Worker %d completed after %v\n", id, workTime)
}

func demonstrateWorkStealing() {
    // 设置多个 P
    runtime.GOMAXPROCS(4)

    var wg sync.WaitGroup

    // 创建大量 Goroutine
    numWorkers := 20
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers completed")
}

func main() {
    demonstrateWorkStealing()
}

调度器的优化机制 #

本地队列优先 #

每个 P 都有自己的本地运行队列,调度器优先从本地队列获取 Goroutine,这样可以:

  • 减少锁竞争
  • 提高缓存局部性
  • 降低调度延迟

全局队列平衡 #

当本地队列过长或为空时,调度器会与全局队列进行平衡:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func monitorScheduler() {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for i := 0; i < 20; i++ {
        <-ticker.C
        fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())
    }
}

func createBurstGoroutines() {
    var wg sync.WaitGroup

    // 突发创建大量 Goroutine
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(10 * time.Millisecond)
        }(i)
    }

    wg.Wait()
}

func main() {
    go monitorScheduler()

    fmt.Println("Creating burst of goroutines...")
    createBurstGoroutines()

    time.Sleep(3 * time.Second)
}

系统调用处理 #

当 Goroutine 进行系统调用时,调度器会进行特殊处理:

package main

import (
    "fmt"
    "os"
    "runtime"
    "sync"
    "time"
)

func fileOperation(id int, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("Goroutine %d: Starting file operation\n", id)

    // 创建临时文件(系统调用)
    file, err := os.CreateTemp("", fmt.Sprintf("temp_%d_", id))
    if err != nil {
        fmt.Printf("Goroutine %d: Error creating file: %v\n", id, err)
        return
    }
    defer os.Remove(file.Name())
    defer file.Close()

    // 写入数据(系统调用)
    data := fmt.Sprintf("Data from goroutine %d\n", id)
    _, err = file.WriteString(data)
    if err != nil {
        fmt.Printf("Goroutine %d: Error writing file: %v\n", id, err)
        return
    }

    // 同步到磁盘(系统调用)
    err = file.Sync()
    if err != nil {
        fmt.Printf("Goroutine %d: Error syncing file: %v\n", id, err)
        return
    }

    fmt.Printf("Goroutine %d: File operation completed\n", id)
}

func main() {
    runtime.GOMAXPROCS(2) // 使用 2 个 P

    var wg sync.WaitGroup

    // 启动多个进行文件操作的 Goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go fileOperation(i, &wg)
    }

    // 同时启动一些 CPU 密集型任务
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("CPU task %d: Starting\n", id)

            // CPU 密集型计算
            sum := 0
            for j := 0; j < 100000000; j++ {
                sum += j
            }

            fmt.Printf("CPU task %d: Completed with sum %d\n", id, sum)
        }(i)
    }

    wg.Wait()
    fmt.Println("All tasks completed")
}

调度器的性能调优 #

GOMAXPROCS 设置 #

GOMAXPROCS 控制同时执行 Go 代码的操作系统线程数:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func benchmarkWithDifferentGOMAXPROCS(maxProcs int) time.Duration {
    runtime.GOMAXPROCS(maxProcs)

    var wg sync.WaitGroup
    start := time.Now()

    // 创建 CPU 密集型任务
    numTasks := 8
    for i := 0; i < numTasks; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            // CPU 密集型计算
            sum := 0
            for j := 0; j < 50000000; j++ {
                sum += j * j
            }
        }(i)
    }

    wg.Wait()
    return time.Since(start)
}

func main() {
    cpuCount := runtime.NumCPU()
    fmt.Printf("CPU cores: %d\n", cpuCount)

    // 测试不同的 GOMAXPROCS 设置
    for _, procs := range []int{1, 2, 4, cpuCount, cpuCount * 2} {
        duration := benchmarkWithDifferentGOMAXPROCS(procs)
        fmt.Printf("GOMAXPROCS=%d: %v\n", procs, duration)
    }
}

调度器状态监控 #

我们可以通过 runtime 包获取调度器的状态信息:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func printSchedulerStats() {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)

    fmt.Printf("=== Scheduler Stats ===\n")
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
    fmt.Printf("NumCPU: %d\n", runtime.NumCPU())

    // 内存统计
    fmt.Printf("Alloc: %d KB\n", m.Alloc/1024)
    fmt.Printf("TotalAlloc: %d KB\n", m.TotalAlloc/1024)
    fmt.Printf("Sys: %d KB\n", m.Sys/1024)
    fmt.Printf("NumGC: %d\n", m.NumGC)
    fmt.Println()
}

func createGoroutineWaves() {
    for wave := 0; wave < 3; wave++ {
        fmt.Printf("Creating wave %d of goroutines...\n", wave+1)

        for i := 0; i < 100; i++ {
            go func(waveId, id int) {
                time.Sleep(time.Duration(100+id*10) * time.Millisecond)
            }(wave, i)
        }

        time.Sleep(50 * time.Millisecond)
        printSchedulerStats()
    }
}

func main() {
    printSchedulerStats()
    createGoroutineWaves()

    // 等待所有 Goroutine 完成
    time.Sleep(2 * time.Second)
    printSchedulerStats()
}

调度器的陷阱和最佳实践 #

避免 Goroutine 泄漏 #

package main

import (
    "context"
    "fmt"
    "runtime"
    "time"
)

// 错误示例:可能导致 Goroutine 泄漏
func badExample() {
    ch := make(chan int)

    go func() {
        // 这个 Goroutine 会永远阻塞,因为没有人向 channel 发送数据
        val := <-ch
        fmt.Printf("Received: %d\n", val)
    }()

    // 主函数结束,但上面的 Goroutine 仍在等待
}

// 正确示例:使用 context 控制 Goroutine 生命周期
func goodExample() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    ch := make(chan int)

    go func() {
        select {
        case val := <-ch:
            fmt.Printf("Received: %d\n", val)
        case <-ctx.Done():
            fmt.Println("Goroutine cancelled")
            return
        }
    }()

    // 模拟一些工作
    time.Sleep(500 * time.Millisecond)
}

func main() {
    fmt.Printf("Initial goroutines: %d\n", runtime.NumGoroutine())

    badExample()
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("After bad example: %d\n", runtime.NumGoroutine())

    goodExample()
    time.Sleep(1500 * time.Millisecond) // 等待 context 超时
    fmt.Printf("After good example: %d\n", runtime.NumGoroutine())
}

合理使用 runtime.Gosched() #

package main

import (
    "fmt"
    "runtime"
    "time"
)

func withoutGosched() {
    start := time.Now()

    go func() {
        for i := 0; i < 1000000000; i++ {
            // CPU 密集型循环,不主动让出 CPU
        }
        fmt.Println("Goroutine 1 completed")
    }()

    go func() {
        for i := 0; i < 1000000000; i++ {
            // 另一个 CPU 密集型循环
        }
        fmt.Println("Goroutine 2 completed")
    }()

    time.Sleep(2 * time.Second)
    fmt.Printf("Without Gosched took: %v\n", time.Since(start))
}

func withGosched() {
    start := time.Now()

    go func() {
        for i := 0; i < 1000000000; i++ {
            if i%100000000 == 0 {
                runtime.Gosched() // 主动让出 CPU
            }
        }
        fmt.Println("Goroutine 1 completed")
    }()

    go func() {
        for i := 0; i < 1000000000; i++ {
            if i%100000000 == 0 {
                runtime.Gosched() // 主动让出 CPU
            }
        }
        fmt.Println("Goroutine 2 completed")
    }()

    time.Sleep(2 * time.Second)
    fmt.Printf("With Gosched took: %v\n", time.Since(start))
}

func main() {
    runtime.GOMAXPROCS(1) // 使用单个 P 来演示效果

    fmt.Println("Testing without Gosched:")
    withoutGosched()

    fmt.Println("\nTesting with Gosched:")
    withGosched()
}

小结 #

在本节中,我们深入了解了 Go 调度器的工作原理:

  1. GMP 模型:理解 Goroutine、Machine 和 Processor 的关系
  2. 调度时机:掌握 Goroutine 何时会被调度
  3. 工作窃取:了解调度器如何平衡负载
  4. 性能调优:学会合理设置 GOMAXPROCS 和监控调度器状态
  5. 最佳实践:避免常见的调度器使用陷阱

理解调度器的工作原理有助于我们编写更高效的并发程序,在下一节中,我们将学习常见的并发模式和设计模式。

练习题 #

  1. 编写程序测试不同 GOMAXPROCS 设置对 CPU 密集型和 I/O 密集型任务的影响
  2. 实现一个简单的 Goroutine 池,控制同时运行的 Goroutine 数量
  3. 编写程序演示工作窃取算法的效果,创建不均匀的工作负载并观察调度行为