2.3.4 Cond 条件变量 #
Cond 概述 #
Cond(条件变量)是 Go 语言中用于实现复杂等待和通知机制的同步原语。它允许 Goroutine 在某个条件满足之前进入等待状态,当条件满足时,其他 Goroutine 可以通知等待的 Goroutine 继续执行。
Cond 的特点 #
- 条件等待:Goroutine 可以等待特定条件满足
- 通知机制:支持单个通知和广播通知
- 与锁结合:必须与 Mutex 或 RWMutex 结合使用
- 避免虚假唤醒:需要在循环中检查条件
基本方法 #
- Wait():释放锁并等待通知,被唤醒时重新获取锁
- Signal():唤醒一个等待的 Goroutine
- Broadcast():唤醒所有等待的 Goroutine
Cond 的基本使用 #
基本语法 #
package main
import (
"fmt"
"sync"
"time"
)
// Queue is a FIFO queue of strings whose Get blocks while the queue is empty.
type Queue struct {
	mu    sync.Mutex // guards items
	cond  *sync.Cond // signalled by Put each time an item is appended
	items []string
}

// NewQueue returns an empty Queue whose condition variable is bound to the
// queue's own mutex.
func NewQueue() *Queue {
	q := new(Queue)
	q.items = make([]string, 0)
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Put appends item to the tail of the queue and wakes one waiting consumer.
func (q *Queue) Put(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.items = append(q.items, item)
	size := len(q.items)
	fmt.Printf("Put item: %s (queue size: %d)\n", item, size)

	// Exactly one item was added, so waking a single consumer is enough.
	q.cond.Signal()
}

// Get removes and returns the head of the queue, blocking until an item is
// available.
func (q *Queue) Get() string {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Wait releases the mutex while blocked and re-acquires it before
	// returning, so the emptiness check is repeated after every wake-up.
	for {
		if len(q.items) > 0 {
			break
		}
		fmt.Println("Queue is empty, waiting...")
		q.cond.Wait()
	}

	head, rest := q.items[0], q.items[1:]
	q.items = rest
	fmt.Printf("Got item: %s (queue size: %d)\n", head, len(rest))
	return head
}

// Size reports how many items are currently queued.
func (q *Queue) Size() int {
	q.mu.Lock()
	n := len(q.items)
	q.mu.Unlock()
	return n
}
// basicCondExample runs three consumers (three Gets each) against a single
// producer that puts nine items, then prints the final queue size.
func basicCondExample() {
	q := NewQueue()
	var wg sync.WaitGroup

	consume := func(id int) {
		defer wg.Done()
		for n := 0; n < 3; n++ {
			got := q.Get()
			fmt.Printf("Consumer %d got: %s\n", id, got)
			time.Sleep(100 * time.Millisecond)
		}
	}
	produce := func() {
		defer wg.Done()
		for n := 1; n <= 9; n++ {
			q.Put(fmt.Sprintf("item-%d", n))
			time.Sleep(200 * time.Millisecond)
		}
	}

	// Three consumers plus one producer.
	wg.Add(4)
	for id := 1; id <= 3; id++ {
		go consume(id)
	}
	go produce()

	wg.Wait()
	fmt.Printf("Final queue size: %d\n", q.Size())
}
// main runs the basic producer/consumer Cond example.
func main() {
	basicCondExample()
}
广播通知 #
package main
import (
"fmt"
"sync"
"time"
)
// Barrier is a reusable rendezvous point: every call to Wait blocks until
// capacity goroutines have called it, then all of them are released together.
type Barrier struct {
	mu       sync.Mutex // guards the fields below
	cond     *sync.Cond // broadcast when a round completes
	count    int        // number of completed rounds; doubles as the generation id
	waiting  int        // goroutines that have arrived in the current round
	capacity int        // goroutines required to complete a round
}

// NewBarrier returns a Barrier that releases its waiters once capacity
// goroutines have arrived.
func NewBarrier(capacity int) *Barrier {
	b := &Barrier{
		capacity: capacity,
	}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// Wait blocks until capacity goroutines (including the caller) have reached
// the barrier in the current round. The last arrival completes the round and
// wakes everyone else.
func (b *Barrier) Wait() {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Remember which round this goroutine belongs to. Waiting on the round
	// counter rather than on `waiting` keeps the barrier safe to reuse: a
	// goroutine that arrives early for the next round increments `waiting`
	// again, which must not put the previous round's waiters back to sleep.
	round := b.count

	b.waiting++
	fmt.Printf("Goroutine waiting at barrier (%d/%d)\n", b.waiting, b.capacity)

	if b.waiting == b.capacity {
		// Last arrival: close the round and release everyone.
		fmt.Println("All goroutines reached barrier, broadcasting...")
		b.count++
		b.waiting = 0
		b.cond.Broadcast()
	} else {
		// Sleep until the round this goroutine joined has been completed.
		for round == b.count {
			b.cond.Wait()
		}
	}

	// Report the round this goroutine took part in, not whatever round the
	// barrier has advanced to by the time it gets to run.
	fmt.Printf("Goroutine passed barrier (round %d)\n", round+1)
}
// broadcastExample starts five goroutines that work for different lengths of
// time and then meet at a shared barrier before continuing.
func broadcastExample() {
	const numGoroutines = 5

	b := NewBarrier(numGoroutines)
	var wg sync.WaitGroup

	participant := func(id int) {
		defer wg.Done()

		// Stagger arrivals: goroutine id works for id * 200ms.
		d := time.Duration(id) * 200 * time.Millisecond
		fmt.Printf("Goroutine %d working for %v\n", id, d)
		time.Sleep(d)

		fmt.Printf("Goroutine %d finished work, waiting at barrier\n", id)
		b.Wait()
		fmt.Printf("Goroutine %d continuing after barrier\n", id)
	}

	wg.Add(numGoroutines)
	for id := 1; id <= numGoroutines; id++ {
		go participant(id)
	}

	wg.Wait()
	fmt.Println("All goroutines completed")
}
// main runs the barrier/broadcast example.
func main() {
	broadcastExample()
}
Cond 的高级用法 #
1. 读写者问题 #
package main
import (
"fmt"
"sync"
"time"
)
// ReadWriteResource is a string value protected by a hand-rolled
// readers/writer lock built from one mutex and one condition variable.
type ReadWriteResource struct {
	mu      sync.Mutex // guards the bookkeeping fields readers, writers and writing
	cond    *sync.Cond // broadcast when the last reader leaves or a writer finishes
	readers int        // readers currently inside the read section
	writers int        // writers that have called StartWrite and not yet EndWrite
	writing bool       // true while a writer is inside the write section
	data    string
}

// NewReadWriteResource returns a resource holding "initial data".
func NewReadWriteResource() *ReadWriteResource {
	rw := new(ReadWriteResource)
	rw.data = "initial data"
	rw.cond = sync.NewCond(&rw.mu)
	return rw
}

// StartRead blocks while a writer is active, then registers the caller as an
// active reader.
func (rw *ReadWriteResource) StartRead() {
	rw.mu.Lock()

	for rw.writing {
		fmt.Println("Reader waiting (writer is active)")
		rw.cond.Wait()
	}
	rw.readers++
	fmt.Printf("Reader started (active readers: %d)\n", rw.readers)

	rw.mu.Unlock()
}

// EndRead unregisters the caller as a reader and, if it was the last one,
// wakes every goroutine waiting on the condition variable.
func (rw *ReadWriteResource) EndRead() {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	rw.readers--
	remaining := rw.readers
	fmt.Printf("Reader finished (active readers: %d)\n", remaining)

	if remaining != 0 {
		return
	}
	// No readers left: a waiting writer may now be able to proceed.
	rw.cond.Broadcast()
}

// Read returns the current data after a simulated 100ms read performed
// between StartRead and EndRead.
func (rw *ReadWriteResource) Read() string {
	rw.StartRead()

	// Simulated read latency.
	time.Sleep(100 * time.Millisecond)
	snapshot := rw.data

	rw.EndRead()
	return snapshot
}

// StartWrite blocks until there are no active readers and no active writer,
// then marks the caller as the active writer.
func (rw *ReadWriteResource) StartWrite() {
	rw.mu.Lock()

	rw.writers++
	for rw.writing || rw.readers > 0 {
		fmt.Printf("Writer waiting (readers: %d, writing: %v)\n", rw.readers, rw.writing)
		rw.cond.Wait()
	}
	rw.writing = true
	fmt.Println("Writer started")

	rw.mu.Unlock()
}

// EndWrite clears the active-writer state and wakes all waiting readers and
// writers.
func (rw *ReadWriteResource) EndWrite() {
	rw.mu.Lock()
	defer rw.mu.Unlock()

	rw.writers--
	rw.writing = false
	fmt.Println("Writer finished")

	rw.cond.Broadcast()
}

// Write stores data after a simulated 200ms write performed between
// StartWrite and EndWrite.
func (rw *ReadWriteResource) Write(data string) {
	rw.StartWrite()

	// Simulated write latency.
	time.Sleep(200 * time.Millisecond)
	rw.data = data
	fmt.Printf("Data written: %s\n", data)

	rw.EndWrite()
}
// readWriteExample runs five readers (three reads each) concurrently with two
// writers (two writes each) against one shared resource.
func readWriteExample() {
	res := NewReadWriteResource()
	var wg sync.WaitGroup

	reader := func(id int) {
		defer wg.Done()
		for n := 0; n < 3; n++ {
			got := res.Read()
			fmt.Printf("Reader %d read: %s\n", id, got)
			time.Sleep(50 * time.Millisecond)
		}
	}
	writer := func(id int) {
		defer wg.Done()
		for n := 0; n < 2; n++ {
			res.Write(fmt.Sprintf("data-from-writer-%d-%d", id, n))
			time.Sleep(300 * time.Millisecond)
		}
	}

	// Five readers and two writers.
	wg.Add(5 + 2)
	for id := 1; id <= 5; id++ {
		go reader(id)
	}
	for id := 1; id <= 2; id++ {
		go writer(id)
	}

	wg.Wait()
}
// main runs the readers/writers example.
func main() {
	readWriteExample()
}
2. 生产者-消费者模式(有界缓冲区) #
package main
import (
"fmt"
"sync"
"time"
)
// BoundedBuffer is a fixed-capacity ring buffer. Put blocks while the buffer
// is full and Get blocks while it is empty. Both condition variables share
// the same mutex.
type BoundedBuffer struct {
	mu       sync.Mutex
	notFull  *sync.Cond    // signalled by Get after a slot is freed
	notEmpty *sync.Cond    // signalled by Put after a slot is filled
	buffer   []interface{} // ring storage of length capacity
	capacity int
	size     int // number of occupied slots
	in       int // index of the next slot to write
	out      int // index of the next slot to read
}

// NewBoundedBuffer returns an empty buffer that holds at most capacity items.
func NewBoundedBuffer(capacity int) *BoundedBuffer {
	bb := new(BoundedBuffer)
	bb.capacity = capacity
	bb.buffer = make([]interface{}, capacity)
	bb.notFull = sync.NewCond(&bb.mu)
	bb.notEmpty = sync.NewCond(&bb.mu)
	return bb
}

// Put stores item in the buffer, blocking while the buffer is full, and then
// wakes one waiting consumer.
func (bb *BoundedBuffer) Put(item interface{}) {
	bb.mu.Lock()
	defer bb.mu.Unlock()

	for bb.size == bb.capacity {
		fmt.Printf("Buffer full, producer waiting... (size: %d/%d)\n", bb.size, bb.capacity)
		bb.notFull.Wait()
	}

	// Write at the head index and advance it around the ring.
	slot := bb.in
	bb.buffer[slot] = item
	bb.in = (slot + 1) % bb.capacity
	bb.size++
	fmt.Printf("Produced: %v (size: %d/%d)\n", item, bb.size, bb.capacity)

	bb.notEmpty.Signal()
}

// Get removes and returns the oldest item, blocking while the buffer is
// empty, and then wakes one waiting producer.
func (bb *BoundedBuffer) Get() interface{} {
	bb.mu.Lock()
	defer bb.mu.Unlock()

	for bb.size == 0 {
		fmt.Printf("Buffer empty, consumer waiting... (size: %d/%d)\n", bb.size, bb.capacity)
		bb.notEmpty.Wait()
	}

	// Read at the tail index, drop the stored reference, and advance.
	slot := bb.out
	taken := bb.buffer[slot]
	bb.buffer[slot] = nil
	bb.out = (slot + 1) % bb.capacity
	bb.size--
	fmt.Printf("Consumed: %v (size: %d/%d)\n", taken, bb.size, bb.capacity)

	bb.notFull.Signal()
	return taken
}

// Size reports the number of items currently stored.
func (bb *BoundedBuffer) Size() int {
	bb.mu.Lock()
	n := bb.size
	bb.mu.Unlock()
	return n
}
// boundedBufferExample runs two producers (five items each) and three
// consumers (three items each) over a buffer of capacity 3, then prints the
// number of items left in the buffer.
func boundedBufferExample() {
	buf := NewBoundedBuffer(3)
	var wg sync.WaitGroup

	producer := func(id int) {
		defer wg.Done()
		for n := 1; n <= 5; n++ {
			buf.Put(fmt.Sprintf("P%d-Item%d", id, n))
			time.Sleep(200 * time.Millisecond)
		}
		fmt.Printf("Producer %d finished\n", id)
	}
	consumer := func(id int) {
		defer wg.Done()
		for n := 1; n <= 3; n++ {
			got := buf.Get()
			fmt.Printf("Consumer %d got: %v\n", id, got)
			time.Sleep(300 * time.Millisecond)
		}
		fmt.Printf("Consumer %d finished\n", id)
	}

	// Two producers and three consumers.
	wg.Add(2 + 3)
	for id := 1; id <= 2; id++ {
		go producer(id)
	}
	for id := 1; id <= 3; id++ {
		go consumer(id)
	}

	wg.Wait()
	fmt.Printf("Final buffer size: %d\n", buf.Size())
}
// main runs the bounded-buffer producer/consumer example.
func main() {
	boundedBufferExample()
}
3. 工作池模式 #
package main
import (
"fmt"
"sync"
"time"
)
// Task is one unit of work handed to the pool.
type Task struct {
	ID   int
	Data string
}

// WorkerPool runs submitted tasks on a fixed set of worker goroutines.
//
// Two condition variables share one mutex so that each kind of waiter is
// woken only by the events it cares about: workers wait on cond for new
// tasks or shutdown, and WaitForCompletion waits on done for the pool to
// drain. With a single shared Cond, the Signal sent by Submit could be
// consumed by a WaitForCompletion caller, leaving an idle worker asleep while
// a task sat in the queue.
type WorkerPool struct {
	mu            sync.Mutex
	cond          *sync.Cond // workers wait here for tasks or shutdown
	done          *sync.Cond // WaitForCompletion waits here for the pool to drain
	tasks         []Task     // pending tasks, oldest first
	workers       int        // number of worker goroutines started
	activeWorkers int        // workers currently executing a task
	shutdown      bool       // set by Shutdown; no new tasks are accepted afterwards
}

// NewWorkerPool creates a pool and starts numWorkers worker goroutines.
// The workers run until Shutdown is called and the queue is empty.
func NewWorkerPool(numWorkers int) *WorkerPool {
	wp := &WorkerPool{
		workers: numWorkers,
	}
	wp.cond = sync.NewCond(&wp.mu)
	wp.done = sync.NewCond(&wp.mu)

	for i := 0; i < numWorkers; i++ {
		go wp.worker(i + 1)
	}
	return wp
}

// worker is the loop run by each worker goroutine: take the oldest task,
// process it outside the lock, and repeat until the pool is shut down and
// the queue is empty.
func (wp *WorkerPool) worker(id int) {
	fmt.Printf("Worker %d started\n", id)
	for {
		wp.mu.Lock()

		// Sleep until there is a task to run or shutdown was requested.
		for len(wp.tasks) == 0 && !wp.shutdown {
			fmt.Printf("Worker %d waiting for tasks\n", id)
			wp.cond.Wait()
		}

		// Exit only once the queue has been drained, so tasks accepted
		// before Shutdown are still executed.
		if wp.shutdown && len(wp.tasks) == 0 {
			wp.mu.Unlock()
			fmt.Printf("Worker %d shutting down\n", id)
			return
		}

		task := wp.tasks[0]
		wp.tasks = wp.tasks[1:]
		wp.activeWorkers++
		wp.mu.Unlock()

		// Run the task without holding the lock; the duration is simulated.
		fmt.Printf("Worker %d processing task %d: %s\n", id, task.ID, task.Data)
		time.Sleep(time.Duration(task.ID%3+1) * 200 * time.Millisecond)
		fmt.Printf("Worker %d completed task %d\n", id, task.ID)

		wp.mu.Lock()
		wp.activeWorkers--
		// The pool is drained: release anyone blocked in WaitForCompletion.
		if wp.activeWorkers == 0 && len(wp.tasks) == 0 {
			wp.done.Broadcast()
		}
		wp.mu.Unlock()
	}
}

// Submit queues task for execution and wakes one idle worker. Tasks
// submitted after Shutdown are rejected with a log message.
func (wp *WorkerPool) Submit(task Task) {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	if wp.shutdown {
		fmt.Printf("Cannot submit task %d: pool is shutdown\n", task.ID)
		return
	}

	wp.tasks = append(wp.tasks, task)
	fmt.Printf("Task %d submitted (queue size: %d)\n", task.ID, len(wp.tasks))

	// Only workers wait on cond, so this Signal always reaches a worker.
	wp.cond.Signal()
}

// Shutdown stops the pool from accepting new tasks and wakes every idle
// worker so it can exit. It does not wait for the workers to finish.
func (wp *WorkerPool) Shutdown() {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	fmt.Println("Shutting down worker pool...")
	wp.shutdown = true
	wp.cond.Broadcast()
}

// WaitForCompletion blocks until the queue is empty and no worker is
// executing a task.
func (wp *WorkerPool) WaitForCompletion() {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	for len(wp.tasks) > 0 || wp.activeWorkers > 0 {
		fmt.Printf("Waiting for completion (tasks: %d, active workers: %d)\n",
			len(wp.tasks), wp.activeWorkers)
		wp.done.Wait()
	}
	fmt.Println("All tasks completed")
}
// workerPoolExample feeds ten tasks to a three-worker pool, waits for the
// pool to drain, and then shuts it down.
func workerPoolExample() {
	pool := NewWorkerPool(3)

	submitAll := func() {
		for id := 1; id <= 10; id++ {
			pool.Submit(Task{
				ID:   id,
				Data: fmt.Sprintf("Task data %d", id),
			})
			time.Sleep(100 * time.Millisecond)
		}
	}
	go submitAll()

	// Give the submitter time to queue its tasks before waiting on the pool.
	time.Sleep(2 * time.Second)

	pool.WaitForCompletion()
	pool.Shutdown()

	// Leave the workers a moment to print their shutdown messages.
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Worker pool example completed")
}
// main runs the worker-pool example.
func main() {
	workerPoolExample()
}
4. 事件通知系统 #
package main
import (
"fmt"
"sync"
"time"
)
// Event is one published message.
type Event struct {
	Type string
	Data interface{}
	Time time.Time // set by Publish to the time of publication
}

// EventBus records published events, fans them out to subscribed listeners
// over channels, and lets goroutines block until an event of a given type
// exists.
type EventBus struct {
	mu        sync.Mutex
	cond      *sync.Cond             // broadcast on every Publish and on WaitForEvent timeouts
	events    []Event                // every event published so far, oldest first
	listeners map[string][]*Listener // listeners keyed by event type
}

// Listener receives events of one type on Channel.
type Listener struct {
	ID        string
	EventType string
	Channel   chan Event
}

// NewEventBus returns an EventBus with no events and no listeners.
func NewEventBus() *EventBus {
	eb := &EventBus{
		listeners: make(map[string][]*Listener),
	}
	eb.cond = sync.NewCond(&eb.mu)
	return eb
}

// Subscribe registers a listener for eventType and returns it. The
// listener's channel buffers up to 10 events.
func (eb *EventBus) Subscribe(listenerID, eventType string) *Listener {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	listener := &Listener{
		ID:        listenerID,
		EventType: eventType,
		Channel:   make(chan Event, 10),
	}
	eb.listeners[eventType] = append(eb.listeners[eventType], listener)
	fmt.Printf("Listener %s subscribed to %s events\n", listenerID, eventType)
	return listener
}

// Publish records a new event, delivers it to every listener subscribed to
// eventType, and wakes all goroutines blocked in WaitForEvent. Delivery never
// blocks: if a listener's channel is full the event is dropped for that
// listener.
func (eb *EventBus) Publish(eventType string, data interface{}) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	event := Event{
		Type: eventType,
		Data: data,
		Time: time.Now(),
	}
	eb.events = append(eb.events, event)
	fmt.Printf("Published %s event: %v\n", eventType, data)

	for _, listener := range eb.listeners[eventType] {
		select {
		case listener.Channel <- event:
			fmt.Printf("Event sent to listener %s\n", listener.ID)
		default:
			fmt.Printf("Listener %s channel full, dropping event\n", listener.ID)
		}
	}

	eb.cond.Broadcast()
}

// WaitForEvent returns a copy of the most recently published event of
// eventType, blocking for at most timeout until one exists. Events published
// before the call also count. It returns (nil, false) if the timeout elapses
// first.
func (eb *EventBus) WaitForEvent(eventType string, timeout time.Duration) (*Event, bool) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	deadline := time.Now().Add(timeout)

	// Cond.Wait has no timeout of its own, so without help the deadline
	// check below would only run when some later Publish happened to wake
	// us. Arrange a wake-up at the deadline. The callback takes the mutex so
	// that it cannot fire in the window between the deadline check and the
	// call to Wait, which would lose the wake-up.
	timer := time.AfterFunc(timeout, func() {
		eb.mu.Lock()
		defer eb.mu.Unlock()
		eb.cond.Broadcast()
	})
	defer timer.Stop()

	for {
		// Scan newest to oldest so the latest matching event wins.
		for i := len(eb.events) - 1; i >= 0; i-- {
			if eb.events[i].Type == eventType {
				event := eb.events[i]
				return &event, true
			}
		}

		if !time.Now().Before(deadline) {
			return nil, false
		}

		eb.cond.Wait()
	}
}
// eventBusExample subscribes three listeners, starts one goroutine that
// waits for a "system_event", publishes five events 500ms apart, and waits
// for every goroutine to finish.
func eventBusExample() {
	bus := NewEventBus()
	var wg sync.WaitGroup

	listener1 := bus.Subscribe("listener1", "user_action")
	listener2 := bus.Subscribe("listener2", "system_event")
	listener3 := bus.Subscribe("listener3", "user_action")

	// drain receives exactly n events from l and logs each with format.
	drain := func(l *Listener, n int, format string) {
		defer wg.Done()
		for k := 0; k < n; k++ {
			ev := <-l.Channel
			fmt.Printf(format, ev.Type, ev.Data)
		}
	}

	// Three listener goroutines plus the WaitForEvent goroutine.
	wg.Add(4)
	go drain(listener1, 3, "Listener1 processed: %s - %v\n")
	go drain(listener2, 2, "Listener2 processed: %s - %v\n")
	go drain(listener3, 3, "Listener3 processed: %s - %v\n")

	go func() {
		defer wg.Done()
		fmt.Println("Waiting for system_event...")
		ev, ok := bus.WaitForEvent("system_event", 3*time.Second)
		if !ok {
			fmt.Println("Timeout waiting for system_event")
			return
		}
		fmt.Printf("Got awaited system_event: %v\n", ev.Data)
	}()

	// Publish the events in order, pausing 500ms before each one.
	schedule := []struct {
		kind    string
		payload string
	}{
		{"user_action", "user clicked button"},
		{"system_event", "system started"},
		{"user_action", "user logged in"},
		{"user_action", "user logged out"},
		{"system_event", "system shutdown"},
	}
	for _, s := range schedule {
		time.Sleep(500 * time.Millisecond)
		bus.Publish(s.kind, s.payload)
	}

	wg.Wait()
}
// main runs the event-bus example.
func main() {
	eventBusExample()
}
常见陷阱和最佳实践 #
1. 避免虚假唤醒 #
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter is a counter that goroutines can block on until it reaches a
// target value.
type SafeCounter struct {
	mu     sync.Mutex
	cond   *sync.Cond // broadcast by Increment once value >= target
	value  int
	target int
}

