您的当前位置:首页正文

开源限流组件分析(三):golang-time/rate

2024-11-26 来源:个人技术集锦

本系列

前言

根据的分析,令牌桶限流的实现没必要真的准备一个桶,定时往里塞令牌,然后每次获取令牌时从桶中弹出,但这样做对内存不友好,需要开辟桶最大容量大小的空间

最佳做法是利用 token 数可以和时间跨度相互转化的原理,只用维护一个桶中当前令牌数的变量,每次消费前才根据时间差计算并更新 Token 数目

本文将分析go官方库提供的限流工具的使用及实现细节,也是令牌桶算法。并对下面这个问题给出合理的解释:在CancelAt方法中,为啥归还令牌时不是归还所有令牌,而是要扣减一个值

提供获取令牌的API

  • Wait/WaitN:当没有足够的Token时,将在内部阻塞等待直到Token足够,或超时取消

    • 工程中推荐使用该方法,参数中有ctx,和ctx配合能优雅实现超时控制
  • Allow/AllowN:当前时刻没有足够的Token时,返回false。否则获取到令牌,返回成功

  • Reverse/ReverseN:当没有足够的Token时,返回 Reservation 对象,里面包含是否获取成功,以及要等多久才能放行请求

    • 可控性最好,可以选择等待到规定的时间然后放心请求,也可以调Reservation.Cancel() 取消等待,归还令牌

数据结构

type Limit float64

type Limiter struct {
	mu sync.Mutex

	// 每秒产生多少个token
	limit Limit
	// 桶大小
	burst int
	// 桶中剩余的token数,可以为负数,表示被某些请求预定了一些未来的令牌
	tokens float64
	// 最近一次推进令牌桶的时间
	last time.Time
	// 令牌可以满足最近一次请求的时间
	lastEvent time.Time
}

特别说明这两个字段:

  • last:表示最近一次推进令牌桶的时间,也就是每次调获取令牌api的时间,因此每次获取令牌都会推进令牌桶的时间

    • 这样每次获取令牌时,都会看从last到now这段时间之间产生了多少新的令牌
  • lastEvent:表示令牌桶中最近一次请求能放行的时间。如果当前时间调获取令牌api时就能满足,就是当前时间,否则是未来的某个时间

    • 只用于CancelAt计算归还多少个令牌,可以到后面再理解

基础方法

tokensFromDuration

计算在时长d内,可以生成多少个令牌

也就是 速度 * 时间 = 总量

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
   if limit <= 0 {
      return 0
   }
   return d.Seconds() * float64(limit)
}

durationFromTokens

计算生成tokens个令牌,需要多少时间

也就是 总数 / 速度 = 时间

func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	if limit <= 0 {
		return InfDuration
	}
	seconds := tokens / float64(limit)
	return time.Duration(float64(time.Second) * seconds)
}

advance

计算如果时间推进到t后,桶中可用的令牌个数

注意:该方法只计算,不会更新桶中的令牌数

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
	last := lim.last
	if t.Before(last) {
		last = t
	}

    // 过去了多长时间
	elapsed := t.Sub(last)
	// 计算过去这段时间,一共产生多少token
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	// tokens不能超过最大容量
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return t, tokens
}

获取令牌方法

reverseN

Wait系列,Allow系列,Reserve系列的方法,底层都调了reverseN,其流程如下:

注意如果桶中令牌数不够,需要等待直到桶中令牌产生为止

此时需要将桶中令牌数置为负值,表示有请求欠了账,占了位

如果接下来有别的请求来获取令牌,会在之前欠账的基础上继续欠账,让令牌数的负值更大,需要等待更长的时间

通过时间流逝来还这些债,这一点和另一个令牌桶很类似

// 当前时t时刻,想获取n个令牌,最多等maxFutureReserve时间
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()

    // 可以忽略这个分支,因为用了限流器就肯定要有速率限制
	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: t,
		}
	}

	// 如果时间推进到now,可用的 token 数
	t, tokens := lim.advance(t)

	// 本次消费后,桶还能剩能下多少token
	tokens -= float64(n)

	// 如果 token < 0, 说明目前的token不够,需要等待一段时间
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// n <= lim.burst:申请的 token 没有超过了桶的大小
	// waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}

	// 能获取到令牌
	if ok {
        // 本次获取了多少token
		r.tokens = n
		// 表示需要等待到这个时刻才能获得期望数量的token(当然 waitDuration 有可能为 0,就是立即满足,timeToAct就是now)
		r.timeToAct = t.Add(waitDuration)

		// 更新推进令牌桶的时间
		lim.last = t
		// 就扣减令牌,更新桶中token的数量
		lim.tokens = tokens
		// 桶中可以满足最近一次请求的时间
		lim.lastEvent = r.timeToAct
	}

	return r
}

