限流器是后台服务中的非常重要的组件,可以用来限制请求速率,保护服务,以免服务过载。
限流器的实现方法有很多种,例如滑动窗口法、Token Bucket、Leaky Bucket等。
其实golang标准库中就自带了限流算法的实现,即golang.org/x/time/rate。
该限流器是基于Token Bucket(令牌桶)实现的。
简单来说,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放Token,桶满则暂时不放。
而用户则从桶中取Token,如果有剩余Token就可以一直取。如果没有剩余Token,则需要等到系统中被放置了Token才行。
构造一个限流器
我们可以使用以下方法构造一个限流器对象:
limiter := NewLimiter(10, 1);
这里有两个参数:
第一个参数是r Limit。代表每秒可以向Token桶中产生多少token。Limit实际上是float64的别名。
第二个参数是b int。b代表Token桶的容量大小。
那么,对于以上例子来说,其构造出的限流器含义为,其令牌桶大小为1, 以每秒10个Token的速率向桶中放置Token。
除了直接指定每秒产生的Token个数外,还可以用Every方法来指定向Token桶中放置Token的间隔,例如:
limit := Every(100 * time.Millisecond);
limiter := NewLimiter(limit, 1);
以上就表示每100ms往桶中放一个Token。本质上也就是一秒钟产生10个。
Limiter提供了三类方法供用户消费Token,用户可以每次消费一个Token,也可以一次性消费多个Token。
而每种方法代表了当Token不足时,各自不同的对应手段。
Wait/WaitN
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
Wait实际上就是WaitN(ctx,1)。
当使用Wait方法消费Token时,如果此时桶内Token数组不足(小于N),那么Wait方法将会阻塞一段时间,直至Token满足条件。如果充足则直接返回。
这里可以看到,Wait方法有一个context参数。
我们可以设置context的Deadline或者Timeout,来决定此次Wait的最长时间。
Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
Allow实际上就是AllowN(time.Now(),1)。
AllowN方法表示,截止到某一时刻,目前桶中数目是否至少为n个,满足则返回true,同时从桶中消费n个token。
反之返回不消费Token,false。
通常对应这样的线上场景,如果请求速率过快,就直接丢到某些请求。
Reserve/ReserveN
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
Reserve相当于ReserveN(time.Now(), 1)。
ReserveN的用法就相对来说复杂一些,当调用完成后,无论Token是否充足,都会返回一个Reservation*对象。
你可以调用该对象的Delay()方法,该方法返回了需要等待的时间。如果等待时间为0,则说明不用等待。
必须等到等待时间之后,才能进行接下来的工作。
或者,如果不想等待,可以调用Cancel()方法,该方法会将Token归还。
举一个简单的例子,我们可以这么使用Reserve方法。
r := lim.Reserve()
f !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
Act() // 执行相关逻辑
动态调整速率
Limiter支持可以调整速率和桶大小:
SetLimit(Limit) 改变放入Token的速率
SetBurst(int) 改变Token桶大小
有了这两个方法,可以根据现有环境和条件,根据我们的需求,动态的改变Token桶大小和速率。
ber在Github上开源了一套用于服务限流的go语言库ratelimit, 该组件基于Leaky Bucket(漏桶)实现。
我在之前写过《Golang限流器time/rate实现剖析》,讲了Golang标准库中提供的基于Token Bucket实现限流组件的time/rate原理,同时也讲了限流的一些背景。
相比于TokenBucket,只要桶内还有剩余令牌,调用方就可以一直消费。而Leaky Bucket相对来说比较严格,调用方只能严格按照这个间隔顺序进行消费调用。(实际上,uber-go对这个限制也做了一些优化,具体可以看下文详解)
还是老规矩,在正式讲其实现之前,我们先看下ratelimit的使用方法。
ratelimit的使用
我们直接看下uber-go官方库给的例子:
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
在这个例子中,我们给定限流器每秒可以通过100个请求,也就是平均每个请求间隔10ms。
因此,最终会每10ms打印一行数据。输出结果如下:
// Output:
// 0 0
// 1 10ms
// 2 10ms
// 3 10ms
// 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
基本实现
要实现以上每秒固定速率的目的,其实还是比较简单的。
在ratelimit的New函数中,传入的参数是每秒允许请求量(RPS)。
我们可以很轻易的换算出每个请求之间的间隔:
limiter.perRequest = time.Second / time.Duration(rate)
以上limiter.perRequest指的就是每个请求之间的间隔时间。
如下图,当请求1处理结束后, 我们记录下请求1的处理完成的时刻, 记为limiter.last。
稍后请求2到来, 如果此刻的时间与limiter.last相比并没有达到perRequest的间隔大小,那么sleep一段时间即可。
漏桶示例图
对应ratelimit的实现代码如下:
sleepFor = t.perRequest - now.Sub(t.last)
if sleepFor > 0 {
t.clock.Sleep(sleepFor)
t.last = now.Add(sleepFor)
} else {
t.last = now
}
最大松弛量
我们讲到,传统的Leaky Bucket,每个请求的间隔是固定的,然而,在实际上的互联网应用中,流量经常是突发性的。对于这种情况,uber-go对Leaky Bucket做了一些改良,引入了最大松弛量(maxSlack)的概念。
我们先理解下整体背景: 假如我们要求每秒限定100个请求,平均每个请求间隔10ms。但是实际情况下,有些请求间隔比较长,有些请求间隔比较短。如下图所示:
请求1完成后,15ms后,请求2才到来,可以对请求2立即处理。请求2完成后,5ms后,请求3到来,这个时候距离上次请求还不足10ms,因此还需要等待5ms。
但是,对于这种情况,实际上三个请求一共消耗了25ms才完成,并不是预期的20ms。在uber-go实现的ratelimit中,可以把之前间隔比较长的请求的时间,匀给后面的使用,保证每秒请求数(RPS)即可。
对于以上case,因为请求2相当于多等了5ms,我们可以把这5ms移给请求3使用。加上请求3本身就是5ms之后过来的,一共刚好10ms,所以请求3无需等待,直接可以处理。此时三个请求也恰好一共是20ms。
如下图所示:
在ratelimit的对应实现中很简单,是把每个请求多余出来的等待时间累加起来,以给后面的抵消使用。
t.sleepFor += t.perRequest - now.Sub(t.last)
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
注意:这里跟上述代码不同的是,这里是+=。而同时t.perRequest - now.Sub(t.last)是可能为负值的,负值代表请求间隔时间比预期的长。
当t.sleepFor > 0,代表此前的请求多余出来的时间,无法完全抵消此次的所需量,因此需要sleep相应时间, 同时将t.sleepFor置为0。
当t.sleepFor < 0,说明此次请求间隔大于预期间隔,将多出来的时间累加到t.sleepFor即可。
但是,对于某种情况,请求1完成后,请求2过了很久到达(好几个小时都有可能),那么此时对于请求2的请求间隔now.Sub(t.last),会非常大。以至于即使后面大量请求瞬时到达,也无法抵消完这个时间。那这样就失去了限流的意义。
为了防止这种情况,ratelimit就引入了最大松弛量(maxSlack)的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现。
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
ratelimit中maxSlack的值为-10 * time.Second / time.Duration(rate), 是十个请求的间隔大小。我们也可以理解为ratelimit允许的最大瞬时请求为10。
高级用法
ratelimit的New函数,除了可以配置每秒请求数(QPS), 其实还提供了一套可选配置项Option。
func New(rate int, opts …Option) Limiter
Option的类型为type Option func(l *limiter), 也就是说我们可以提供一些这样类型的函数,作为Option,传给ratelimit, 定制相关需求。
但实际上,自定义Option的用处比较小,因为limiter结构体本身就是个私有类型,我们并不能拿它做任何事情。
我们只需要了解ratelimit目前提供的两个配置项即可:
WithoutSlack
我们上文讲到ratelimit中引入了最大松弛量的概念,而且默认的最大松弛量为10个请求的间隔时间。
但是确实会有这样需求场景,需要严格的限制请求的固定间隔。那么我们就可以利用WithoutSlack来取消松弛量的影响。
limiter := ratelimit.New(100, ratelimit.WithoutSlack)
WithClock(clock Clock)
我们上文讲到,ratelimit的实现时,会计算当前时间与上次请求时间的差值,并sleep相应时间。
在ratelimit基于go标准库的time实现时间相关计算。如果有精度更高或者特殊需求的计时场景,可以用WithClock来替换默认时钟。
通过该方法,只要实现了Clock的interface,就可以自定义时钟了。
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
clock &= MyClock{}
limiter := ratelimit.New(100, ratelimit.WithClock(clock))
限流器是微服务中必不缺少的一环,可以起到保护下游服务,防止服务过载等作用。上一篇文章《Golang限流器time/rate使用介绍》简单介绍了time/rate的使用方法,本文则着重分析下其实现原理。建议在正式阅读本文之前,先阅读下上一篇文章。
上一篇文章讲到,time/rate是基于Token Bucket(令牌桶)算法实现的限流。本文将会基于源码,深入剖析下Golang是如何实现Token Bucket的。其代码也非常简洁,去除注释后,也就200行左右的代码量。
同时,我也提供了time/rate注释版,辅助大家理解该组件的实现。
背景
简单来说,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放Token,桶满则暂时不放。
而用户则从桶中取Token,如果有剩余Token就可以一直取。如果没有剩余Token,则需要等到系统中被放置了Token才行。
一般介绍Token Bucket的时候,都会有一张这样的原理图:
Token Bucket原理图
从这个图中看起来,似乎令牌桶实现应该是这样的:
有一个Timer和一个BlockingQueue。Timer固定的往BlockingQueue中放token。用户则从BlockingQueue中取数据。
这固然是Token Bucket的一种实现方式,这么做也非常直观,但是效率太低了:我们需要不仅多维护一个Timer和BlockingQueue,而且还耗费了一些不必要的内存。
在Golang的timer/rate中的实现, 并没有单独维护一个Timer,而是采用了lazyload的方式,直到每次消费之前才根据时间差更新Token数目,而且也不是用BlockingQueue来存放Token,而是仅仅通过计数的方式。
Token的生成和消费
我们在上一篇文章中讲到,Token的消费方式有三种。但其实在内部实现,最终三种消费方式都调用了reserveN函数来生成和消费Token。
我们看下reserveN函数的具体实现,整个过程非常简单。在正式讲之前,我们先了解一个简单的概念:
在time/rate中,NewLimiter的第一个参数是速率limit,代表了一秒钟可以产生多少Token。
那么简单换算一下,我们就可以知道一个Token的生成间隔是多少。
有了这个生成间隔,我们就可以轻易地得到两个数据:
那么,有了这些转换函数,整个过程就很清晰了,如下:
计算从上次取Token的时间到当前时刻,期间一共新产生了多少Token:
我们只在取Token之前生成新的Token,也就意味着每次取Token的间隔,实际上也是生成Token的间隔。我们可以利用tokensFromDuration, 轻易的算出这段时间一共产生Token的数目。
那么,当前Token数目 = 新产生的Token数目 + 之前剩余的Token数目 - 要消费的Token数目。
如果消费后剩余Token数目大于零,说明此时Token桶内仍不为空,此时Token充足,无需调用侧等待。
如果Token数目小于零,则需等待一段时间。
那么这个时候,我们可以利用durationFromTokens将当前负值的Token数转化为需要等待的时间。
将需要等待的时间等相关结果返回给调用方。
从上面可以看出,其实整个过程就是利用了Token数可以和时间相互转化的原理。而如果Token数为负,则需要等待相应时间即可。
注意:如果当消费时,Token桶中的Token数目已经为负值了,依然可以按照上述流程进行消费。随着负值越来越小,等待的时间将会越来越长。
从结果来看,这个行为跟用Timer+BlockQueue实现是一样的。
此外,整个过程为了保证线程安全,更新令牌桶相关数据时都用了mutex加锁。
对于Allow函数实现时,只要判断需要等待的时间是否为0即可,如果大于0说明需要等待,则返回False,反之返回True。
对于Wait函数,直接t := time.NewTimer(delay),等待对应的时间即可。
float精度问题
从上面原理讲述可以看出,在Token和时间的相互转化函数durationFromTokens和tokensFromDuration中,涉及到float64的乘除运算。
一谈到float的乘除,我们就需要小心精度问题了。
而Golang在这里也踩了坑,以下是tokensFromDuration最初的实现版本
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}
这个操作看起来一点问题都没:每秒生成的Token数乘于秒数。
然而,这里的问题在于,d.Seconds()已经是小数了。两个小数相乘,会带来精度的损失。
所以就有了这个issue:golang.org/issues/34861。
修改后新的版本如下:
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}
time.Duration是int64的别名,代表纳秒。分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。
数值溢出问题
我们讲reserveN函数的具体实现时,第一步就是计算从当前时间到上次取Token的时刻,期间一共新产生了多少Token,同时也可得出当前的Token是多少。
我最开始的理解是,直接可以这么做:
// elapsed表示过去的时间差
elapsed := now.Sub(lim.last)
// delta表示这段时间一共新产生了多少Token
delta = tokensFromDuration(now.Sub(lim.last))
tokens := lim.tokens + delta
if(token > lim.burst){
token = lim.burst
}
其中,lim.tokens是当前剩余的Token,lim.last是上次取token的时刻。lim.burst是Token桶的大小。
使用tokensFromDuration计算出新生成了多少Token,累加起来后,不能超过桶的容量即可。
这么做看起来也没什么问题,然而并不是这样。
在time/rate里面是这么做的,如下代码所示:
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
与我们最开始的代码不一样的是,它没有直接用now.Sub(lim.last)来转化为对应的Token数,而是
先用lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens),计算把桶填满的时间maxElapsed。
取elapsed和maxElapsed的最小值。
这么做算出的结果肯定是正确的,但是这么做相比于我们的做法,好处在哪里?
对于我们的代码,当last非常小的时候(或者当其为初始值0的时候),此时now.Sub(lim.last)的值就会非常大,如果lim.limit即每秒生成的Token数目也非常大时,直接将二者进行乘法运算,结果有可能会溢出。
因此,time/rate先计算了把桶填满的时间,将其作为时间差值的上限,这样就规避了溢出的问题。
Token的归还
而对于Reserve函数,返回的结果中,我们可以通过Reservation.Delay()函数,得到需要等待时间。
同时调用方可以根据返回条件和现有情况,可以调用Reservation.Cancel()函数,取消此次消费。
当调用Cancel()函数时,消费的Token数将会尽可能归还给Token桶。
此外,我们在上一篇文章中讲到,Wait函数可以通过Context进行取消或者超时等,
当通过Context进行取消或超时时,此时消费的Token数也会归还给Token桶。
然而,归还Token的时候,并不是简单的将Token数直接累加到现有Token桶的数目上,这里还有一些注意点:
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
以上代码就是计算需要归还多少的Token。其中:
r.tokens指的是本次消费的Token数
r.timeToAct指的是Token桶可以满足本次消费数目的时刻,也就是消费的时刻+等待的时长。
r.lim.lastEvent指的是最近一次消费的timeToAct值
其中:r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) 指的是,从该次消费到当前时间,一共又新消费了多少Token数目。
根据代码来看,要归还的Token要是该次消费的Token减去新消费的Token。
不过这里我还没有想明白,为什么归还的时候,要减去新消费数目。
按照我的理解,直接归还全部Token数目,这样对于下一次消费是无感知影响的。这块的具体原因还需要进一步探索。
总结
Token Bucket其实非常适合互联网突发式请求的场景,其请求Token时并不是严格的限制为固定的速率,而是中间有一个桶作为缓冲。
只要桶中还有Token,请求就还可以一直进行。当突发量激增到一定程度,则才会按照预定速率进行消费。
此外在维基百科中,也提到了分层Token Bucket(HTB)作为传统Token Bucket的进一步优化,Linux内核中也用它进行流量控制。