// NewSafeCounter returns a counter starting at zero with the given target.
func NewSafeCounter(target int) *SafeCounter {
	sc := new(SafeCounter)
	sc.target = target
	sc.cond = sync.NewCond(&sc.mu)
	return sc
}

// BadWaitForTarget is the deliberately WRONG pattern: it tests the condition
// with `if`, so it calls Wait at most once and proceeds after any wake-up
// without re-checking that the target was actually reached.
func (sc *SafeCounter) BadWaitForTarget() {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	if below := sc.value < sc.target; below {
		fmt.Printf("Bad wait: value=%d, target=%d\n", sc.value, sc.target)
		sc.cond.Wait() // the condition is not re-checked after this returns
	}
	fmt.Printf("Bad wait completed: value=%d\n", sc.value)
}

// GoodWaitForTarget is the correct pattern: the condition is re-checked in a
// loop after every wake-up, so it returns only once value >= target.
func (sc *SafeCounter) GoodWaitForTarget() {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	for {
		if sc.value >= sc.target {
			break
		}
		fmt.Printf("Good wait: value=%d, target=%d\n", sc.value, sc.target)
		sc.cond.Wait()
	}
	fmt.Printf("Good wait completed: value=%d\n", sc.value)
}

// Increment adds one to the counter and wakes all waiters once the target
// has been reached.
func (sc *SafeCounter) Increment() {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	sc.value++
	fmt.Printf("Incremented to: %d\n", sc.value)

	if sc.value < sc.target {
		return
	}
	sc.cond.Broadcast()
}
// spuriousWakeupExample starts one correct and one incorrect waiter on a
// counter with target 5, then increments the counter six times, 200ms apart.
func spuriousWakeupExample() {
	counter := NewSafeCounter(5)
	var wg sync.WaitGroup

	// Launch the waiters: the loop-based one first, then the if-based one.
	waiters := []func(){
		counter.GoodWaitForTarget,
		counter.BadWaitForTarget,
	}
	wg.Add(len(waiters))
	for _, wait := range waiters {
		go func(f func()) {
			defer wg.Done()
			f()
		}(wait)
	}

	// The incrementer is not tracked by the WaitGroup.
	go func() {
		for n := 0; n < 6; n++ {
			time.Sleep(200 * time.Millisecond)
			counter.Increment()
		}
	}()

	wg.Wait()
}
// main runs the wait-in-a-loop example.
func main() {
	spuriousWakeupExample()
}
2. 正确的锁管理 #
package main
import (
"fmt"
"sync"
"time"
)
// Resource models a single exclusive resource that goroutines acquire and
// release.
type Resource struct {
	mu        sync.Mutex
	cond      *sync.Cond // signalled by Release
	available bool       // true when nobody holds the resource
	users     int        // number of current holders
}

