并发等待组 WaitGroup
等待异步任务执行完
go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
// wg.Go 简化了 Add 和 Done 两步
wg.Go(func() {
fmt.Println("开始执行任务2")
})
// 这个就是 wg.Go 方法的实现
go func() {
wg.Add(1) // 增加计数
defer wg.Done() // 减少计数本质就是 wg.Add(-1)
fmt.Println("开始执行任务1")
}()
// 等待所有任务完成(计数器归零) 这个操作会阻塞主线程
wg.Wait()
fmt.Println("所有任务完成!")
}互斥锁 Mutex
为了解决异步任务的数据竞争问题
- 最基础的锁类型, 同一时间只允许一个 goroutine 访问共享资源(其实就是内存), 无论是读操作还是写操作
go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
lk := sync.Mutex{}
num := 0
// inc
for i := range 100 {
wg.Go(func() {
lk.Lock() // 给 num 加锁, 当前 gorutine 在操作这个
num += i // 变量(内存) 的时候, 其他 gorutine 不允许操作
lk.Unlock() // 必须等待我解锁之后才允许操作
})
}
// dec
for i := range 100 {
wg.Go(func() {
lk.Lock()
num -= i
lk.Unlock()
})
}
wg.Wait()
fmt.Println("所有任务完成, num = ", num)
}go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
num := 0
// inc
for i := range 100 {
wg.Go(func() {
num += i
})
}
// dec
for i := range 100 {
wg.Go(func() {
num -= i
})
}
wg.Wait()
fmt.Println("所有任务完成, num = ", num)
}读写锁 RWMutex
与 Mutex 类似, 也是为了解决数据竞争问题, 不同的是, 精细的区分控制 read 和 write 操作
支持单写多读的锁机制,允许多个读操作同时进行,但写操作需要独占
读密集型:
RWMutex性能更好, 因为多个读操作可以并发执行写密集型:
Mutex和RWMutex性能接近, 因为写操作都需要独占锁混合场景: 取决于读写比例, 读多写少时
RWMutex优势明显
RWMutex 比 Mutex 多了3个方法
- Lock: 获取写锁
- Unlock: 释放写锁
- RLock: 获取读锁
- RUnlock: 释放读锁
- RLocker: 返回读锁接口 Locker
go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
lk := sync.RWMutex{}
num := 0
// 写操作, 需要加锁
for i := range 100 {
wg.Go(func() {
lk.Lock() // 给 num 加锁, 当前 gorutine 在操作这个
num += i // 变量(内存) 的时候, 其他 gorutine 不允许操作
lk.Unlock() // 必须等待我解锁之后才允许操作
})
}
// 允许读取, 不用管锁的问题
for i := range 100 {
wg.Go(func() {
fmt.Printf("第 %d 次数读取, num =%d \n", i, num)
})
}
// 写的操作必须要锁定, 否则还是出现数据竞争问题
for i := range 100 {
wg.Go(func() {
lk.Lock()
num -= i
lk.Unlock()
})
}
// 允许读取, 不用管锁的问题
for i := range 100 {
wg.Go(func() {
fmt.Printf("第 %d 次数读取, num =%d \n", i, num)
})
}
wg.Wait()
fmt.Println("所有任务完成, num = ", num)
}并发安全字典 sync.Map
go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
kv := sync.Map{}
lk := sync.RWMutex{}
kv.LoadOrStore("count", 0)
for i := range 100 {
wg.Go(func() {
lk.Lock()
count, ok := kv.Load("count")
if ok {
kv.Store("count", count.(int)+i)
}
lk.Unlock()
})
}
for i := range 100 {
wg.Go(func() {
count, ok := kv.Load("count")
if ok {
fmt.Println("第", i+1, "次数读取, count =", count)
}
})
}
for i := range 100 {
wg.Go(func() {
lk.Lock()
count, ok := kv.Load("count")
if ok {
kv.Store("count", count.(int)-i)
}
lk.Unlock()
})
}
wg.Wait()
count, ok := kv.Load("count")
if ok {
fmt.Println("所有任务完成, count = ", count)
}
// 避免数据竞争的问题, 让程序可控
}go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
kv := sync.Map{}
kv.LoadOrStore("count", 0)
for i := range 100 {
wg.Go(func() {
count, ok := kv.Load("count")
if ok {
kv.Store("count", count.(int)+i)
}
})
}
for i := range 100 {
wg.Go(func() {
count, ok := kv.Load("count")
if ok {
fmt.Println("第", i+1, "次数读取, count =", count)
}
})
}
for i := range 100 {
wg.Go(func() {
count, ok := kv.Load("count")
if ok {
kv.Store("count", count.(int)-i)
}
})
}
wg.Wait()
count, ok := kv.Load("count")
if ok {
fmt.Println("所有任务完成, count = ", count)
}
// 程序在执行的时候不会报错
// 但是无法避免数据竞争的问题
// 要想避免数据竞争的问题还得加锁
}go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
kv := map[string]int{
"count": 0,
}
for i := range 100 {
wg.Go(func() {
kv["count"] += i
})
}
for i := range 100 {
wg.Go(func() {
fmt.Println("第", i+1, "次数读取, count =", kv["count"])
})
}
for i := range 100 {
wg.Go(func() {
kv["count"] -= i
})
}
wg.Wait()
fmt.Println("所有任务完成, kv = ", kv)
// 这个代码在编译时期是可以通过的
// 但是在执行的时候就会直接报错:
// fatal error: concurrent map writes
// 哪怕给加上 RWMutex 也还是有几率会报错
}单例模式 Once
仅执行一次 Once
go
package main
import (
"fmt"
"sync"
)
var (
once sync.Once
data int
)
func initData() {
fmt.Println("初始化数据...")
data = 42
}
func main() {
var wg sync.WaitGroup
for range 10 {
wg.Go(func() {
once.Do(initData) // 10 个 goroutine 只会执行一次
})
}
wg.Wait()
fmt.Println("所有任务执行完成", data)
}生成仅执行一次方法 OnceFunc
sync.Once.Do 的便捷方法
go
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
// 返回一个只会执行一次的函数
parseConfig := sync.OnceFunc(func() {
fmt.Println("解析配置文件")
})
for range 10 {
wg.Go(func() {
parseConfig() // 10 个 goroutine 也只会执行一次
})
}
wg.Wait()
fmt.Println("所有任务完成")
}OnceValue/OnceValues
创建一个只执行一次并返回(一个/多个)值的函数
go
package main
import (
"fmt"
"sync"
)
func main() {
// 创建一个只执行一次并返回值的函数
getConfig := sync.OnceValue(func() string {
fmt.Println(">>> 模拟加载配置文件")
return "database_config"
})
// 多次调用,只执行一次,后续直接返回缓存的值
fmt.Println("第一次调用:", getConfig())
fmt.Println("第二次调用:", getConfig())
fmt.Println("第三次调用:", getConfig())
// 并发调用示例
var wg sync.WaitGroup
for i := range 5 {
wg.Go(func() {
fmt.Printf("Goroutine %d: %s\n", i, getConfig())
})
}
wg.Wait()
}go
package main
import (
"fmt"
"sync"
)
func main() {
// 创建一个只执行一次并返回值的函数
// 注: 返回值必须有且只有2个, 但是类型是任意类型
getConfig := sync.OnceValues(func() (string, error) {
fmt.Println(">>> 模拟加载配置文件")
return "production", nil
})
// 多次调用,只执行一次,后续直接返回缓存的值
// 多次调用,只执行一次
env1, err1 := getConfig()
fmt.Printf("第一次: env=%s, err=%v\n", env1, err1)
env2, err2 := getConfig()
fmt.Printf("第二次: env=%s, err=%v\n", env2, err2)
// 并发调用示例
var wg sync.WaitGroup
for i := range 5 {
wg.Go(func() {
env, err := getConfig()
fmt.Printf("第 %d 次: env=%s, err=%v\n", i, env, err)
})
}
wg.Wait()
fmt.Println("所有任务执行完成")
}临时对象池 Pool
用于对象复用, 减少内存分配
go
package main
import (
"fmt"
"sync"
"time"
)
type DBConnection struct {
id int
dsn string
lastUsed time.Time
}
var dbPool = sync.Pool{
New: func() any {
// 实际应用中应该创建真实的数据库连接
// 1.对象复用前必须重置: 从池中获取的对象可能是之前使用过的,使用前应该重置状态
// 2.不保证对象存活: GC期间池中的对象可能会被清理
// 3.不要存储重要数据: 不适合存储需要持久化的数据
// 4.性能优化: 主要用于减少频繁的内存分配
fmt.Println("创建新的数据库连接")
return &DBConnection{
id: time.Now().Nanosecond(),
dsn: "sqlite:///test.db",
lastUsed: time.Now(),
}
},
}
func DBQuery(sql string) {
// 从池中获取连接, 查询数据库数据
conn := dbPool.Get().(*DBConnection)
defer dbPool.Put(conn) // Put 方法将数据库链接对象放回 dbPool
// 模拟查询操作
fmt.Printf("使用连接 %d 执行查询: %s\n", conn.id, sql)
conn.lastUsed = time.Now()
time.Sleep(100 * time.Millisecond)
}
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Go(func() {
DBQuery(fmt.Sprintf("SELECT * FROM `users` WHERE `id` = %d", i))
})
}
wg.Wait()
fmt.Println("所有任务完成")
}同步等待条件 Cond
条件等待和通知: 用于在多个goroutine之间进行条件等待和通知。它通常与互斥锁配合使用, 允许goroutine等待某个条件成立, 然后被其他goroutine唤醒。
- Wait - 等待条件成立, 会释放锁并阻塞, 被唤醒后重新获取锁
- Signal - 唤醒一个等待的goroutine
- Broadcast - 唤醒所有等待的goroutine
注意点:
- 必须在持有锁的情况下调用
Wait/Signal/Broadcast - 不要在没有锁的情况下使用
Cond - 根据场景选择
Signal或Broadcast - 不要用
Cond+Mutex替代channel
go
package main
import (
"fmt"
"sync"
)
// 队列
type Queue struct {
items []int
size int
mu sync.Mutex
cond *sync.Cond
}
// 实例化队列
func NewQueue(size int) *Queue {
q := &Queue{
items: make([]int, 0, size),
size: size,
}
q.cond = sync.NewCond(&q.mu)
return q
}
// 入列
func (q *Queue) Enqueue(item int) {
q.mu.Lock()
defer q.mu.Unlock()
// 如果队列已满, 等待
for len(q.items) >= q.size {
fmt.Println("队列已满,生产者等待...")
q.cond.Wait()
}
q.items = append(q.items, item)
fmt.Printf("生产者放入: %d, 队列大小: %d\n", item, len(q.items))
// 唤醒等待的消费者
q.cond.Signal()
}
// 出列
func (q *Queue) Dequeue() int {
q.mu.Lock()
defer q.mu.Unlock()
// 如果队列为空,等待
for len(q.items) == 0 {
fmt.Println("队列为空,消费者等待...")
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
fmt.Printf("消费者取出: %d, 队列大小: %d\n", item, len(q.items))
// 唤醒等待的生产者
q.cond.Signal()
return item
}
func main() {
var wg sync.WaitGroup
queue := NewQueue(2)
// 启动生产者
for i := range 5 {
wg.Go(func() {
item := i + 1
queue.Enqueue(item)
})
}
// 启动消费者
for range 5 {
wg.Go(func() {
_ = queue.Dequeue()
// item := queue.Dequeue()
// fmt.Println("消费者获得item:", item)
})
}
wg.Wait()
fmt.Println("所有任务完成")
}原子操作 sync.atomic
所谓的原子操作就可以简单理解为 标准库封装的一系列并发安全的操作方法
为什么有了
锁还需要原子操作
因为锁的成本比较高, 耗时多, 需要切换上下文, 而原子操作是经过系统优化过的, 且仅支持基本数据类型(在性能敏感且操作较为简单的的场景下, 应优先使用原子操作)
为什么原子操作比互斥锁要快
- 原子操作是依赖CPU指令来实现的, 而不需要依赖外部锁, 源码在 runtime/internal/atomic/atomic_i386.go
- 每次获得锁时, 都需要暂停或中断 goroutine, 这就会导致阻塞, 而这种阻塞占使用互斥锁所要花费的大部分时间
- 原子操作是能够保证CPU执行期间是连续而不需要中断的
当前 atomic 包有以下种原操作:
Add: 加法运算
- AddInt32:
在原基础上加10的操作: atomic.AddInt32(&num, 10) - AddUint32:
在原基础上减10的操作: atomic.AddUInt32(&num, -10) - AddInt64
- AddUInt64
- AddUIntptr
- AddInt32:
CompareAndSwap: 比较并交换值
- CompareAndSwapInt32:
对比并交换值: atomic.CompareAndSwapInt32(&num, 10, 20) 如果 num 是 10, 那么交换成功, 否则交换失败 - CompareAndSwapUInt32
- CompareAndSwapInt64
- CompareAndSwapUInt64
- CompareAndSwapUIntptr
- CompareAndSwapInt32:
Load: 加载/载入/读取
- LoadInt32:
加载&num对应地址的值: atomic.LoadInt32(&num) 返回 num 变量的值 - LoadUInt32
- LoadInt64
- LoadUInt64
- LoadUIntptr
- LoadInt32:
Store: 保存/存储
- StoreInt32:
将一个值设置到指定的内存地址: atomic.StoreInt32(&num, 11) - StoreUInt32
- StoreInt64
- StoreUInt64
- StoreUIntptr
- StoreInt32:
Swap: 交换
- SwapInt32:
直接交换值并返回旧值: atomic.SwapInt32(&num, 22) 如果 num 是 10, 那么交换后, 值为22, 方法的返回值为 10 - SwapUInt32
- SwapInt64
- SwapUInt64
- SwapUIntptr
- SwapInt32:
go
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
num := 0
// increment
for range 1000 {
wg.Go(func() {
num++
})
}
// decrement
for range 1000 {
wg.Go(func() {
num--
})
}
wg.Wait()
// 这会导致 num 的结果不可控
fmt.Println("所有任务完成", num)
}go
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
var lk sync.Mutex
num := 0
// increment
for range 1000 {
wg.Go(func() {
// 这个操作非常简单, 就是对一个数字递增/递减而已
// 完全没必要加锁, 仅使用 原子操作API 就可以搞定
lk.Lock()
num++
lk.Unlock()
})
}
// decrement
for range 1000 {
wg.Go(func() {
lk.Lock()
num--
lk.Unlock()
})
}
wg.Wait()
fmt.Println("所有任务完成", num)
}go
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var wg sync.WaitGroup
var num int32 // 注: 此处必须明确指定变量类型
// increment
for range 1000 {
wg.Go(func() {
atomic.AddInt32(&num, 1)
})
}
// decrement
for range 1000 {
wg.Go(func() {
atomic.AddInt32(&num, -1)
})
}
wg.Wait()
fmt.Println("所有任务完成", num)
}