您的当前位置:首页正文

Go语言中常见的多线程同步方法

2024-11-24 来源:个人技术集锦

什么是线程、进程、协程

Go 源文件经过编译器处理后,会产生可执行文件,不同系统有不同的格式。可执行文件在操作系统上执行一次,就对应一个进程

进程可以理解为执行中的程序,是一个动态的概念,同一份可执行文件执行多次,会产生多个进程。操作系统中进程是资源分配的基本单位

线程是一个执行上下文,它包含诸多状态数据:每个线程有自己的执行流、调用栈、错误码、私有数据等等

实际上,进程和线程的概念是很类似的,本质上都是一个执行上下文,包括CPU状态、通信状态等等,Linux 内核中线程和进程没有什么区分度,Linux 中所有的线程都当作进程来实现,线程和进程没有都通过 task_struct 来描述,对于多线程来说,原本的进程称为主线程,它们在一起组成一个线程组

协程是一种轻量级的用户态线程,协程不是由操作系统调度,而是由程序控制,具有更低的切换开销

什么是多线程

一个进程内多个线程并发执行的情况就叫多线程,每个线程是一个独立的执行流,多线程是一种编程模型,它与处理器无关、跟设计有关

需要多线程的原因包括:

  • 并行计算:充分利用多核,提升整体吞吐,加快执行速度

  • 后台任务处理:将后台线程和主线程分离,在特定场景它是不可或缺的,如:响应式用户界面、实时系统等

来看一个例子:

 package main
 ​
 import (
     "fmt"
     "sync"
     "time"
 )
 ​
 func main() {
     // 创建一个 5 行 10000000 列的二维数组并初始化
     rows, cols := 5, 10000000
     arr := make([][]int, rows)
     for i := 0; i < rows; i++ {
         arr[i] = make([]int, cols)
         for j := 0; j < cols; j++ {
             arr[i][j] = i + j // 初始化为一些值
         }
     }
 ​
     // 第一种方法:遍历二维数组,累加求和
     startTime := time.Now().UnixNano()
     sum1 := 0
     for i := 0; i < rows; i++ {
         for j := 0; j < cols; j++ {
             sum1 += arr[i][j]
         }
     }
     endTime := time.Now().UnixNano()
     elapsed1 := endTime - startTime // 纳秒级别耗时
     fmt.Printf("第一种方法的总和: %d,耗时: %d ns\n", sum1, elapsed1)
 ​
     // 第二种方法:使用协程遍历每一行并求和
     startTime = time.Now().UnixNano()
     var wg sync.WaitGroup
     results := make(chan int, rows) // channel 用于存储每行的和
 ​
     for i := 0; i < rows; i++ {
         wg.Add(1)
         go func(row []int) {
             defer wg.Done()
             rowSum := 0
             for _, value := range row {
                 rowSum += value
             }
             results <- rowSum // 发送结果到 channel
         }(arr[i])
     }
 ​
     wg.Wait()      // 等待所有协程完成
     close(results) // 关闭 channel
 ​
     sum2 := 0
     for res := range results {
         sum2 += res // 汇总结果
     }
 ​
     endTime = time.Now().UnixNano()
     elapsed2 := endTime - startTime // 纳秒级别耗时
     fmt.Printf("第二种方法的总和: %d,耗时: %d ns\n", sum2, elapsed2)
 }
 ​
 // 第一种方法的总和: 250000075000000,耗时: 45818500 ms
 // 第二种方法的总和: 250000075000000,耗时: 19559500 ms

在这个例子中,我用了两种方法去计算一个 5 行 10000000 列的二维数组的总和,第一种是直接遍历,累加每一个值;第二种是开启 5 个协程,每个协程去累加每一行的值,把计算出的值放到通道里,等所有协程计算完毕后,累加通道中的值

可以看出,并发执行大大减少了耗时,这个差距会在数据量大的时候更明显,比如把数据量提升到 100000000(加了个0),结果会是

 第一种方法的总和: 25000000750000000,耗时: 771276600 ns
 第二种方法的总和: 25000000750000000,耗时: 218875800 ns

实际上,如果数据量过小,比如 10000 列,每个协程运行很短暂的时间,大量时间将耗费在协程的创建和销毁上,性能反倒不如直接遍历

 第一种方法的总和: 250075000,耗时: 0 ns
 第二种方法的总和: 250075000,耗时: 498000 ns

这样的多线程程序能加快处理速度,如果程序运行在多CPU多Core的机器上,就能充分利用多 CPU 多 Core 硬件优势,多线程加速执行是多线程的一个显而易见的主要目的