// NewResource returns a Resource that is initially available.
func NewResource() *Resource {
	r := new(Resource)
	r.available = true
	r.cond = sync.NewCond(&r.mu)
	return r
}

// BadAcquire is the deliberately WRONG pattern: after Wait returns it takes
// the resource without re-checking that it is still available.
func (r *Resource) BadAcquire() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if busy := !r.available; busy {
		r.cond.Wait()
		// Mistake: availability is not re-checked here.
	}

	r.users++
	r.available = false
	fmt.Printf("Bad acquire: users=%d\n", r.users)
}

// GoodAcquire blocks until the resource is available, re-checking after
// every wake-up, and then takes it.
func (r *Resource) GoodAcquire() {
	r.mu.Lock()
	defer r.mu.Unlock()

	for {
		if r.available {
			break
		}
		r.cond.Wait()
	}

	r.users++
	r.available = false
	fmt.Printf("Good acquire: users=%d\n", r.users)
}

// Release marks the resource available again and wakes one waiter.
func (r *Resource) Release() {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.users--
	r.available = true
	fmt.Printf("Released: users=%d\n", r.users)

	r.cond.Signal()
}
// lockManagementExample has three goroutines take turns holding one shared
// resource for 500ms each.
func lockManagementExample() {
	res := NewResource()
	var wg sync.WaitGroup

	user := func(id int) {
		defer wg.Done()

		fmt.Printf("User %d trying to acquire resource\n", id)
		res.GoodAcquire()

		fmt.Printf("User %d using resource\n", id)
		time.Sleep(500 * time.Millisecond)

		res.Release()
		fmt.Printf("User %d released resource\n", id)
	}

	wg.Add(3)
	for id := 1; id <= 3; id++ {
		go user(id)
	}

	wg.Wait()
}
// main runs the lock-management example.
func main() {
	lockManagementExample()
}
3. 避免死锁 #
package main
import (
"fmt"
"sync"
"time"
)
// DeadlockDemo holds two mutexes, each with its own condition variable, to
// contrast inconsistent and consistent lock ordering.
type DeadlockDemo struct {
	mu1   sync.Mutex
	mu2   sync.Mutex
	cond1 *sync.Cond // bound to mu1
	cond2 *sync.Cond // bound to mu2
	flag1 bool
	flag2 bool
}

