Skip to content

并发等待组 WaitGroup

等待异步任务执行完

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}

	// wg.Go 简化了 Add 和 Done 两步
	wg.Go(func() {
		fmt.Println("开始执行任务2")
	})

	// 这个就是 wg.Go 方法的实现
	go func() {
		wg.Add(1)       // 增加计数
		defer wg.Done() // 减少计数本质就是 wg.Add(-1)
		fmt.Println("开始执行任务1")
	}()

	// 等待所有任务完成(计数器归零) 这个操作会阻塞主线程
	wg.Wait()
	fmt.Println("所有任务完成!")
}

互斥锁 Mutex

为了解决异步任务的数据竞争问题

  • 最基础的锁类型, 同一时间只允许一个 goroutine 访问共享资源(其实就是内存), 无论是读操作还是写操作
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	lk := sync.Mutex{}

	num := 0

	// inc
	for i := range 100 {
		wg.Go(func() {
			lk.Lock()   // 给 num 加锁, 当前 gorutine 在操作这个
			num += i    // 变量(内存) 的时候, 其他 gorutine 不允许操作
			lk.Unlock() // 必须等待我解锁之后才允许操作
		})
	}

	// dec
	for i := range 100 {
		wg.Go(func() {
			lk.Lock()
			num -= i
			lk.Unlock()
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成, num = ", num)
}
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}

	num := 0

	// inc
	for i := range 100 {
		wg.Go(func() {
			num += i
		})
	}

	// dec
	for i := range 100 {
		wg.Go(func() {
			num -= i
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成, num = ", num)
}

读写锁 RWMutex

与 Mutex 类似, 也是为了解决数据竞争问题, 不同的是, 精细的区分控制 readwrite 操作

  • 支持单写多读的锁机制,允许多个读操作同时进行,但写操作需要独占

  • 读密集型: RWMutex 性能更好, 因为多个读操作可以并发执行

  • 写密集型: MutexRWMutex 性能接近, 因为写操作都需要独占锁

  • 混合场景: 取决于读写比例, 读多写少时 RWMutex 优势明显

RWMutex 比 Mutex 多了3个方法

  • Lock: 获取写锁
  • Unlock: 释放写锁
  • RLock: 获取读锁
  • RUnlock: 释放读锁
  • RLocker: 返回读锁接口 Locker
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	lk := sync.RWMutex{}

	num := 0

	// 写操作, 需要加锁
	for i := range 100 {
		wg.Go(func() {
			lk.Lock()   // 给 num 加锁, 当前 gorutine 在操作这个
			num += i    // 变量(内存) 的时候, 其他 gorutine 不允许操作
			lk.Unlock() // 必须等待我解锁之后才允许操作
		})
	}

	// 允许读取, 不用管锁的问题
	for i := range 100 {
		wg.Go(func() {
			fmt.Printf("第 %d 次数读取, num =%d \n", i, num)
		})
	}

	// 写的操作必须要锁定, 否则还是出现数据竞争问题
	for i := range 100 {
		wg.Go(func() {
			lk.Lock()
			num -= i
			lk.Unlock()
		})
	}

	// 允许读取, 不用管锁的问题
	for i := range 100 {
		wg.Go(func() {
			fmt.Printf("第 %d 次数读取, num =%d \n", i, num)
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成, num = ", num)
}

并发安全字典 sync.Map

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	kv := sync.Map{}
	lk := sync.RWMutex{}

	kv.LoadOrStore("count", 0)

	for i := range 100 {
		wg.Go(func() {
			lk.Lock()
			count, ok := kv.Load("count")
			if ok {
				kv.Store("count", count.(int)+i)
			}
			lk.Unlock()
		})
	}

	for i := range 100 {
		wg.Go(func() {
			count, ok := kv.Load("count")
			if ok {
				fmt.Println("第", i+1, "次数读取, count =", count)
			}
		})
	}

	for i := range 100 {
		wg.Go(func() {
			lk.Lock()
			count, ok := kv.Load("count")
			if ok {
				kv.Store("count", count.(int)-i)
			}
			lk.Unlock()
		})
	}

	wg.Wait()
	count, ok := kv.Load("count")
	if ok {
		fmt.Println("所有任务完成, count = ", count)
	}

	// 避免数据竞争的问题, 让程序可控
}
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	kv := sync.Map{}

	kv.LoadOrStore("count", 0)

	for i := range 100 {
		wg.Go(func() {
			count, ok := kv.Load("count")
			if ok {
				kv.Store("count", count.(int)+i)
			}
		})
	}

	for i := range 100 {
		wg.Go(func() {
			count, ok := kv.Load("count")
			if ok {
				fmt.Println("第", i+1, "次数读取, count =", count)
			}
		})
	}

	for i := range 100 {
		wg.Go(func() {
			count, ok := kv.Load("count")
			if ok {
				kv.Store("count", count.(int)-i)
			}
		})
	}

	wg.Wait()
	count, ok := kv.Load("count")
	if ok {
		fmt.Println("所有任务完成, count = ", count)
	}

	// 程序在执行的时候不会报错
	// 但是无法避免数据竞争的问题
	// 要想避免数据竞争的问题还得加锁
}
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	kv := map[string]int{
		"count": 0,
	}

	for i := range 100 {
		wg.Go(func() {
			kv["count"] += i
		})
	}

	for i := range 100 {
		wg.Go(func() {
			fmt.Println("第", i+1, "次数读取, count =", kv["count"])
		})
	}

	for i := range 100 {
		wg.Go(func() {
			kv["count"] -= i
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成, kv = ", kv)

	// 这个代码在编译时期是可以通过的
	// 但是在执行的时候就会直接报错:
	// fatal error: concurrent map writes
	// 哪怕给加上 RWMutex 也还是有几率会报错
}

单例模式 Once

仅执行一次 Once

go
package main

import (
	"fmt"
	"sync"
)

var (
	once sync.Once
	data int
)

func initData() {
	fmt.Println("初始化数据...")
	data = 42
}

func main() {
	var wg sync.WaitGroup

	for range 10 {
		wg.Go(func() {
			once.Do(initData) // 10 个 goroutine 只会执行一次
		})
	}

	wg.Wait()
	fmt.Println("所有任务执行完成", data)
}

生成仅执行一次方法 OnceFunc

sync.Once.Do 的便捷方法

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}

	// 返回一个只会执行一次的函数
	parseConfig := sync.OnceFunc(func() {
		fmt.Println("解析配置文件")
	})

	for range 10 {
		wg.Go(func() {
			parseConfig() // 10 个 goroutine 也只会执行一次
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成")
}

OnceValue/OnceValues

创建一个只执行一次并返回(一个/多个)值的函数

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// 创建一个只执行一次并返回值的函数
	getConfig := sync.OnceValue(func() string {
		fmt.Println(">>> 模拟加载配置文件")
		return "database_config"
	})

	// 多次调用,只执行一次,后续直接返回缓存的值
	fmt.Println("第一次调用:", getConfig())
	fmt.Println("第二次调用:", getConfig())
	fmt.Println("第三次调用:", getConfig())

	// 并发调用示例
	var wg sync.WaitGroup
	for i := range 5 {
		wg.Go(func() {
			fmt.Printf("Goroutine %d: %s\n", i, getConfig())
		})
	}
	wg.Wait()
}
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// 创建一个只执行一次并返回值的函数
	// 注: 返回值必须有且只有2个, 但是类型是任意类型
	getConfig := sync.OnceValues(func() (string, error) {
		fmt.Println(">>> 模拟加载配置文件")
		return "production", nil
	})

	// 多次调用,只执行一次,后续直接返回缓存的值
	// 多次调用,只执行一次
	env1, err1 := getConfig()
	fmt.Printf("第一次: env=%s, err=%v\n", env1, err1)

	env2, err2 := getConfig()
	fmt.Printf("第二次: env=%s, err=%v\n", env2, err2)

	// 并发调用示例
	var wg sync.WaitGroup
	for i := range 5 {
		wg.Go(func() {
			env, err := getConfig()
			fmt.Printf("第 %d 次: env=%s, err=%v\n", i, env, err)
		})
	}
	wg.Wait()
	fmt.Println("所有任务执行完成")
}

临时对象池 Pool

用于对象复用, 减少内存分配

go
package main

import (
	"fmt"
	"sync"
	"time"
)

type DBConnection struct {
	id       int
	dsn      string
	lastUsed time.Time
}

var dbPool = sync.Pool{
	New: func() any {
		// 实际应用中应该创建真实的数据库连接
		// 1.对象复用前必须重置: 从池中获取的对象可能是之前使用过的,使用前应该重置状态
		// 2.不保证对象存活: GC期间池中的对象可能会被清理
		// 3.不要存储重要数据: 不适合存储需要持久化的数据
		// 4.性能优化: 主要用于减少频繁的内存分配
		fmt.Println("创建新的数据库连接")
		return &DBConnection{
			id:       time.Now().Nanosecond(),
			dsn:      "sqlite:///test.db",
			lastUsed: time.Now(),
		}
	},
}

func DBQuery(sql string) {
	// 从池中获取连接, 查询数据库数据
	conn := dbPool.Get().(*DBConnection)
	defer dbPool.Put(conn) // Put 方法将数据库链接对象放回 dbPool

	// 模拟查询操作
	fmt.Printf("使用连接 %d 执行查询: %s\n", conn.id, sql)
	conn.lastUsed = time.Now()
	time.Sleep(100 * time.Millisecond)
}

func main() {
	var wg sync.WaitGroup

	for i := range 10 {
		wg.Go(func() {
			DBQuery(fmt.Sprintf("SELECT * FROM `users` WHERE `id` = %d", i))
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成")
}

同步等待条件 Cond

条件等待和通知: 用于在多个goroutine之间进行条件等待和通知。它通常与互斥锁配合使用, 允许goroutine等待某个条件成立, 然后被其他goroutine唤醒。

  • Wait - 等待条件成立, 会释放锁并阻塞, 被唤醒后重新获取锁
  • Signal - 唤醒一个等待的goroutine
  • Broadcast - 唤醒所有等待的goroutine

注意点:

  1. 必须在持有锁的情况下调用 Wait/Signal/Broadcast
  2. 不要在没有锁的情况下使用 Cond
  3. 根据场景选择 SignalBroadcast
  4. 不要用 Cond+Mutex 替代 channel
go
package main

import (
	"fmt"
	"sync"
)

// 队列
type Queue struct {
	items []int
	size  int
	mu    sync.Mutex
	cond  *sync.Cond
}

// 实例化队列
func NewQueue(size int) *Queue {
	q := &Queue{
		items: make([]int, 0, size),
		size:  size,
	}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// 入列
func (q *Queue) Enqueue(item int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// 如果队列已满, 等待
	for len(q.items) >= q.size {
		fmt.Println("队列已满,生产者等待...")
		q.cond.Wait()
	}

	q.items = append(q.items, item)
	fmt.Printf("生产者放入: %d, 队列大小: %d\n", item, len(q.items))

	// 唤醒等待的消费者
	q.cond.Signal()
}

// 出列
func (q *Queue) Dequeue() int {
	q.mu.Lock()
	defer q.mu.Unlock()

	// 如果队列为空,等待
	for len(q.items) == 0 {
		fmt.Println("队列为空,消费者等待...")
		q.cond.Wait()
	}

	item := q.items[0]
	q.items = q.items[1:]
	fmt.Printf("消费者取出: %d, 队列大小: %d\n", item, len(q.items))

	// 唤醒等待的生产者
	q.cond.Signal()

	return item
}

func main() {
	var wg sync.WaitGroup
	queue := NewQueue(2)

	// 启动生产者
	for i := range 5 {
		wg.Go(func() {
			item := i + 1
			queue.Enqueue(item)
		})
	}

	// 启动消费者
	for range 5 {
		wg.Go(func() {
			_ = queue.Dequeue()
			// item := queue.Dequeue()
			// fmt.Println("消费者获得item:", item)
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成")
}

原子操作 sync.atomic

所谓的原子操作就可以简单理解为 标准库封装的一系列并发安全的操作方法

为什么有了 还需要 原子操作

因为锁的成本比较高, 耗时多, 需要切换上下文, 而原子操作是经过系统优化过的, 且仅支持基本数据类型(在性能敏感且操作较为简单的的场景下, 应优先使用原子操作)

为什么原子操作比互斥锁要快

  1. 原子操作是依赖CPU指令来实现的, 而不需要依赖外部锁, 源码在 runtime/internal/atomic/atomic_i386.go
  2. 每次获得锁时, 都需要暂停或中断 goroutine, 这就会导致阻塞, 而这种阻塞占使用互斥锁所要花费的大部分时间
  3. 原子操作是能够保证CPU执行期间是连续而不需要中断的

当前 atomic 包有以下种原操作:

  • Add: 加法运算

    • AddInt32: 在原基础上加10的操作: atomic.AddInt32(&num, 10)
    • AddUint32: 在原基础上减10的操作: atomic.AddUInt32(&num, -10)
    • AddInt64
    • AddUInt64
    • AddUIntptr
  • CompareAndSwap: 比较并交换值

    • CompareAndSwapInt32: 对比并交换值: atomic.CompareAndSwapInt32(&num, 10, 20) 如果 num 是 10, 那么交换成功, 否则交换失败
    • CompareAndSwapUInt32
    • CompareAndSwapInt64
    • CompareAndSwapUInt64
    • CompareAndSwapUIntptr
  • Load: 加载/载入/读取

    • LoadInt32: 加载&num对应地址的值: atomic.LoadInt32(&num) 返回 num 变量的值
    • LoadUInt32
    • LoadInt64
    • LoadUInt64
    • LoadUIntptr
  • Store: 保存/存储

    • StoreInt32: 将一个值设置到指定的内存地址: atomic.StoreInt32(&num, 11)
    • StoreUInt32
    • StoreInt64
    • StoreUInt64
    • StoreUIntptr
  • Swap: 交换

    • SwapInt32: 直接交换值并返回旧值: atomic.SwapInt32(&num, 22) 如果 num 是 10, 那么交换后, 值为22, 方法的返回值为 10
    • SwapUInt32
    • SwapInt64
    • SwapUInt64
    • SwapUIntptr
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	num := 0

	// increment
	for range 1000 {
		wg.Go(func() {
			num++
		})
	}

	// decrement
	for range 1000 {
		wg.Go(func() {
			num--
		})
	}

	wg.Wait()

	// 这会导致 num 的结果不可控
	fmt.Println("所有任务完成", num)
}
go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	var lk sync.Mutex
	num := 0

	// increment
	for range 1000 {
		wg.Go(func() {
  		// 这个操作非常简单, 就是对一个数字递增/递减而已
  		// 完全没必要加锁, 仅使用 原子操作API 就可以搞定
			lk.Lock()
			num++
			lk.Unlock()
		})
	}

	// decrement
	for range 1000 {
		wg.Go(func() {
			lk.Lock()
			num--
			lk.Unlock()
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成", num)
}
go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var num int32 // 注: 此处必须明确指定变量类型

	// increment
	for range 1000 {
		wg.Go(func() {
			atomic.AddInt32(&num, 1)
		})
	}

	// decrement
	for range 1000 {
		wg.Go(func() {
			atomic.AddInt32(&num, -1)
		})
	}

	wg.Wait()
	fmt.Println("所有任务完成", num)
}

Released under the MIT License.