你可能注意到了,上面的代码中用了一个通道(channel)去接收每行的计算结果,那能不能不用通道,只用一个变量 sum2,协程共同操作一个变量,性能会不会提升呢?

将部分代码修改为:

     // 第二种方法:使用协程遍历每一行并求和
     startTime = time.Now().UnixNano()
     var wg sync.WaitGroup
     sum2 := 0
 ​
     for i := 0; i < rows; i++ {
         wg.Add(1)
         go func(row []int) {
             defer wg.Done()
             for _, value := range row {
                 sum2 += value
             }
         }(arr[i])
     }
 ​
     wg.Wait() // 等待所有协程完成
 ​
     endTime = time.Now().UnixNano()
     elapsed2 := endTime - startTime // 纳秒级别耗时
     fmt.Printf("第二种方法的总和: %d,耗时: %d ns\n", sum2, elapsed2)

结果为:

 第一种方法的总和: 250000075000000,耗时: 41012900 ns
 第二种方法的总和: 80190589801507,耗时: 29967500 ns

耗时不仅没减少,计算出的结果还是错的!

来分析一下结果错误的原因,对于 sum2 += value 这一步来说,可以拆分为三部分

  • 读取 sum2

  • 计算 sum2 + value

  • 赋值 sum2 = sum2 + value

在并发情况下,如果一个协程在读取 sum2 后,赋值 sum2 之前,有一个协程已经完成了赋值操作,也就是第一个协程读取的是第二个协程修改前的数据。这样协程一完成赋值操作后,协程二的赋值操作就被覆盖了,导致了数据的不一致

这其实就是 多线程同步 问题

多线程同步

同一进程内的多个线程会共享数据,对共享数据的并发访问会出现竞争

多线程同步是指:

  • 协调多个线程对共享数据的访问,避免出现数据不一致的情况。

  • 协调各个事件的发生顺序,使多线程在某个点交汇并按预期步骤往前推进,比如某线程需要等另一个线程完成某项工作才能开展该线程的下一步工作

要掌握多线程同步,需先理解为什么需要多线程同步、哪些情况需要同步

为什么需要同步

理解为什么要同步(Why)是多线程编程的关键,它甚至比掌握多线程同步机制(How)本身更加重要

识别什么地方需要同步是编写多线程程序的难点,只有准确识别需要保护的数据、需要同步的点,再配合系统或语言提供的合适的同步机制,才能编写安全高效的多线程程序

来看两个例子:

示例 1

有1个长度为256的切片 msg 用于保存消息,函数 readMsg() 和 writeMsg() 分别用于 msg 的读和写

 package main
 ​
 import (
     "fmt"
     "time"
 )
 ​
 const msgSize = 256
 ​
 var msg [msgSize]byte
 ​
 // readMsg 读取消息
 func readMsg() string {
     return string(msg[:])
 }
 ​
 // writeMsg 写入新消息
 func writeMsg(newMsg string) {
     copy(msg[:], []byte(newMsg))
 }
 ​
 func thread1() {
     newMsg := "this is new msg, it's too looooooong"
     writeMsg(newMsg)
 }
 ​
 func thread2() {
     fmt.Printf("msg=%s\n", readMsg())
 }
 ​
 func main() {
     // 启动两个 goroutine 来模拟并发操作
     go thread1()
     go thread2()
 ​
     // 等待一段时间以确保所有协程完成
     time.Sleep(50 * time.Millisecond)
 }

如果线程1调用 writeMsg(),线程2调用 readMsg(),并发操作,不加保护

因为 msg 的长度是 256 字节,完成长达 256 字节的写入需要多个内存周期,在线程1写入新消息期间,线程2可能读到不一致的数据。即可能读到 "this is new msg",而后半段内容 "it's very..." 线程1还没来得及写入,它不是完整的新消息

在这个例子中,因为数据不完整导致了数据读写不一致

示例2

考虑两个线程对同一个整型变量做自增,变量的初始值是0,我们预期2个线程完成自增后变量的值为2

 // 伪代码
 var x int = 0 // 初始值为 0
 go func thread1() {
     x++
 }()
 ​
 go func thread2() {
     x++
 }()

简单的自增操作,包括三步:

  • 加载:从内存中读取变量x的值存放到寄存器

  • 更新:在寄存器里完成自增

  • 保存:把位于寄存器中的x的新值写入内存

