src

Go monorepo.
git clone git://code.dwrz.net/src
Log | Files | Refs

adaptive_ratelimit.go (3534B)


      1 package retry
      2 
      3 import (
      4 	"math"
      5 	"sync"
      6 	"time"
      7 
      8 	"github.com/aws/aws-sdk-go-v2/internal/sdk"
      9 )
     10 
     11 type adaptiveRateLimit struct {
     12 	tokenBucketEnabled bool
     13 
     14 	smooth        float64
     15 	beta          float64
     16 	scaleConstant float64
     17 	minFillRate   float64
     18 
     19 	fillRate         float64
     20 	calculatedRate   float64
     21 	lastRefilled     time.Time
     22 	measuredTxRate   float64
     23 	lastTxRateBucket float64
     24 	requestCount     int64
     25 	lastMaxRate      float64
     26 	lastThrottleTime time.Time
     27 	timeWindow       float64
     28 
     29 	tokenBucket *adaptiveTokenBucket
     30 
     31 	mu sync.Mutex
     32 }
     33 
     34 func newAdaptiveRateLimit() *adaptiveRateLimit {
     35 	now := sdk.NowTime()
     36 	return &adaptiveRateLimit{
     37 		smooth:        0.8,
     38 		beta:          0.7,
     39 		scaleConstant: 0.4,
     40 
     41 		minFillRate: 0.5,
     42 
     43 		lastTxRateBucket: math.Floor(timeFloat64Seconds(now)),
     44 		lastThrottleTime: now,
     45 
     46 		tokenBucket: newAdaptiveTokenBucket(0),
     47 	}
     48 }
     49 
     50 func (a *adaptiveRateLimit) Enable(v bool) {
     51 	a.mu.Lock()
     52 	defer a.mu.Unlock()
     53 
     54 	a.tokenBucketEnabled = v
     55 }
     56 
     57 func (a *adaptiveRateLimit) AcquireToken(amount uint) (
     58 	tokenAcquired bool, waitTryAgain time.Duration,
     59 ) {
     60 	a.mu.Lock()
     61 	defer a.mu.Unlock()
     62 
     63 	if !a.tokenBucketEnabled {
     64 		return true, 0
     65 	}
     66 
     67 	a.tokenBucketRefill()
     68 
     69 	available, ok := a.tokenBucket.Retrieve(float64(amount))
     70 	if !ok {
     71 		waitDur := float64Seconds((float64(amount) - available) / a.fillRate)
     72 		return false, waitDur
     73 	}
     74 
     75 	return true, 0
     76 }
     77 
     78 func (a *adaptiveRateLimit) Update(throttled bool) {
     79 	a.mu.Lock()
     80 	defer a.mu.Unlock()
     81 
     82 	a.updateMeasuredRate()
     83 
     84 	if throttled {
     85 		rateToUse := a.measuredTxRate
     86 		if a.tokenBucketEnabled {
     87 			rateToUse = math.Min(a.measuredTxRate, a.fillRate)
     88 		}
     89 
     90 		a.lastMaxRate = rateToUse
     91 		a.calculateTimeWindow()
     92 		a.lastThrottleTime = sdk.NowTime()
     93 		a.calculatedRate = a.cubicThrottle(rateToUse)
     94 		a.tokenBucketEnabled = true
     95 	} else {
     96 		a.calculateTimeWindow()
     97 		a.calculatedRate = a.cubicSuccess(sdk.NowTime())
     98 	}
     99 
    100 	newRate := math.Min(a.calculatedRate, 2*a.measuredTxRate)
    101 	a.tokenBucketUpdateRate(newRate)
    102 }
    103 
    104 func (a *adaptiveRateLimit) cubicSuccess(t time.Time) float64 {
    105 	dt := secondsFloat64(t.Sub(a.lastThrottleTime))
    106 	return (a.scaleConstant * math.Pow(dt-a.timeWindow, 3)) + a.lastMaxRate
    107 }
    108 
    109 func (a *adaptiveRateLimit) cubicThrottle(rateToUse float64) float64 {
    110 	return rateToUse * a.beta
    111 }
    112 
    113 func (a *adaptiveRateLimit) calculateTimeWindow() {
    114 	a.timeWindow = math.Pow((a.lastMaxRate*(1.-a.beta))/a.scaleConstant, 1./3.)
    115 }
    116 
    117 func (a *adaptiveRateLimit) tokenBucketUpdateRate(newRPS float64) {
    118 	a.tokenBucketRefill()
    119 	a.fillRate = math.Max(newRPS, a.minFillRate)
    120 	a.tokenBucket.Resize(newRPS)
    121 }
    122 
    123 func (a *adaptiveRateLimit) updateMeasuredRate() {
    124 	now := sdk.NowTime()
    125 	timeBucket := math.Floor(timeFloat64Seconds(now)*2.) / 2.
    126 	a.requestCount++
    127 
    128 	if timeBucket > a.lastTxRateBucket {
    129 		currentRate := float64(a.requestCount) / (timeBucket - a.lastTxRateBucket)
    130 		a.measuredTxRate = (currentRate * a.smooth) + (a.measuredTxRate * (1. - a.smooth))
    131 		a.requestCount = 0
    132 		a.lastTxRateBucket = timeBucket
    133 	}
    134 }
    135 
    136 func (a *adaptiveRateLimit) tokenBucketRefill() {
    137 	now := sdk.NowTime()
    138 	if a.lastRefilled.IsZero() {
    139 		a.lastRefilled = now
    140 		return
    141 	}
    142 
    143 	fillAmount := secondsFloat64(now.Sub(a.lastRefilled)) * a.fillRate
    144 	a.tokenBucket.Refund(fillAmount)
    145 	a.lastRefilled = now
    146 }
    147 
    148 func float64Seconds(v float64) time.Duration {
    149 	return time.Duration(v * float64(time.Second))
    150 }
    151 
    152 func secondsFloat64(v time.Duration) float64 {
    153 	return float64(v) / float64(time.Second)
    154 }
    155 
    156 func timeFloat64Seconds(v time.Time) float64 {
    157 	return float64(v.UnixNano()) / float64(time.Second)
    158 }