其他系列API

allowN:调reserveN时传maxFutureReserve=0,表示只有不等待就能获得时,才获取令牌

func (lim *Limiter) AllowN(t time.Time, n int) bool {
	return lim.reserveN(t, n, 0).ok
}

ReserveN:直接调reserveN

func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
   r := lim.reserveN(t, n, InfDuration)
   return &r
}

WaitN:根据ctx计算要获取令牌最多等待多久,并以这个时间最为最大等待时间调reverseN。然后在内部sleep

  1. 先检查ctx是否已取消,如果已取消直接返回

  2. 根据ctx还剩多久过期,来决定在reverseN中最多等待多久

    1. 换句话说,如果判断直到ctx过期都无法获取令牌,就不等了
  3. 获取到Reservation后,看 1)根据Reservation计算还有多久能获取令牌的时间delay,和2) ctx被取消,这两件事谁先发生:

    1. 如果delay先过完:获取令牌成功,可以执行请求
    2. 如果ctx先被取消:本次获取令牌失败,需要尽可能归还桶中的令牌

正常来说,只要不是外部主动取消,都是delay先过完

func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
    // 其实就是:time.NewTimer(d)
	newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
		timer := time.NewTimer(d)
		return timer.C, timer.Stop, func() {}
	}

	return lim.wait(ctx, n, time.Now(), newTimer)
}


func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
	lim.mu.Lock()
	burst := lim.burst
	limit := lim.limit
	lim.mu.Unlock()

    // 不能超过桶容量
	if n > burst && limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
	}

    // 先检查下ctx是否已取消
  select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	// 获取令牌的最大等待时间根据ctx计算
	waitLimit := InfDuration
	if deadline, ok := ctx.Deadline(); ok {
		waitLimit = deadline.Sub(t)
	}

	r := lim.reserveN(t, n, waitLimit)
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}

    // 该Reservation还需要等多久才能获得令牌
    delay := r.DelayFrom(t)
	if delay == 0 {
		return nil
	}
  
	ch, stop, _ := newTimer(delay)
	defer stop()
	select {
	case <-ch:
		// ctx超时或被取消之前,就获得令牌了
		return nil
	case <-ctx.Done():

		// 在获得令牌之前,ctx就被取消了,需要归还令牌到桶中
		r.Cancel()
		return ctx.Err()
	}
}

令人费解的CancelAt

最后看看CancelAt:取消本次持有,尽可能归还持有的令牌

为啥需要归还令牌?因此本次实际上没有放行请求,而是被cancel掉了,可以把令牌放回桶中,供接下来的请求使用

// 参数t一般是当前时间
func (r *Reservation) CancelAt(t time.Time) {
	if !r.ok {
		return
	}

	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()

	/**
	1.如果产生令牌的速率无限大,那没必要归还令牌
	2.自己没有拿到token,也不用归还
	3.预期获得时间比当前时间早,这里认为不用归还
	 */
	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
		return
	}

	// 如果只有有更新的请求请求了令牌,要归还的令牌数减去
	// r.lim.lastEvent.Sub(r.timeToAct) 这期间产生的令牌
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
	if restoreTokens <= 0 {
		return
	}

	// 推进桶的时间
	t, tokens := r.lim.advance(t)

	tokens += restoreTokens
	if burst := float64(r.lim.burst); tokens > burst {
		tokens = burst
	}

	r.lim.last = t
	// 归还token
	r.lim.tokens = tokens

	// 如果相等,说明后面没有新的token消费
	if r.timeToAct == r.lim.lastEvent {
		// 上一次的lastEvent
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(t) {
			// 恢复lastEvent为上一次的lastEvent
			r.lim.lastEvent = prevEvent
		}
	}
}

是bug吗

其中比较费解的是这一行:为啥要多减去这一部分

restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  • r.TimeToAct:本次Reservation可以放行请求的时间,也是能使用令牌的时间
  • r.lim.lastEvent:令牌桶中,最近一次请求的TimeToAct

在归还令牌时,要减掉这段时间范围能产生的令牌数

对应于下面这个场景:

这行代码有注释:

// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.

但只是解释了在干啥:在r之后获取的令牌不应该被重新放回桶中

而没有解释为啥要这么干


有人,归还所有持有的token是否应该改成restoreTokens = float64(r.tokens)

也有人,举了这样一个例子:

func TestLimit(t *testing.T) {
	t0 := time.Now()
	t1 := time.Now().Add(100 * time.Millisecond)
	t2 := time.Now().Add(200 * time.Millisecond)
	t3 := time.Now().Add(300 * time.Millisecond)
    // 桶每秒产生10个令牌,容量=20
	l := NewLimiter(10, 20)
	l.ReserveN(t0, 15) 
    // 桶里还剩 5 个 token
	fmt.Printf("%+v\n", l)

	// t1:时间过去100ms,桶里增加1个,变成6个
	// 消费10个,变成-4个
	r := l.ReserveN(t1, 10)
	// 此时timeToAct:500ms
	fmt.Printf("%+v\n", l)

	// t2:时间过去100ms,桶里增加1个,变成-3个
	// 消费2个, 桶里变成-5个
	l.ReserveN(t2, 2)
	// 此时timeToAct:700ms
	fmt.Printf("%+v\n", l)

	// t3: 时间过去100ms,桶里增加1个,变成-4个
	// r在t3取消,变成4(归还8个)个,按常理是变成6(归还10个)个
	r.CancelAt(t3)
	fmt.Printf("%+v\n", l)
}

最后一行:

  • 按常理来说,应该把所有令牌都归还,也就是归还10个,这样桶中的令牌数变成6
  • 实际上只归还了8个,桶中的令牌数变成4。就是那行令人费解的代码造成的

有人在issue下面给出解释:

意思就是:如果把保留的令牌数全部还回去的话,会造成在某一时刻突破令牌桶的速率限制

拿上面例子说明:

如果在t3时刻取消r时,如果把所有的令牌都归还了,也就是归还10个,此时桶中的令牌数变成6

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有10个令牌!

如果此时有请求来获取这10个令牌,算上刚刚开始执行的r2,那就相当于同时有加起来需要消耗12个令牌的请求在执行!超过了最大容量10的限制


而按照源码中的逻辑扣减,在t3时刻取消r时,只归还8个,此时桶中的令牌数变成4

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有8个令牌

如果此时有请求来获取这8个令牌,算上刚刚开始执行的r2,那就会同时加起来需要消耗10个令牌的请求在执行,刚好不超过桶中的最大容量的限制

也就是说,如果r后面有更新的请求rnew,rnew.TimeToAct赋值给了r.limit.lastEvent

那么从r.TimeToActr.limit.lastEvent之间能产生的令牌是不能归还的,而是要等时间流逝自然填充

这样当rnew可以执行时,桶中的令牌数 + rnew获取的令牌数才不会超过桶的容量


取消后无法唤醒其他请求

这个库有个小问题:如果请求A在请求B之前发生,都需要等待一段时间。如果任务A提前cancel了,使得桶中的令牌满足请求B的需求,请求B也不会被唤醒,而是等待其预定的时间到了再执行

例如下面的例子:

func TestBug(t *testing.T) {
	// 每100ms产生一个令牌,最大容量=10
	l := NewLimiter(10, 10)
	t1 := time.Now()
	// 先把桶中的初始令牌用完
	l.ReserveN(t1, 10)

	var wg sync.WaitGroup

	ctx, cancel := context.WithTimeout(context.TODO(), time.Hour)
	defer cancel()

	wg.Add(1)

	wg.Add(2)
	go func() {
		defer wg.Done()
		// 如果要等,这个要等 1s才能执行,但是我们的 ctx 200ms 就会取消
		l.WaitN(ctx, 10)
		fmt.Printf("[1] cost: %s\n", time.Since(t1))
	}()
	go func() {
		defer wg.Done()
		// 模拟出现问题, 200ms就取消了
		time.Sleep(200 * time.Millisecond)
		cancel()
	}()

	time.Sleep(100 * time.Millisecond)

	go func() {
		defer wg.Done()
		// 正常情况下,这个要等 1.2 s 才能执行,
		// 但是我们前面都取消了,按理说在400ms时,桶中令牌数就够了,这个可以执行
		// 但实际上没有唤醒,等到1.2才执行
		ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
		defer cancel()
		l.WaitN(ctx, 2)
		fmt.Printf("[2] cost: %s\n", time.Since(t1))
	}()

	wg.Wait()
}
  1. 时刻0ms:请求A获取10个token,需要等1000ms,桶中tokens = -10
  2. 时刻100ms:请求B获取2个token,需要等1200ms,桶中tokens = -11
  3. 时刻200ms:请求A取消,往桶中归还8个token,桶中tokens = -2
  4. 时刻400ms:理想情况下,此时请求B可以执行,因为桶中容量够了
  5. 时刻1.2s:实际上,请求B此时才可以执行
显示全文