两个线程并发执行 x++,情况会是:

  • 首先,线程1把 x 读到 core1 的寄存器,线程2也把 x 的值加载到 core2 的寄存器,此时,存放在两个 core 的寄存器中x的副本都是0

  • 然后,线程1完成自增,更新寄存器里 x 的值的副本(0变1),线程2也完成自增,更新寄存器里x的值的副本(0变1)

  • 再然后,线程1将更新后的新值 1 写入变量 x 的内存位置

  • 最后,线程2将更新后的新值 1 写入同一内存位置,变量 x 的最终值是1,不符合预期

线程1和线程2在同一个 core(CPU核心) 上交错执行,也有可能出现同样的问题,这个问题跟硬件结构无关

之所以会出现不符合预期的情况,主要是因为“加载+更新+保存”这3个步骤不能在一个内存周期内完成。多个线程对同一变量并发读写,不加同步的话会出现数据不一致

在这个例子中,不一致表现为 x 的终值既可能为 1 也可能为 2

经过这两个例子,可以看出来出现问题的核心在于:多线程访问了共享资源,并且对这个资源的操作不能保证同时完成

接下来介绍 Go 语言中常用的多线程同步方法

怎么做同步

多线程程序里,我们要保护的是数据而非代码,解决上面提到的问题核心,就能实现多线程同步

原子变量

针对前面的两个线程对同一整型变量自增的问题,如果“load、update、store”这 3 个步骤是不可分割的整体,这 3 个步骤要么同时成功要么同时失败,即自增操作 x++ 满足原子性,上面的程序便不会有问题

因为这样的话,2个线程并发执行++x,只会有2个结果:

  • 线程a x++,然后线程b x++,结果是2

  • 线程b x++,然后线程a x++,结果是2

除此之外,不会出现第三种情况,线程a、b谁先谁后,取决于线程调度,但不影响最终结果

Go 中的 sync/atomic 包提供了一组原子操作函数,可以安全地在多个 goroutine 之间进行并发读写操作

atomic包常用方法:

看一个简单的示例:

 package main
 ​
 import (
     "fmt"
     "sync"
     "sync/atomic"
 )
 ​
 func main() {
     var counter int64 // 使用 int64 类型作为原子变量
     var wg sync.WaitGroup
 ​
     // 启动多个 goroutine 增加计数器
     for i := 0; i < 1000; i++ {
         wg.Add(1)
         go func() {
             defer wg.Done()
             atomic.AddInt64(&counter, 1) // 原子增加计数器
         }()
     }
 ​
     wg.Wait()                               // 等待所有 goroutine 完成
     fmt.Println("最终计数器的值:", counter) // 输出计数器的值
 }

在这个示例中,每个 goroutine 都对 counter 变量执行原子增加操作,保证了正确的结果

互斥锁

针对线程1 writeMsg() + 线程2 readMsg()的问题,如果能让线程1 writeMsg()的过程中,线程2不能readMsg(),那就不会有问题。这个要求,其实就是要让多个线程互斥访问共享资源

互斥锁就是能满足上述要求的同步机制,互斥是排他的意思,它可以确保在同一时间,只能有一个线程对那个共享资源进行访问

Go 语言的互斥锁通过 sync 包的 Mutex 类型实现

Mutex 具有两个主要的方法:

  • Lock():获取互斥锁。如果锁已经被其他 goroutine 持有,调用该方法的 goroutine 将会阻塞,直到锁被释放

  • Unlock():释放互斥锁,允许其他被阻塞的 goroutine 获取锁

为某个共享资源配置一个互斥锁,使用互斥锁做线程同步,那么所有 goroutine 对该资源的访问,都需要遵从“加锁、访问、解锁”的三步

示例代码:

 package main
 ​
 import (
     "fmt"
     "sync"
 )
 ​
 type Counter struct {
     mu    sync.Mutex // 互斥锁
     count int
 }
 ​
 func (c *Counter) Increment() {
     c.mu.Lock()         // 获取锁
     defer c.mu.Unlock() // 确保在函数返回时释放锁
     c.count++           // 修改共享数据
 }
 ​
 func (c *Counter) Value() int {
     c.mu.Lock()         // 获取锁以读取数据
     defer c.mu.Unlock() // 确保在函数返回时释放锁
     return c.count      // 返回当前的计数值
 }
 ​
 func main() {
     counter := Counter{}
     var wg sync.WaitGroup
 ​
     // 启动多个 goroutine 进行计数
     for i := 0; i < 1000; i++ {
         wg.Add(1)
         go func() {
             defer wg.Done()
             counter.Increment() // 增加计数器
         }()
     }
 ​
     wg.Wait()                                       // 等待所有 goroutine 完成
     fmt.Println("最终计数器的值:", counter.Value()) // 输出计数器的值
 }

