2.1.2 Goroutine 调度机制 #
Go 调度器概述 #
Go 语言的调度器是其并发编程能力的核心,它负责管理和调度数以万计的 Goroutine。与传统的操作系统线程调度不同,Go 调度器完全运行在用户态,这使得 Goroutine 的创建和切换成本极低。
调度器的设计目标 #
Go 调度器的设计有以下几个主要目标:
- 高效性:最小化调度开销,最大化 CPU 利用率
- 公平性:确保所有 Goroutine 都有机会被执行
- 可扩展性:支持大量 Goroutine 的并发执行
- 低延迟:快速响应 Goroutine 的调度需求
GMP 调度模型 #
Go 调度器采用 GMP 模型,这是理解 Go 并发机制的关键:
G(Goroutine) #
G 代表 Goroutine,是 Go 语言中的轻量级线程。每个 G 包含:
- 栈信息:Goroutine 的执行栈
- 程序计数器:当前执行位置
- 状态信息:运行状态(运行中、就绪、阻塞等)
- 关联的 M:当前运行在哪个 M 上
// Goroutine 的简化结构(实际结构更复杂)
type g struct {
stack stack // 栈信息
stackguard0 uintptr // 栈保护
m *m // 当前关联的 M
sched gobuf // 调度信息
atomicstatus uint32 // 状态
goid int64 // Goroutine ID
}
M(Machine) #
M 代表操作系统线程(Machine),是 Goroutine 运行的载体。每个 M 包含:
- 关联的 G:当前正在执行的 Goroutine
- 关联的 P:当前绑定的 P
- 系统调用信息:处理系统调用时的状态
// M 的简化结构
type m struct {
g0 *g // 用于调度的特殊 Goroutine
curg *g // 当前运行的 Goroutine
p puintptr // 关联的 P
nextp puintptr // 下一个要关联的 P
spinning bool // 是否在寻找工作
}
P(Processor) #
P 代表逻辑处理器(Processor),是 G 和 M 之间的桥梁。每个 P 包含:
- 本地运行队列:存储待执行的 Goroutine
- 关联的 M:当前绑定的 M
- 调度器状态:各种调度相关的信息
// P 的简化结构
type p struct {
m muintptr // 关联的 M
runqhead uint32 // 本地队列头部
runqtail uint32 // 本地队列尾部
runq [256]guintptr // 本地运行队列
runnext guintptr // 下一个要运行的 G
}
调度器的工作原理 #
初始化过程 #
程序启动时,Go 运行时会进行以下初始化:
package main
import (
"fmt"
"runtime"
)
func main() {
// 获取当前的 P 数量(通常等于 CPU 核心数)
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
// 获取当前 Goroutine 数量
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
// 创建一些 Goroutine
for i := 0; i < 5; i++ {
go func(id int) {
fmt.Printf("Goroutine %d running on P\n", id)
}(i)
}
// 让主 Goroutine 等待一下
runtime.Gosched() // 主动让出 CPU
fmt.Printf("NumGoroutine after creation: %d\n", runtime.NumGoroutine())
}
调度时机 #
Goroutine 的调度发生在以下时机:
- 主动让出:调用
runtime.Gosched()
- 系统调用:进行系统调用时
- Channel 操作:Channel 发送或接收阻塞时
- 网络 I/O:网络操作阻塞时
- 垃圾回收:GC 期间的协作点
- 函数调用:在函数序言中检查是否需要调度
package main
import (
"fmt"
"runtime"
"time"
)
func cpuBoundTask(id int) {
fmt.Printf("Task %d started\n", id)
// CPU 密集型任务
count := 0
for i := 0; i < 1000000000; i++ {
count++
// 在长时间运行的循环中主动让出 CPU
if i%100000000 == 0 {
runtime.Gosched()
}
}
fmt.Printf("Task %d completed with count: %d\n", id, count)
}
func main() {
// 设置使用单个 P,更容易观察调度行为
runtime.GOMAXPROCS(1)
// 启动多个 CPU 密集型任务
for i := 0; i < 3; i++ {
go cpuBoundTask(i)
}
time.Sleep(2 * time.Second)
}
工作窃取算法 #
当一个 P 的本地队列为空时,它会尝试从其他 P 的队列中"窃取"工作:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
// 获取当前 Goroutine 运行的 P 的 ID
// 注意:这个函数在实际的 Go 版本中可能不存在,这里仅作演示
fmt.Printf("Worker %d starting\n", id)
// 模拟不同长度的工作
workTime := time.Duration(id*100) * time.Millisecond
time.Sleep(workTime)
fmt.Printf("Worker %d completed after %v\n", id, workTime)
}
func demonstrateWorkStealing() {
// 设置多个 P
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup
// 创建大量 Goroutine
numWorkers := 20
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers completed")
}
func main() {
demonstrateWorkStealing()
}
调度器的优化机制 #
本地队列优先 #
每个 P 都有自己的本地运行队列,调度器优先从本地队列获取 Goroutine,这样可以:
- 减少锁竞争
- 提高缓存局部性
- 降低调度延迟
全局队列平衡 #
当本地队列过长或为空时,调度器会与全局队列进行平衡:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func monitorScheduler() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 20; i++ {
<-ticker.C
fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())
}
}
func createBurstGoroutines() {
var wg sync.WaitGroup
// 突发创建大量 Goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(10 * time.Millisecond)
}(i)
}
wg.Wait()
}
func main() {
go monitorScheduler()
fmt.Println("Creating burst of goroutines...")
createBurstGoroutines()
time.Sleep(3 * time.Second)
}
系统调用处理 #
当 Goroutine 进行系统调用时,调度器会进行特殊处理:
package main
import (
"fmt"
"os"
"runtime"
"sync"
"time"
)
func fileOperation(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Goroutine %d: Starting file operation\n", id)
// 创建临时文件(系统调用)
file, err := os.CreateTemp("", fmt.Sprintf("temp_%d_", id))
if err != nil {
fmt.Printf("Goroutine %d: Error creating file: %v\n", id, err)
return
}
defer os.Remove(file.Name())
defer file.Close()
// 写入数据(系统调用)
data := fmt.Sprintf("Data from goroutine %d\n", id)
_, err = file.WriteString(data)
if err != nil {
fmt.Printf("Goroutine %d: Error writing file: %v\n", id, err)
return
}
// 同步到磁盘(系统调用)
err = file.Sync()
if err != nil {
fmt.Printf("Goroutine %d: Error syncing file: %v\n", id, err)
return
}
fmt.Printf("Goroutine %d: File operation completed\n", id)
}
func main() {
runtime.GOMAXPROCS(2) // 使用 2 个 P
var wg sync.WaitGroup
// 启动多个进行文件操作的 Goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go fileOperation(i, &wg)
}
// 同时启动一些 CPU 密集型任务
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("CPU task %d: Starting\n", id)
// CPU 密集型计算
sum := 0
for j := 0; j < 100000000; j++ {
sum += j
}
fmt.Printf("CPU task %d: Completed with sum %d\n", id, sum)
}(i)
}
wg.Wait()
fmt.Println("All tasks completed")
}
调度器的性能调优 #
GOMAXPROCS 设置 #
GOMAXPROCS
控制同时执行 Go 代码的操作系统线程数:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func benchmarkWithDifferentGOMAXPROCS(maxProcs int) time.Duration {
runtime.GOMAXPROCS(maxProcs)
var wg sync.WaitGroup
start := time.Now()
// 创建 CPU 密集型任务
numTasks := 8
for i := 0; i < numTasks; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// CPU 密集型计算
sum := 0
for j := 0; j < 50000000; j++ {
sum += j * j
}
}(i)
}
wg.Wait()
return time.Since(start)
}
func main() {
cpuCount := runtime.NumCPU()
fmt.Printf("CPU cores: %d\n", cpuCount)
// 测试不同的 GOMAXPROCS 设置
for _, procs := range []int{1, 2, 4, cpuCount, cpuCount * 2} {
duration := benchmarkWithDifferentGOMAXPROCS(procs)
fmt.Printf("GOMAXPROCS=%d: %v\n", procs, duration)
}
}
调度器状态监控 #
我们可以通过 runtime
包获取调度器的状态信息:
package main
import (
"fmt"
"runtime"
"time"
)
func printSchedulerStats() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("=== Scheduler Stats ===\n")
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
// 内存统计
fmt.Printf("Alloc: %d KB\n", m.Alloc/1024)
fmt.Printf("TotalAlloc: %d KB\n", m.TotalAlloc/1024)
fmt.Printf("Sys: %d KB\n", m.Sys/1024)
fmt.Printf("NumGC: %d\n", m.NumGC)
fmt.Println()
}
func createGoroutineWaves() {
for wave := 0; wave < 3; wave++ {
fmt.Printf("Creating wave %d of goroutines...\n", wave+1)
for i := 0; i < 100; i++ {
go func(waveId, id int) {
time.Sleep(time.Duration(100+id*10) * time.Millisecond)
}(wave, i)
}
time.Sleep(50 * time.Millisecond)
printSchedulerStats()
}
}
func main() {
printSchedulerStats()
createGoroutineWaves()
// 等待所有 Goroutine 完成
time.Sleep(2 * time.Second)
printSchedulerStats()
}
调度器的陷阱和最佳实践 #
避免 Goroutine 泄漏 #
package main
import (
"context"
"fmt"
"runtime"
"time"
)
// 错误示例:可能导致 Goroutine 泄漏
func badExample() {
ch := make(chan int)
go func() {
// 这个 Goroutine 会永远阻塞,因为没有人向 channel 发送数据
val := <-ch
fmt.Printf("Received: %d\n", val)
}()
// 主函数结束,但上面的 Goroutine 仍在等待
}
// 正确示例:使用 context 控制 Goroutine 生命周期
func goodExample() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Printf("Received: %d\n", val)
case <-ctx.Done():
fmt.Println("Goroutine cancelled")
return
}
}()
// 模拟一些工作
time.Sleep(500 * time.Millisecond)
}
func main() {
fmt.Printf("Initial goroutines: %d\n", runtime.NumGoroutine())
badExample()
time.Sleep(100 * time.Millisecond)
fmt.Printf("After bad example: %d\n", runtime.NumGoroutine())
goodExample()
time.Sleep(1500 * time.Millisecond) // 等待 context 超时
fmt.Printf("After good example: %d\n", runtime.NumGoroutine())
}
合理使用 runtime.Gosched() #
package main
import (
"fmt"
"runtime"
"time"
)
func withoutGosched() {
start := time.Now()
go func() {
for i := 0; i < 1000000000; i++ {
// CPU 密集型循环,不主动让出 CPU
}
fmt.Println("Goroutine 1 completed")
}()
go func() {
for i := 0; i < 1000000000; i++ {
// 另一个 CPU 密集型循环
}
fmt.Println("Goroutine 2 completed")
}()
time.Sleep(2 * time.Second)
fmt.Printf("Without Gosched took: %v\n", time.Since(start))
}
func withGosched() {
start := time.Now()
go func() {
for i := 0; i < 1000000000; i++ {
if i%100000000 == 0 {
runtime.Gosched() // 主动让出 CPU
}
}
fmt.Println("Goroutine 1 completed")
}()
go func() {
for i := 0; i < 1000000000; i++ {
if i%100000000 == 0 {
runtime.Gosched() // 主动让出 CPU
}
}
fmt.Println("Goroutine 2 completed")
}()
time.Sleep(2 * time.Second)
fmt.Printf("With Gosched took: %v\n", time.Since(start))
}
func main() {
runtime.GOMAXPROCS(1) // 使用单个 P 来演示效果
fmt.Println("Testing without Gosched:")
withoutGosched()
fmt.Println("\nTesting with Gosched:")
withGosched()
}
小结 #
在本节中,我们深入了解了 Go 调度器的工作原理:
- GMP 模型:理解 Goroutine、Machine 和 Processor 的关系
- 调度时机:掌握 Goroutine 何时会被调度
- 工作窃取:了解调度器如何平衡负载
- 性能调优:学会合理设置 GOMAXPROCS 和监控调度器状态
- 最佳实践:避免常见的调度器使用陷阱
理解调度器的工作原理有助于我们编写更高效的并发程序,在下一节中,我们将学习常见的并发模式和设计模式。
练习题 #
- 编写程序测试不同 GOMAXPROCS 设置对 CPU 密集型和 I/O 密集型任务的影响
- 实现一个简单的 Goroutine 池,控制同时运行的 Goroutine 数量
- 编写程序演示工作窃取算法的效果,创建不均匀的工作负载并观察调度行为