根据的分析,令牌桶限流的实现没必要真的准备一个桶,定时往里塞令牌,然后每次获取令牌时从桶中弹出,但这样做对内存不友好,需要开辟桶最大容量大小的空间
最佳做法是利用 token 数可以和时间跨度相互转化的原理,只用维护一个桶中当前令牌数的变量,每次消费前才根据时间差计算并更新 Token 数目
本文将分析go官方库提供的限流工具的使用及实现细节,也是令牌桶算法。并对下面这个问题给出合理的解释:在CancelAt方法中,为啥归还令牌时不是归还所有令牌,而是要扣减一个值
Wait/WaitN:当没有足够的Token时,将在内部阻塞等待直到Token足够,或超时取消
ctx
,和ctx配合能优雅实现超时控制Allow/AllowN:当前时刻没有足够的Token时,返回false。否则获取到令牌,返回成功
Reverse/ReverseN:当没有足够的Token时,返回 Reservation 对象,里面包含是否获取成功,以及要等多久才能放行请求
type Limit float64
type Limiter struct {
mu sync.Mutex
// 每秒产生多少个token
limit Limit
// 桶大小
burst int
// 桶中剩余的token数,可以为负数,表示被某些请求预定了一些未来的令牌
tokens float64
// 最近一次推进令牌桶的时间
last time.Time
// 令牌可以满足最近一次请求的时间
lastEvent time.Time
}
特别说明这两个字段:
last:表示最近一次推进令牌桶的时间,也就是每次调获取令牌api的时间,因此每次获取令牌都会推进令牌桶的时间
lastEvent:表示令牌桶中最近一次请求能放行的时间。如果当前时间调获取令牌api时就能满足,就是当前时间,否则是未来的某个时间
计算在时长d内,可以生成多少个令牌
也就是 速度 * 时间 = 总量
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
if limit <= 0 {
return 0
}
return d.Seconds() * float64(limit)
}
计算生成tokens个令牌,需要多少时间
也就是 总数 / 速度 = 时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
if limit <= 0 {
return InfDuration
}
seconds := tokens / float64(limit)
return time.Duration(float64(time.Second) * seconds)
}
计算如果时间推进到t后,桶中可用的令牌个数
注意:该方法只计算,不会更新桶中的令牌数
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
last := lim.last
if t.Before(last) {
last = t
}
// 过去了多长时间
elapsed := t.Sub(last)
// 计算过去这段时间,一共产生多少token
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
// tokens不能超过最大容量
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return t, tokens
}
Wait系列,Allow系列,Reserve系列的方法,底层都调了reverseN,其流程如下:
注意如果桶中令牌数不够,需要等待直到桶中令牌产生为止
此时需要将桶中令牌数置为负值,表示有请求欠了账,占了位
如果接下来有别的请求来获取令牌,会在之前欠账的基础上继续欠账,让令牌数的负值更大,需要等待更长的时间
通过时间流逝来还这些债,这一点和另一个令牌桶很类似
// 当前时t时刻,想获取n个令牌,最多等maxFutureReserve时间
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
defer lim.mu.Unlock()
// 可以忽略这个分支,因为用了限流器就肯定要有速率限制
if lim.limit == Inf {
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: t,
}
}
// 如果时间推进到now,可用的 token 数
t, tokens := lim.advance(t)
// 本次消费后,桶还能剩能下多少token
tokens -= float64(n)
// 如果 token < 0, 说明目前的token不够,需要等待一段时间
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// n <= lim.burst:申请的 token 没有超过了桶的大小
// waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间
ok := n <= lim.burst && waitDuration <= maxFutureReserve
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// 能获取到令牌
if ok {
// 本次获取了多少token
r.tokens = n
// 表示需要等待到这个时刻才能获得期望数量的token(当然 waitDuration 有可能为 0,就是立即满足,timeToAct就是now)
r.timeToAct = t.Add(waitDuration)
// 更新推进令牌桶的时间
lim.last = t
// 就扣减令牌,更新桶中token的数量
lim.tokens = tokens
// 桶中可以满足最近一次请求的时间
lim.lastEvent = r.timeToAct
}
return r
}
allowN:调reserveN时传maxFutureReserve=0,表示只有不等待就能获得时,才获取令牌
func (lim *Limiter) AllowN(t time.Time, n int) bool {
return lim.reserveN(t, n, 0).ok
}
ReserveN:直接调reserveN
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
r := lim.reserveN(t, n, InfDuration)
return &r
}
WaitN:根据ctx计算要获取令牌最多等待多久,并以这个时间最为最大等待时间调reverseN。然后在内部sleep
先检查ctx是否已取消,如果已取消直接返回
根据ctx还剩多久过期,来决定在reverseN中最多等待多久
获取到Reservation后,看 1)根据Reservation计算还有多久能获取令牌的时间delay,和2) ctx被取消,这两件事谁先发生:
正常来说,只要不是外部主动取消,都是delay先过完
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
// 其实就是:time.NewTimer(d)
newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
timer := time.NewTimer(d)
return timer.C, timer.Stop, func() {}
}
return lim.wait(ctx, n, time.Now(), newTimer)
}
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
// 不能超过桶容量
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
// 先检查下ctx是否已取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// 获取令牌的最大等待时间根据ctx计算
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(t)
}
r := lim.reserveN(t, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// 该Reservation还需要等多久才能获得令牌
delay := r.DelayFrom(t)
if delay == 0 {
return nil
}
ch, stop, _ := newTimer(delay)
defer stop()
select {
case <-ch:
// ctx超时或被取消之前,就获得令牌了
return nil
case <-ctx.Done():
// 在获得令牌之前,ctx就被取消了,需要归还令牌到桶中
r.Cancel()
return ctx.Err()
}
}
最后看看CancelAt
:取消本次持有,尽可能归还持有的令牌
为啥需要归还令牌?因此本次实际上没有放行请求,而是被cancel掉了,可以把令牌放回桶中,供接下来的请求使用
// 参数t一般是当前时间
func (r *Reservation) CancelAt(t time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
/**
1.如果产生令牌的速率无限大,那没必要归还令牌
2.自己没有拿到token,也不用归还
3.预期获得时间比当前时间早,这里认为不用归还
*/
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
return
}
// 如果只有有更新的请求请求了令牌,要归还的令牌数减去
// r.lim.lastEvent.Sub(r.timeToAct) 这期间产生的令牌
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
// 推进桶的时间
t, tokens := r.lim.advance(t)
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
r.lim.last = t
// 归还token
r.lim.tokens = tokens
// 如果相等,说明后面没有新的token消费
if r.timeToAct == r.lim.lastEvent {
// 上一次的lastEvent
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(t) {
// 恢复lastEvent为上一次的lastEvent
r.lim.lastEvent = prevEvent
}
}
}
其中比较费解的是这一行:为啥要多减去这一部分
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
r.TimeToAct
:本次Reservation可以放行请求的时间,也是能使用令牌的时间r.lim.lastEvent
:令牌桶中,最近一次请求的TimeToAct
在归还令牌时,要减掉这段时间范围能产生的令牌数
对应于下面这个场景:
这行代码有注释:
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
但只是解释了在干啥:在r之后获取的令牌不应该被重新放回桶中
而没有解释为啥要这么干
有人,归还所有持有的token是否应该改成restoreTokens = float64(r.tokens)
也有人,举了这样一个例子:
func TestLimit(t *testing.T) {
t0 := time.Now()
t1 := time.Now().Add(100 * time.Millisecond)
t2 := time.Now().Add(200 * time.Millisecond)
t3 := time.Now().Add(300 * time.Millisecond)
// 桶每秒产生10个令牌,容量=20
l := NewLimiter(10, 20)
l.ReserveN(t0, 15)
// 桶里还剩 5 个 token
fmt.Printf("%+v\n", l)
// t1:时间过去100ms,桶里增加1个,变成6个
// 消费10个,变成-4个
r := l.ReserveN(t1, 10)
// 此时timeToAct:500ms
fmt.Printf("%+v\n", l)
// t2:时间过去100ms,桶里增加1个,变成-3个
// 消费2个, 桶里变成-5个
l.ReserveN(t2, 2)
// 此时timeToAct:700ms
fmt.Printf("%+v\n", l)
// t3: 时间过去100ms,桶里增加1个,变成-4个
// r在t3取消,变成4(归还8个)个,按常理是变成6(归还10个)个
r.CancelAt(t3)
fmt.Printf("%+v\n", l)
}
最后一行:
有人在issue下面给出解释:
意思就是:如果把保留的令牌数全部还回去的话,会造成在某一时刻突破令牌桶的速率限制
拿上面例子说明:
如果在t3时刻取消r时,如果把所有的令牌都归还了,也就是归还10个,此时桶中的令牌数变成6
那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有10个令牌!
如果此时有请求来获取这10个令牌,算上刚刚开始执行的r2,那就相当于同时有加起来需要消耗12个令牌
的请求在执行!超过了最大容量10的限制
而按照源码中的逻辑扣减,在t3时刻取消r时,只归还8个,此时桶中的令牌数变成4
那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有8个令牌
如果此时有请求来获取这8个令牌,算上刚刚开始执行的r2,那就会同时加起来需要消耗10个令牌
的请求在执行,刚好不超过桶中的最大容量的限制
也就是说,如果r后面有更新的请求rnew,rnew.TimeToAct
赋值给了r.limit.lastEvent
那么从r.TimeToAct
到r.limit.lastEvent
之间能产生的令牌是不能归还的,而是要等时间流逝自然填充
这样当rnew可以执行时,桶中的令牌数 + rnew获取的令牌数才不会超过桶的容量
这个库有个小问题:如果请求A在请求B之前发生,都需要等待一段时间。如果任务A提前cancel了,使得桶中的令牌满足请求B的需求,请求B也不会被唤醒,而是等待其预定的时间到了再执行
例如下面的例子:
func TestBug(t *testing.T) {
// 每100ms产生一个令牌,最大容量=10
l := NewLimiter(10, 10)
t1 := time.Now()
// 先把桶中的初始令牌用完
l.ReserveN(t1, 10)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.TODO(), time.Hour)
defer cancel()
wg.Add(1)
wg.Add(2)
go func() {
defer wg.Done()
// 如果要等,这个要等 1s才能执行,但是我们的 ctx 200ms 就会取消
l.WaitN(ctx, 10)
fmt.Printf("[1] cost: %s\n", time.Since(t1))
}()
go func() {
defer wg.Done()
// 模拟出现问题, 200ms就取消了
time.Sleep(200 * time.Millisecond)
cancel()
}()
time.Sleep(100 * time.Millisecond)
go func() {
defer wg.Done()
// 正常情况下,这个要等 1.2 s 才能执行,
// 但是我们前面都取消了,按理说在400ms时,桶中令牌数就够了,这个可以执行
// 但实际上没有唤醒,等到1.2才执行
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()
l.WaitN(ctx, 2)
fmt.Printf("[2] cost: %s\n", time.Since(t1))
}()
wg.Wait()
}
tokens = -10
tokens = -11
tokens = -2