在这个示例中,Counter 结构体包含了一个 sync.Mutex 类型和一个计数器字段 count

在 Increment() 和 Value() 函数中,写和读操作都需要先获取到互斥锁,才能继续进行下一步操作

保证同一时刻只有读或写在独立进行,就不会发生数据不一致

获取锁的过程为:

线程在访问数据之前,申请加锁,如果互斥锁已经被其他线程加锁,则调用该函数的线程会阻塞在加锁操作上,直到其他线程访问完数据,释放(解)锁,阻塞在加锁操作的线程1才会被唤醒,并尝试加锁:

  • 如果没有其他线程申请该锁,那么线程1加锁成功,获得了对资源的访问权,完成操作后,释放锁

  • 如果其他线程也在申请该锁,那么:

    • 如果其他线程抢到了锁,那么线程1继续阻塞

    • 如果线程1抢到了该锁,那么线程1将访问资源,再释放锁,其他竞争该锁的线程得以有机会继续执行

如果不能承受加锁失败而陷入阻塞的代价,可以调用 sync.Mutex 类型提供的 TryLock 方法(在 Go 1.18+ 中提供),获取锁失败后会立即返回失败,而不会阻塞等待锁的释放

读写锁

读写锁跟互斥锁类似,也是申请锁的时候,如果不能得到满足则阻塞,但读写锁跟互斥锁也有不同,读写锁有3个状态:

  • 已加读锁状态

  • 已加写锁状态

  • 未加锁状态

Go 语言的读写锁通过 sync 包下的 RWMutex 结构实现

关键方法有:

  • RLock():获取读锁,如果读写锁处于已加写锁状态,则申请锁的线程阻塞;否则把锁设置为已加读锁状态并成功返回

  • Lock():获取写锁,如果读写锁处于未加锁状态,则把锁设置为已加写锁状态并成功返回;否则阻塞

  • RUnlock() 和 Unlock():把锁设置为未加锁状态后返回

读写锁提升了线程的并行度,可以提升吞吐。它可以让多个读线程同时读共享资源,而写线程访问共享资源的时候,其他线程不能执行,所以,读写锁适合对共享资源访问“读大于写”的场合。读写锁也叫“共享互斥锁”,多个读线程可以并发访问同一资源,这对应共享的概念

考虑一个场景:如果有线程1、2、3共享资源 x,读写锁 RWMutex 保护资源,线程1读访问某资源,然后线程2以写的形式访问同一资源 x,因为 RWMutex 已经被加了读锁,所以线程2被阻塞,然后过了一段时间,线程3也读访问资源 x,这时候线程3可以继续执行,因为读是共享的,然后线程1读访问完成,线程3继续访问,过了一段时间,在线程3访问完成前,线程1又申请读资源,那么它还是会获得访问权,但是写资源的线程2会一直被阻塞

为了避免共享的读线程饿死写线程,通常读写锁的实现,会给写线程优先权,当然这处决于读写锁的实现,作为读写锁的使用方,理解它的语义和使用场景就够了

条件变量

条件变量常用于生产者消费者模式,需配合互斥锁使用

假如现在有一个队列,producer 线程组负责往队列中投递(put),consumer 线程组负责从队列中取出(get)

producer 线程和 comsumer 线程并发访问消息队列,不断竞争资源,需要保证同步

可以给队列配置互斥锁,put 和 get 操作前都先加锁,操作完成后再释放锁。由于获取锁后才能操作资源,consumer 线程组获取锁之后,还需要检测队列中是否有信息,如果有信息就处理信息,没有信息这次获取锁就释放锁,等待下一次获取锁成功

这种不断问询的方式叫做轮询,即使 consumer 线程组获取锁-->检测到队列中没有数据-->释放锁 这一过程负载很轻,但是次数多了后还是会消耗大量的 CPU 资源

你可能想,可以在两次查询之间加入短暂的 sleep,使 consumer 线程组让出 CPU,但是这个睡眠的时间设置为多少合适呢?设置长了的话,会出现消息到来得不到及时处理(延迟上升),设置太短了,还是无辜消耗了CPU资源

轮询行为逻辑上,相当于你在等一个投递到楼下小邮局的包裹,你下楼查验没有之后就上楼回房间,然后又下楼查验,你不停的上下楼查验,其实大可不必如此,何不等包裹到达以后,让门卫打电话通知你去取呢?

条件变量提供了一种类似通知 notify 的机制,能够让线程等待某个条件发生,修改条件这一过程需要受互斥锁保护才能保证正确,所以条件变量必须要搭配互斥锁使用