// NewDeadlockDemo returns a demo with both condition variables bound to
// their respective mutexes.
func NewDeadlockDemo() *DeadlockDemo {
	dd := new(DeadlockDemo)
	dd.cond1 = sync.NewCond(&dd.mu1)
	dd.cond2 = sync.NewCond(&dd.mu2)
	return dd
}

// BadMethod1 is half of the deliberately WRONG pair: it locks mu1 then mu2,
// the opposite order to BadMethod2, so running the two concurrently can
// deadlock.
func (dd *DeadlockDemo) BadMethod1() {
	dd.mu1.Lock()
	fmt.Println("Method1: acquired mu1")

	// Second lock taken while still holding the first.
	dd.mu2.Lock()
	fmt.Println("Method1: acquired mu2")

	dd.flag1 = true
	dd.cond2.Signal()

	dd.mu2.Unlock()
	dd.mu1.Unlock()
}

// BadMethod2 is the other half of the deliberately WRONG pair: it locks mu2
// then mu1, the opposite order to BadMethod1.
func (dd *DeadlockDemo) BadMethod2() {
	dd.mu2.Lock()
	fmt.Println("Method2: acquired mu2")

	// Second lock taken while still holding the first.
	dd.mu1.Lock()
	fmt.Println("Method2: acquired mu1")

	dd.flag2 = true
	dd.cond1.Signal()

	dd.mu1.Unlock()
	dd.mu2.Unlock()
}

