電腦系統(tǒng)做的好的網(wǎng)站百度app客服電話
背景
服務(wù)請求下游,oom,排查下來發(fā)現(xiàn)是一個下游組件qps陡增導(dǎo)致
但是司內(nèi)網(wǎng)絡(luò)框架比較挫,竟然不負(fù)責(zé)框架內(nèi)存問題(有內(nèi)存管理模塊,但邏輯又是無限制使用內(nèi)存)
每個請求一個r、w buffer,請求無限制,內(nèi)存不夠直接就oom,然后就被linux給遷移掉了
所以才有了加限流的必要性(粉飾太平)
所以站在更高維度去考慮這個問題,就變成了一個網(wǎng)絡(luò)框架是否要去管理內(nèi)存?
作用
限制請求速率,保護服務(wù),以免服務(wù)過載
常用的限流方法
固定窗口、滑動窗口、漏桶、令牌桶
令牌桶:
一個固定大小的桶,系統(tǒng)會以恒定速率向桶中放 Token,桶滿則暫時不放
如果桶中有剩余 Token 就可以一直取,如果沒有剩余 Token,則需要等到桶中被放置 Token/直接返回失敗
本次學(xué)習(xí)令牌
golang.org/x/time/rate
代碼實現(xiàn)
// A Limiter controls how frequently events are allowed to happen.
// Limiter 用來限制發(fā)生事件的頻率
//
// It implements a "token bucket" of size b, initially full and refilled
// at rate r tokens per second.
// 初始的時候滿的,然后以每秒r tokens的速率來填充,bucket大小為b
//
// Informally, in any large enough time interval, the Limiter limits the
// rate to r tokens per second, with a maximum burst size of b events.
//
// As a special case, if r == Inf (the infinite rate), b is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
// r還能設(shè)置為 Inf?
//
// The zero value is a valid Limiter, but it will reject all events.
// Use NewLimiter to create non-zero Limiters.
// 還能有拒絕全部的零值Limiter?妙啊
//
// Limiter has three main methods, Allow, Reserve, and Wait.
// Most callers should use Wait.
// 三個方法:Allow, Reserve, Wait,每個方法都會消耗一個token
//
// Each of the three methods consumes a single token.
// They differ in their behavior when no token is available.
// If no token is available, Allow returns false.
// Allow:非阻塞
//
// If no token is available, Reserve returns a reservation for a future token
// and the amount of time the caller must wait before using it.
// Reserve 還能預(yù)定? 牛哇,還能返回如果一定要使用的話,要等多久
//
// If no token is available, Wait blocks until one can be obtained
// or its associated context.Context is canceled.
// Wait:阻塞
//
// The methods AllowN, ReserveN, and WaitN consume n tokens.
// AllowN、ReserveN、WaitN:消費n個token
type Limiter struct {mu sync.Mutex// 每秒的事件數(shù),即事件速率limit Limit// 單次調(diào)用(Allow, Reserve, Wait)中消費token的最大數(shù)// 更高的 Burst 的值,將會一次允許更多的事件發(fā)生(at once)burst inttokens float64// last is the last time the limiter's tokens field was updated// limiter的tokens字段的前一次被更新的事件last time.Time// lastEvent is the latest time of a rate-limited event (past or future)// 速率受限事件(past or future)的前一次時間lastEvent time.Time
}// A zero Limit allows no events.
type Limit float64
看了結(jié)構(gòu)體就知道如何設(shè)計一個很粗糙的限流器了
沒有用復(fù)雜的結(jié)構(gòu),就是一個簡單的原子計數(shù)
使用
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
// 速率r,一次b個突發(fā)
func NewLimiter(r Limit, b int) *Limiter {return &Limiter{limit: r,burst: b,}
}
NewLimiter的參數(shù)除了用Limit傳,還能傳間隔
func Every(interval time.Duration) Limit {if interval <= 0 {return Inf}return 1 / Limit(interval.Seconds())
}
Allow、AllowN、Reserve、ReserveN、Wait、WaitN
里面用的都是 reserveN
// Allow reports whether an event may happen now.
func (lim *Limiter) Allow() bool {return lim.AllowN(time.Now(), 1)
}// AllowN reports whether n events may happen at time t.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(t time.Time, n int) bool {return lim.reserveN(t, n, 0).ok
}// Reserve is shorthand for ReserveN(time.Now(), 1).
func (lim *Limiter) Reserve() *Reservation {return lim.ReserveN(time.Now(), 1)
}// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
// Usage example:
//
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
//
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {r := lim.reserveN(t, n, InfDuration)return &r
}// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {return lim.WaitN(ctx, 1)
}// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// The test code calls lim.wait with a fake timer generator.// This is the real timer generator.newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {timer := time.NewTimer(d)return timer.C, timer.Stop, func() {}}return lim.wait(ctx, n, time.Now(), newTimer)
}// wait is the internal implementation of WaitN.
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()// 在限流的情況下,一次請求索要超過 burst 的令牌if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// Check if ctx is already cancelled// 用這種方式檢查ctx是否結(jié)束select {case <-ctx.Done():return ctx.Err()default:// 這里有 default,所以不會卡住}// Determine wait limitwaitLimit := InfDuration// 它竟然還照顧了ctx的Deadline,牛哇牛哇if deadline, ok := ctx.Deadline(); ok {// t是當(dāng)前時間,deadline-t就是等待時長waitLimit = deadline.Sub(t)}// Reserver := lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// Wait if necessary// DelayFrom returns the duration for which the reservation holder must wait// before taking the reserved action. Zero duration means act immediately.// InfDuration means the limiter cannot grant the tokens requested in this// Reservation within the maximum wait time.// 0時長意味著立即執(zhí)行,不用等limiter// InfDuration時長意味著在此次最大的等待時間里,無法授權(quán)這么多的tokendelay := r.DelayFrom(t)if delay == 0 {return nil}// newTimer 中創(chuàng)建了一個定時器,這里用完要停止,不然系統(tǒng)中的定時器越來越多// ch:delay定時器// stop:定時器取消函數(shù)// advance:僅用于測試,設(shè)置鉤子,牛哇牛哇ch, stop, advance := newTimer(delay)defer stop()advance() // only has an effect when testing// 等待誰先來select {case <-ch:// We can proceed.return nilcase <-ctx.Done():// Context was canceled before we could proceed. Cancel the// reservation, which may permit other events to proceed sooner.r.Cancel()return ctx.Err()}
}
源碼學(xué)習(xí)
在一個快變現(xiàn)的現(xiàn)狀下,lim.reserveN都沒有心思學(xué),悲哀
lim.reserveN
問題
它是單實例還是分布式的?
在單個實例中對資源訪問或操作進行限速,屬于單實例限流
分布式限流通常涉及到跨進程或跨機器的狀態(tài)共享與同步,通常需要額外的基礎(chǔ)設(shè)施支持,比如分布式緩存(例如 Redis)或數(shù)據(jù)庫,來保持限流狀態(tài)的一致性
golang.org/x/time/rate 包并不為這些提供內(nèi)置支持
如果需要在分布式環(huán)境中實現(xiàn)限流,需要考慮使用一個中心化的存儲解決方案來同步不同節(jié)點之間的限流狀態(tài)
或者采用其他的分布式限流策略。這可能涉及到一些復(fù)雜性
因為需要管理共享狀態(tài)和處理分布式系統(tǒng)中可能出現(xiàn)的各種問題
例如網(wǎng)絡(luò)分區(qū)、延遲波動、以及同步狀態(tài)時的競態(tài)條件等