线程在改变条件前先获取锁,改变条件状态后解锁,最后发出通知。等待条件的睡眠中的线程被通知后,必须先获得锁,再判断条件状态,如果条件不成立,则继续转入睡眠并释放锁

Go 语言中的条件变量可以通过 sync.NewCond() 实现,创建时需要传入一个互斥锁作为参数

关键方法有:

  • Wait:Goroutine 在此方法中等待条件,必须在获取到互斥锁后调用

  • Signal:用于通知一个等待的 Goroutine 条件已改变

  • Broadcast:用于通知所有等待的 Goroutine 条件已改变

写一个简单的例子:

 package main
 ​
 import (
     "fmt"
     "sync"
     "time"
 )
 ​
 type BoundedBuffer struct {
     mu       sync.Mutex
     cond     *sync.Cond
     buffer   []int
     capacity int
 }
 ​
 func NewBoundedBuffer(size int) *BoundedBuffer {
     bb := &BoundedBuffer{
         buffer:   make([]int, 0, size),
         capacity: size,
     }
     bb.cond = sync.NewCond(&bb.mu)
     return bb
 }
 ​
 // Produce 生产者线程
 func (bb *BoundedBuffer) Produce(item int) {
     // 先获取锁
     bb.mu.Lock()
     defer bb.mu.Unlock()
 ​
     // 等待直到缓冲区有空余空间
     for len(bb.buffer) == bb.capacity {
         bb.cond.Wait()
     }
 ​
     bb.buffer = append(bb.buffer, item)
     fmt.Printf("Produced: %d\n", item)
     bb.cond.Signal() // 通知一个等待的消费者
 }
 ​
 // Consume 消费者线程
 func (bb *BoundedBuffer) Consume() int {
     // 先获取锁
     bb.mu.Lock()
     defer bb.mu.Unlock()
 ​
     // 等待直到缓冲区有可消费的项目
     for len(bb.buffer) == 0 {
         bb.cond.Wait()
     }
 ​
     item := bb.buffer[0]
     bb.buffer = bb.buffer[1:]
     fmt.Printf("Consumed: %d\n", item)
     bb.cond.Signal() // 通知一个等待的生产者
     return item
 }
 ​
 func main() {
     bb := NewBoundedBuffer(5)
     var wg sync.WaitGroup
 ​
     // 启动生产者
     for i := 0; i < 5; i++ {
         wg.Add(1)
         go func(id int) {
             defer wg.Done()
             for j := 0; j < 10; j++ {
                 bb.Produce(id*10 + j)
                 time.Sleep(time.Millisecond * 50) // 模拟生产延迟
             }
         }(i)
     }
 ​
     // 启动消费者
     for i := 0; i < 5; i++ {
         wg.Add(1)
         go func() {
             defer wg.Done()
             for j := 0; j < 10; j++ {
                 bb.Consume()
                 time.Sleep(time.Millisecond * 100) // 模拟消费延迟
             }
         }()
     }
 ​
     wg.Wait()
 }

代码实现了一个简单的消息队列功能,生产者线程负责往缓冲区中写入数据,消费者线程负责在缓冲区中取数据。生产者写入完毕后,会调用 Signal() 方法唤醒所有等待的 goroutine,goroutine 相互竞争锁,竞争到后就可以消费数据

条件变量的使用需要非常谨慎,否则容易出现不能唤醒的情况

总结

多线程同步就是协调多个线程对共享数据的访问、各个事件的发生顺序

在并发场景下,访问共享资源,并且对这个资源的操作不能保证同时完成,就有可能造成数据不一致等问题,为了解决这个问题,就需要一些机制,去保证这两个条件不能同时发生

Go 语言中的多线程同步方法主要有:原子变量、互斥锁、读写锁和条件变量

  • 原子变量保证了操作的原子性,要么同时成功,要么同时失败,sync/atomic 包提供了一组原子操作函数,可以安全地在多个 goroutine 之间进行并发读写操作

  • 互斥锁保证了多个线程互斥访问共享资源,它可以确保在同一时间,只能有一个线程对那个共享资源进行访问,Go 语言的互斥锁通过 sync 包的 Mutex 类型实现

  • 读写锁跟互斥锁类似,也是申请锁的时候,如果不能得到满足则阻塞,Go 语言的读写锁通过 sync 包下的 RWMutex 结构实现

  • 条件变量常用于生产者消费者模式,需配合互斥锁使用。条件变量提供了一种类似通知 notify 的机制,能够让线程等待某个条件发生,Go 语言中的条件变量可以通过 sync.NewCond() 实现,创建时需要传入一个互斥锁作为参数

参考资料:

显示全文