您的当前位置:首页正文

golang CPU调度、并发

2024-11-23 来源:个人技术集锦

java并发编程流程:

Goroutine 和 Channel

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存

线程通讯方案

主流传统方案:通过共享内存的方式进行通信

多线程访问一个内存时
需要限制同一时间能够读写这些变量的线程数量

go:通过通信的方式共享内存

Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)
Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介

两个 Goroutine不存在直接关联,但是能通过 Channel 间接完成通信。

Goroutine 树

Go 服务的每一个请求都是通过单独的 Goroutine 处理的

  • 可能会创建多个 Goroutine 来处理一次请求
    于是在一次请求中的 Goroutine形成了 树形结构

所以就要在不同 Goroutine 之间 同步请求特定数据、取消信号以及处理请求的截止日期。

上下文 Context


用来设置截止日期、同步信号,传递请求相关值的结构体

  • 作用:
    对Goroutine 信号进行同步 以减少计算资源的浪费

  • 原理
    每一个 context.Context 都会从最顶层的 Goroutine 一层一层传递到最下层
    context.Context 可以在上层 Goroutine 执行出现错误时,将取消信号及时同步给下层,下层及时停掉无用的工作以减少额外资源的消耗:一旦接收到取消信号就立刻停止当前正在执行的工作。

  • 接口

type Context interface {
	// 上下文截止时间
	Deadline() (deadline time.Time, ok bool)
	// Channel,在上下文的工作完成后关闭
	Done() <-chan struct{}
	// 上下文结束的原因
	Err() error
	// 上下文中的 K-V
	Value(key interface{}) interface{}
}

默认上下文 context.Background

取消信号 context.WithCancel

从 context.Context 中衍生出一个新的子上下文,并返回一个取消函数
执行取消函数,当前上下文以及它的子上下文都会被取消

传值 context.WithValue, context.valueCtx

context.WithValue 能从父上下文中创建一个子上下文,传值的子上下文使用 context.valueCtx 类型

Channel

Go 核心的数据结构, Goroutine 之间的通信方式
从某种程度上说,Channel 是一个用于同步和通信的有锁队列

Channel 与关键字 range 和 select 的关系紧密

  • select 能够让 Goroutine 同时等待多个 Channel 可读或者可写

先进先出

Channel 收发操作均遵循了先进先出的设计,具体规则如下:

  1. 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  2. 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

并发编程

goroutine

context包

处理单个请求的多个Goroutine之间与请求域的数据、超时和退出等操作

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-context/

  • 方法
type Context interface {
	// 上下文截止时间
	Deadline() (deadline time.Time, ok bool)
	// Channel,在上下文的工作完成后关闭
	Done() <-chan struct{}
	// 上下文结束的原因
	Err() error
	// 上下文中的 K-V
	Value(key interface{}) interface{}
}
// 默认上下文, 最简单、最常用的上下文类型
context.Background()

// 取消信号: 衍生出一个新的子上下文 + 一个用于取消该上下文的函数
ctx, cancel := context.WithCancel(context.Background())

// 计时器上下文:衍生出一个过期时间为 1s 的上下文
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  • 使用案例
func main() {
	//当前函数没有上下文作为入参,我们都会使用 context.Background 作为起始的上下文向下传递。
    ctx, cancel := context.WithCancel(context.Background())
    go handle(ctx)
    
	cancel() // 通知子goroutine结束
}

func handle(ctx context.Context) {
	select {
		case <-ctx.Done(): // 等待上级通知
			fmt.Println("handle", ctx.Err())
	}
}

Channel, select多路复用

通道有发送(send)、接收(receive)和关闭(close)三种操作

ch <- 10 // 把10发送到ch中

x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

close(ch)

对于从无缓冲Channel进行的接收,发生在对该Channel进行的发送完成之前(无缓冲Channel:接收完成,发送才能完成

select 可以同时响应多个通道的操作。
同时监听多个channel,直到其中一个channel ready

高级用法

goroutine池

控制goroutine数量,防止暴涨
减少goroutine的创建/销毁的性能损耗

没有官方包,根据需求自实现,

"gitee.com/johng/gf/g/os/grpool"包

协程池ants

https://github.com/panjf2000/ants/blob/master/README_ZH.md


cpu调度: runtime包

  • runtime.Gosched()
    让出CPU时间片,重新等待安排任务

  • runtime.Goexit()
    退出当前协程

  • runtime.GOMAXPROCS()
    设置当前程序并发时占用的CPU逻辑核心数

定时器: Timer包

  • Time = time.Now()
    输出当前时间

  • *Timer = time.NewTimer( time.Second)
    时间到了,输出1次时间

timer1 := time.NewTimer(time.Second)
fmt.Println("时间到:", <-timer1.C)
  • *Timer = time.NewTicker( time.Second)
    时间到了,输出多次时间
ticker1 := time.NewTicker(1 * time.Second)
// 一直在循环输出时间
for {
	fmt.Println(<-ticker.C)
}
  • [*Timer].Stop()
    停止定时器
  • [*Timer].Reset(time.Second))
    重置定时器
  • time.Sleep(time.Second)