// GoodMethod1 sets flag1 and signals cond2 while holding both locks,
// acquired in the fixed order mu1 then mu2.
func (dd *DeadlockDemo) GoodMethod1() {
	dd.mu1.Lock()
	dd.mu2.Lock()

	fmt.Println("GoodMethod1: acquired both locks")
	dd.flag1 = true
	dd.cond2.Signal()

	dd.mu2.Unlock()
	dd.mu1.Unlock()
}

// GoodMethod2 sets flag2 and signals cond1 while holding both locks,
// acquired in the same fixed order as GoodMethod1: mu1 then mu2.
func (dd *DeadlockDemo) GoodMethod2() {
	dd.mu1.Lock()
	dd.mu2.Lock()

	fmt.Println("GoodMethod2: acquired both locks")
	dd.flag2 = true
	dd.cond1.Signal()

	dd.mu2.Unlock()
	dd.mu1.Unlock()
}
// deadlockAvoidanceExample runs the two consistently-ordered methods
// concurrently and reports that they complete.
func deadlockAvoidanceExample() {
	demo := NewDeadlockDemo()
	var wg sync.WaitGroup

	fmt.Println("=== Good Example (No Deadlock) ===")

	methods := []func(){demo.GoodMethod1, demo.GoodMethod2}
	wg.Add(len(methods))
	for _, m := range methods {
		go func(run func()) {
			defer wg.Done()
			run()
		}(m)
	}

	wg.Wait()
	fmt.Println("Good example completed successfully")
}
// main runs the deadlock-avoidance example.
func main() {
	deadlockAvoidanceExample()
}
小结 #
在本节中,我们深入学习了:
- Cond 基础:条件变量的基本概念和使用方法
- 基本操作:Wait、Signal、Broadcast 的使用
- 高级应用:
- 读写者问题
- 生产者-消费者模式(有界缓冲区)
- 工作池模式
- 事件通知系统
- 最佳实践:
- 在循环中检查条件,避免虚假唤醒
- 正确的锁管理
- 避免死锁
Cond 是一个强大但复杂的同步原语,它适用于需要复杂等待和通知机制的场景。虽然在很多情况下 Channel 可能是更好的选择,但 Cond 在某些特定场景下仍然是不可替代的。
通过本章的学习,您已经掌握了 Go 语言中所有主要的同步原语:Mutex、RWMutex、WaitGroup、Once 和 Cond。这些工具为您提供了构建复杂并发系统的基础。
练习题 #
- 实现一个支持优先级的任务队列,使用 Cond 实现等待和通知机制
- 设计一个资源池,支持不同类型的资源,使用 Cond 管理资源的分配和释放
- 创建一个多阶段的数据处理管道,使用 Cond 协调各个阶段之间的同步