2.5.1 原子操作与原子类型 #
原子操作是并发编程中的重要概念,它保证操作的不可分割性,即操作要么完全执行,要么完全不执行,不会被其他 goroutine 中断。Go 语言通过 sync/atomic 包提供了丰富的原子操作支持,这些操作在某些场景下比使用互斥锁更加高效。
原子操作基础概念 #
什么是原子操作 #
原子操作具有以下特性:
- 不可分割性:操作过程中不会被中断
- 一致性:操作前后系统状态保持一致
- 可见性:操作结果对所有 goroutine 立即可见
- 有序性:提供内存屏障,防止指令重排
原子操作的优势 #
相比互斥锁,原子操作具有以下优势:
- 性能更高:避免了锁的开销
- 无死锁风险:不存在锁竞争
- 更细粒度:可以对单个变量进行原子操作
sync/atomic 包详解 #
基本原子操作函数 #
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func main() {
// 演示基本的原子操作
demonstrateBasicAtomicOps()
// 演示原子操作的并发安全性
demonstrateConcurrentSafety()
// 演示不同数据类型的原子操作
demonstrateAtomicTypes()
}
// 基本原子操作演示
func demonstrateBasicAtomicOps() {
fmt.Println("=== 基本原子操作演示 ===")
var counter int64 = 10
// 原子加载
value := atomic.LoadInt64(&counter)
fmt.Printf("原子加载: %d\n", value)
// 原子存储
atomic.StoreInt64(&counter, 20)
fmt.Printf("原子存储后: %d\n", atomic.LoadInt64(&counter))
// 原子增加
newValue := atomic.AddInt64(&counter, 5)
fmt.Printf("原子增加5后: %d\n", newValue)
// 原子交换
oldValue := atomic.SwapInt64(&counter, 100)
fmt.Printf("原子交换: 旧值=%d, 新值=%d\n", oldValue, atomic.LoadInt64(&counter))
// 原子比较并交换
swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)
fmt.Printf("CAS操作: 成功=%t, 当前值=%d\n", swapped, atomic.LoadInt64(&counter))
// 再次尝试CAS,这次应该失败
swapped = atomic.CompareAndSwapInt64(&counter, 100, 300)
fmt.Printf("CAS操作: 成功=%t, 当前值=%d\n", swapped, atomic.LoadInt64(&counter))
fmt.Println()
}
// 并发安全性演示
func demonstrateConcurrentSafety() {
fmt.Println("=== 并发安全性演示 ===")
var atomicCounter int64
var mutexCounter int64
var mu sync.Mutex
const numGoroutines = 1000
const incrementsPerGoroutine = 1000
// 使用原子操作的计数器
var wg1 sync.WaitGroup
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg1.Add(1)
go func() {
defer wg1.Done()
for j := 0; j < incrementsPerGoroutine; j++ {
atomic.AddInt64(&atomicCounter, 1)
}
}()
}
wg1.Wait()
atomicDuration := time.Since(start)
// 使用互斥锁的计数器
var wg2 sync.WaitGroup
start = time.Now()
for i := 0; i < numGoroutines; i++ {
wg2.Add(1)
go func() {
defer wg2.Done()
for j := 0; j < incrementsPerGoroutine; j++ {
mu.Lock()
mutexCounter++
mu.Unlock()
}
}()
}
wg2.Wait()
mutexDuration := time.Since(start)
expected := int64(numGoroutines * incrementsPerGoroutine)
fmt.Printf("期望值: %d\n", expected)
fmt.Printf("原子操作结果: %d, 耗时: %v\n", atomicCounter, atomicDuration)
fmt.Printf("互斥锁结果: %d, 耗时: %v\n", mutexCounter, mutexDuration)
fmt.Printf("性能提升: %.2fx\n", float64(mutexDuration)/float64(atomicDuration))
fmt.Println()
}
// 不同数据类型的原子操作
func demonstrateAtomicTypes() {
fmt.Println("=== 不同数据类型的原子操作 ===")
// int32原子操作
var int32Val int32 = 42
atomic.AddInt32(&int32Val, 8)
fmt.Printf("int32原子操作: %d\n", atomic.LoadInt32(&int32Val))
// uint32原子操作
var uint32Val uint32 = 100
atomic.AddUint32(&uint32Val, 50)
fmt.Printf("uint32原子操作: %d\n", atomic.LoadUint32(&uint32Val))
// uint64原子操作
var uint64Val uint64 = 1000
atomic.AddUint64(&uint64Val, 500)
fmt.Printf("uint64原子操作: %d\n", atomic.LoadUint64(&uint64Val))
// uintptr原子操作
var uintptrVal uintptr = 0x1000
atomic.AddUintptr(&uintptrVal, 0x100)
fmt.Printf("uintptr原子操作: 0x%x\n", atomic.LoadUintptr(&uintptrVal))
// 指针原子操作
type Data struct {
value int
}
var ptr unsafe.Pointer
data1 := &Data{value: 1}
data2 := &Data{value: 2}
atomic.StorePointer(&ptr, unsafe.Pointer(data1))
loadedPtr := (*Data)(atomic.LoadPointer(&ptr))
fmt.Printf("指针原子操作: %d\n", loadedPtr.value)
// 原子交换指针
oldPtr := (*Data)(atomic.SwapPointer(&ptr, unsafe.Pointer(data2)))
fmt.Printf("指针交换: 旧值=%d, 新值=%d\n", oldPtr.value, (*Data)(atomic.LoadPointer(&ptr)).value)
}
原子操作的实际应用 #
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 原子计数器
type AtomicCounter struct {
value int64
}
func (c *AtomicCounter) Increment() int64 {
return atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Decrement() int64 {
return atomic.AddInt64(&c.value, -1)
}
func (c *AtomicCounter) Get() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *AtomicCounter) Set(value int64) {
atomic.StoreInt64(&c.value, value)
}
func (c *AtomicCounter) CompareAndSwap(old, new int64) bool {
return atomic.CompareAndSwapInt64(&c.value, old, new)
}
// 原子布尔值
type AtomicBool struct {
flag int32
}
func (b *AtomicBool) Set(value bool) {
var i int32 = 0
if value {
i = 1
}
atomic.StoreInt32(&b.flag, i)
}
func (b *AtomicBool) Get() bool {
return atomic.LoadInt32(&b.flag) != 0
}
func (b *AtomicBool) CompareAndSwap(old, new bool) bool {
var oldVal, newVal int32
if old {
oldVal = 1
}
if new {
newVal = 1
}
return atomic.CompareAndSwapInt32(&b.flag, oldVal, newVal)
}
// 无锁队列(简化版)
type LockFreeQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
type node struct {
data interface{}
next unsafe.Pointer
}
func NewLockFreeQueue() *LockFreeQueue {
n := &node{}
return &LockFreeQueue{
head: unsafe.Pointer(n),
tail: unsafe.Pointer(n),
}
}
func (q *LockFreeQueue) Enqueue(data interface{}) {
n := &node{data: data}
for {
tail := (*node)(atomic.LoadPointer(&q.tail))
next := (*node)(atomic.LoadPointer(&tail.next))
if tail == (*node)(atomic.LoadPointer(&q.tail)) {
if next == nil {
if atomic.CompareAndSwapPointer(&tail.next, unsafe.Pointer(next), unsafe.Pointer(n)) {
break
}
} else {
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
}
}
}
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer((*node)(atomic.LoadPointer(&q.tail))), unsafe.Pointer(n))
}
func (q *LockFreeQueue) Dequeue() interface{} {
for {
head := (*node)(atomic.LoadPointer(&q.head))
tail := (*node)(atomic.LoadPointer(&q.tail))
next := (*node)(atomic.LoadPointer(&head.next))
if head == (*node)(atomic.LoadPointer(&q.head)) {
if head == tail {
if next == nil {
return nil // 队列为空
}
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
} else {
if next == nil {
continue
}
data := next.data
if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
return data
}
}
}
}
}
func main() {
// 测试原子计数器
testAtomicCounter()
// 测试原子布尔值
testAtomicBool()
// 测试无锁队列
testLockFreeQueue()
}
func testAtomicCounter() {
fmt.Println("=== 原子计数器测试 ===")
counter := &AtomicCounter{}
var wg sync.WaitGroup
// 启动多个goroutine进行并发操作
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 100; j++ {
counter.Increment()
}
fmt.Printf("Goroutine %d 完成,当前计数: %d\n", id, counter.Get())
}(i)
}
wg.Wait()
fmt.Printf("最终计数: %d\n", counter.Get())
// 测试CAS操作
if counter.CompareAndSwap(1000, 2000) {
fmt.Printf("CAS成功,新值: %d\n", counter.Get())
} else {
fmt.Printf("CAS失败,当前值: %d\n", counter.Get())
}
fmt.Println()
}
func testAtomicBool() {
fmt.Println("=== 原子布尔值测试 ===")
flag := &AtomicBool{}
var wg sync.WaitGroup
// 一个goroutine设置为true
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 100)
flag.Set(true)
fmt.Println("标志设置为true")
}()
// 另一个goroutine等待标志变为true
wg.Add(1)
go func() {
defer wg.Done()
for !flag.Get() {
time.Sleep(time.Millisecond * 10)
}
fmt.Println("检测到标志为true")
}()
wg.Wait()
fmt.Printf("最终标志值: %t\n", flag.Get())
fmt.Println()
}
func testLockFreeQueue() {
fmt.Println("=== 无锁队列测试 ===")
queue := NewLockFreeQueue()
var wg sync.WaitGroup
// 生产者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
queue.Enqueue(fmt.Sprintf("item-%d", i))
fmt.Printf("入队: item-%d\n", i)
time.Sleep(time.Millisecond * 50)
}
}()
// 消费者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
for {
item := queue.Dequeue()
if item != nil {
fmt.Printf("出队: %s\n", item)
break
}
time.Sleep(time.Millisecond * 10)
}
}
}()
wg.Wait()
fmt.Println("队列测试完成")
}
原子操作的高级应用 #
实现自旋锁 #
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// 自旋锁实现
type SpinLock struct {
flag int32
}
func (sl *SpinLock) Lock() {
for !atomic.CompareAndSwapInt32(&sl.flag, 0, 1) {
runtime.Gosched() // 让出CPU时间片
}
}
func (sl *SpinLock) Unlock() {
atomic.StoreInt32(&sl.flag, 0)
}
// 性能对比测试
func compareSpinLockWithMutex() {
fmt.Println("=== 自旋锁与互斥锁性能对比 ===")
const numGoroutines = 100
const numOperations = 1000
var counter int64
// 测试自旋锁
spinLock := &SpinLock{}
start := time.Now()
var wg1 sync.WaitGroup
for i := 0; i < numGoroutines; i++ {
wg1.Add(1)
go func() {
defer wg1.Done()
for j := 0; j < numOperations; j++ {
spinLock.Lock()
counter++
spinLock.Unlock()
}
}()
}
wg1.Wait()
spinLockDuration := time.Since(start)
spinLockResult := counter
// 重置计数器
counter = 0
// 测试互斥锁
var mutex sync.Mutex
start = time.Now()
var wg2 sync.WaitGroup
for i := 0; i < numGoroutines; i++ {
wg2.Add(1)
go func() {
defer wg2.Done()
for j := 0; j < numOperations; j++ {
mutex.Lock()
counter++
mutex.Unlock()
}
}()
}
wg2.Wait()
mutexDuration := time.Since(start)
mutexResult := counter
fmt.Printf("自旋锁: 结果=%d, 耗时=%v\n", spinLockResult, spinLockDuration)
fmt.Printf("互斥锁: 结果=%d, 耗时=%v\n", mutexResult, mutexDuration)
if spinLockDuration < mutexDuration {
fmt.Printf("自旋锁快 %.2fx\n", float64(mutexDuration)/float64(spinLockDuration))
} else {
fmt.Printf("互斥锁快 %.2fx\n", float64(spinLockDuration)/float64(mutexDuration))
}
}
func main() {
compareSpinLockWithMutex()
}
实现原子引用计数 #
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 原子引用计数器
type RefCounter struct {
count int64
data interface{}
}
func NewRefCounter(data interface{}) *RefCounter {
return &RefCounter{
count: 1,
data: data,
}
}
func (rc *RefCounter) AddRef() {
atomic.AddInt64(&rc.count, 1)
}
func (rc *RefCounter) Release() bool {
if atomic.AddInt64(&rc.count, -1) == 0 {
// 引用计数为0,可以清理资源
return true
}
return false
}
func (rc *RefCounter) GetCount() int64 {
return atomic.LoadInt64(&rc.count)
}
func (rc *RefCounter) GetData() interface{} {
return rc.data
}
// 共享资源管理器
type SharedResource struct {
name string
}
func (sr *SharedResource) Use() {
fmt.Printf("使用资源: %s\n", sr.name)
time.Sleep(time.Millisecond * 100) // 模拟使用时间
}
func (sr *SharedResource) Cleanup() {
fmt.Printf("清理资源: %s\n", sr.name)
}
func testRefCounter() {
fmt.Println("=== 原子引用计数测试 ===")
// 创建共享资源
resource := &SharedResource{name: "数据库连接"}
refCounter := NewRefCounter(resource)
var wg sync.WaitGroup
// 启动多个goroutine使用资源
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 增加引用计数
refCounter.AddRef()
fmt.Printf("Goroutine %d 获取资源,引用计数: %d\n", id, refCounter.GetCount())
// 使用资源
resource := refCounter.GetData().(*SharedResource)
resource.Use()
// 释放引用
if refCounter.Release() {
fmt.Printf("Goroutine %d 最后一个释放资源\n", id)
resource.Cleanup()
} else {
fmt.Printf("Goroutine %d 释放资源,剩余引用计数: %d\n", id, refCounter.GetCount())
}
}(i)
}
// 主goroutine也释放初始引用
go func() {
time.Sleep(time.Millisecond * 200) // 等待其他goroutine开始
if refCounter.Release() {
fmt.Println("主goroutine 最后一个释放资源")
resource.Cleanup()
} else {
fmt.Printf("主goroutine 释放资源,剩余引用计数: %d\n", refCounter.GetCount())
}
}()
wg.Wait()
time.Sleep(time.Millisecond * 300) // 等待主goroutine完成
fmt.Printf("最终引用计数: %d\n", refCounter.GetCount())
fmt.Println()
}
func main() {
testRefCounter()
}
原子操作的最佳实践 #
1. 选择合适的原子操作 #
// 好的做法:使用合适的原子操作
func goodAtomicUsage() {
var counter int64
// 简单的计数操作
atomic.AddInt64(&counter, 1)
// 读取操作
value := atomic.LoadInt64(&counter)
// 条件更新
for {
old := atomic.LoadInt64(&counter)
new := old * 2
if atomic.CompareAndSwapInt64(&counter, old, new) {
break
}
}
}
// 不好的做法:过度使用原子操作
func badAtomicUsage() {
var a, b, c int64
// 复杂的操作应该使用锁
for {
oldA := atomic.LoadInt64(&a)
oldB := atomic.LoadInt64(&b)
oldC := atomic.LoadInt64(&c)
newA := oldA + oldB
newB := oldB + oldC
newC := oldC + oldA
// 这种复杂的多变量操作很难保证原子性
if atomic.CompareAndSwapInt64(&a, oldA, newA) &&
atomic.CompareAndSwapInt64(&b, oldB, newB) &&
atomic.CompareAndSwapInt64(&c, oldC, newC) {
break
}
}
}
2. 避免 ABA 问题 #
package main
import (
"fmt"
"sync/atomic"
"time"
)
// ABA问题演示
type Node struct {
value int
next *Node
}
type Stack struct {
head unsafe.Pointer
}
func (s *Stack) Push(value int) {
node := &Node{value: value}
for {
head := (*Node)(atomic.LoadPointer(&s.head))
node.next = head
if atomic.CompareAndSwapPointer(&s.head, unsafe.Pointer(head), unsafe.Pointer(node)) {
break
}
}
}
func (s *Stack) Pop() (int, bool) {
for {
head := (*Node)(atomic.LoadPointer(&s.head))
if head == nil {
return 0, false
}
next := head.next
if atomic.CompareAndSwapPointer(&s.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
return head.value, true
}
}
}
// 使用版本号解决ABA问题
type VersionedPointer struct {
ptr unsafe.Pointer
version int64
}
type SafeStack struct {
head VersionedPointer
}
func (s *SafeStack) Push(value int) {
node := &Node{value: value}
for {
head := s.head
node.next = (*Node)(head.ptr)
newHead := VersionedPointer{
ptr: unsafe.Pointer(node),
version: head.version + 1,
}
// 这里需要使用更复杂的CAS操作来同时比较指针和版本号
// 实际实现会更复杂,这里只是示意
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&s.head.ptr)), head.ptr, newHead.ptr) {
atomic.StoreInt64(&s.head.version, newHead.version)
break
}
}
}
func demonstrateABAProblem() {
fmt.Println("=== ABA问题演示 ===")
stack := &Stack{}
// 初始状态:A -> B -> C
stack.Push(3) // C
stack.Push(2) // B
stack.Push(1) // A
fmt.Println("初始栈: A -> B -> C")
// 模拟ABA问题场景
go func() {
// 线程1:读取头节点A
head := (*Node)(atomic.LoadPointer(&stack.head))
fmt.Printf("线程1读取头节点: %d\n", head.value)
// 模拟延迟
time.Sleep(time.Millisecond * 100)
// 线程1:尝试CAS操作(此时头节点可能已经变化)
next := head.next
if atomic.CompareAndSwapPointer(&stack.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
fmt.Printf("线程1 CAS成功,弹出: %d\n", head.value)
} else {
fmt.Println("线程1 CAS失败")
}
}()
// 线程2:快速操作
go func() {
time.Sleep(time.Millisecond * 50)
// 弹出A
if value, ok := stack.Pop(); ok {
fmt.Printf("线程2弹出: %d\n", value)
}
// 弹出B
if value, ok := stack.Pop(); ok {
fmt.Printf("线程2弹出: %d\n", value)
}
// 重新压入A
stack.Push(1)
fmt.Println("线程2重新压入: 1")
}()
time.Sleep(time.Millisecond * 200)
// 检查最终状态
fmt.Print("最终栈内容: ")
for {
if value, ok := stack.Pop(); ok {
fmt.Printf("%d ", value)
} else {
break
}
}
fmt.Println()
}
func main() {
demonstrateABAProblem()
}
3. 性能考虑 #
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 性能测试:原子操作 vs 互斥锁 vs Channel
func performanceComparison() {
fmt.Println("=== 性能对比测试 ===")
const numGoroutines = 100
const numOperations = 10000
// 测试原子操作
testAtomic(numGoroutines, numOperations)
// 测试互斥锁
testMutex(numGoroutines, numOperations)
// 测试Channel
testChannel(numGoroutines, numOperations)
}
func testAtomic(numGoroutines, numOperations int) {
var counter int64
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < numOperations; j++ {
atomic.AddInt64(&counter, 1)
}
}()
}
wg.Wait()
duration := time.Since(start)
fmt.Printf("原子操作: 结果=%d, 耗时=%v\n", counter, duration)
}
func testMutex(numGoroutines, numOperations int) {
var counter int64
var mu sync.Mutex
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < numOperations; j++ {
mu.Lock()
counter++
mu.Unlock()
}
}()
}
wg.Wait()
duration := time.Since(start)
fmt.Printf("互斥锁: 结果=%d, 耗时=%v\n", counter, duration)
}
func testChannel(numGoroutines, numOperations int) {
ch := make(chan struct{}, 1)
var counter int64
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < numOperations; j++ {
ch <- struct{}{}
counter++
<-ch
}
}()
}
wg.Wait()
duration := time.Since(start)
fmt.Printf("Channel: 结果=%d, 耗时=%v\n", counter, duration)
}
func main() {
performanceComparison()
}
总结 #
原子操作是 Go 语言并发编程的重要工具,它提供了比互斥锁更高效的同步机制。但是,原子操作也有其局限性:
适用场景 #
- 简单的数值操作(加减、交换等)
- 标志位设置
- 引用计数
- 无锁数据结构的实现
不适用场景 #
- 复杂的多步骤操作
- 需要保护多个变量的场景
- 需要条件等待的场景
最佳实践 #
- 优先使用 Channel 和高级同步原语
- 只在性能关键的场景使用原子操作
- 注意 ABA 问题和内存排序
- 进行充分的测试和性能评估
正确使用原子操作可以显著提高程序的并发性能,但需要深入理解其原理和限制,避免引入难以调试的并发问题。