sync包

互斥锁 sync.Mutex

var lock sync.Mutex

func main() {
	go add()
	go add()
}

func add() {
	lock.Lock() // 加锁, goroutine获取锁才能进入临界区
	x = x + 1
	lock.Unlock() // 解锁
}

读写互斥锁 sync.RWMutex

分为 读锁和写锁, 读锁能多次获取,写锁需要等待
适合读多写少的场景

rwlock sync.RWMutex

rwlock.Lock() 		// 加写锁
rwlock.Unlock() 	// 解写锁

rwlock.RLock()		// 加读锁
rwlock.RUnlock()	// 解读锁

同步信号量 sync.WaitGroup

(wg *WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

var wg sync.WaitGroup

func hello() {
    defer wg.Done() //计数器-1
    fmt.Println("Hello Goroutine!")
}
func main() {
    wg.Add(1)	//计数器+1
    go hello() 
    
    wg.Wait()	//等计数器变为0 才执行
    fmt.Println("main goroutine done!")
}

func并发安全 sync.Once

var loadIconsOnce sync.Once
//func()只执行一次,并发安全
loadIconsOnce.Do(func())

Map并发安全 sync.Map

var m = sync.Map{}

原子操作:sync/atomic包

性能比加锁操作更好

GPM调度系统

和java用操作系统来调度线程不同:线程由 CPU 调度是抢占式的

go语言自己实现的一套调度系统,用来调度goroutine
调度是在用户态下完成的,不需要内核态切换,内存分配,协程由用户态调度是协作式的,一个协程让出 CPU 后,才执行下一个协程。

模型

  • G = goroutine
  • P = 一组goroutine队列 (默认个数=物理线程数) P管理着一组G挂载在M上运行:
  • M = 虚拟内核线程,一个groutine最终是要放到M上执行

M与内核线程一般是一一映射
P与M一般也是一一对应

协程与线程 关系是M:N

当一个G长久阻塞在一个M1上时,
runtime会新建一个M2,阻塞G所在的P 会把其他的G挂载在新建的M2上。
当旧的G阻塞完成或者认为其已经死掉时 回收旧的M1。

  • P原理
    数据结构:P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),
    作用:对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)
    当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务

并发模式

go 并发相比线程同步,更推荐用channel通信

1. 线程同步

同步通信:后台线程执行后 ,主线程再退出

func main() {
    var mu sync.Mutex

    mu.Lock()
    go func(){
        fmt.Println("你好, 世界")
        mu.Unlock()
    }()

    mu.Lock()
}

扩展到N个线程完成后,再进行下一步

func main() {
    var wg sync.WaitGroup

    // 开N个后台打印线程
    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func() {
        	defer wg.Done()
            fmt.Println("你好, 世界")
        }()
    }

    // 等待N个后台线程完成
    wg.Wait()
}

2. 消息传递

同步通信:后台线程执行后 ,主线程再退出

func main() {
    done := make(chan int, 1) // 带缓存的管道

    go func(){
        fmt.Println("你好, 世界")
        done <- 1
    }()

    <-done
}

扩展到N个线程完成后,再进行下一步

func main() {
    done := make(chan int, 10) // 带 10 个缓存

    // 开N个后台打印线程
    for i := 0; i < cap(done); i++ {
        go func(){
            fmt.Println("你好, 世界")
            done <- 1
        }()
    }

    // 等待N个后台线程完成
    for i := 0; i < cap(done); i++ {
        <-done
    }
}

生产者消费者

Go语言实现生产者消费者并发很简单:

  1. 创建1个channel做成果队列
  2. 生产者 并发 发送数据到 channel
  3. 消费者 并发 从channel接收数据

当成果队列中没有数据时,消费者就进入饥饿的等待中;
而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗

发布订阅

显示全文