adaptive_ratelimit.go (3534B)
1 package retry 2 3 import ( 4 "math" 5 "sync" 6 "time" 7 8 "github.com/aws/aws-sdk-go-v2/internal/sdk" 9 ) 10 11 type adaptiveRateLimit struct { 12 tokenBucketEnabled bool 13 14 smooth float64 15 beta float64 16 scaleConstant float64 17 minFillRate float64 18 19 fillRate float64 20 calculatedRate float64 21 lastRefilled time.Time 22 measuredTxRate float64 23 lastTxRateBucket float64 24 requestCount int64 25 lastMaxRate float64 26 lastThrottleTime time.Time 27 timeWindow float64 28 29 tokenBucket *adaptiveTokenBucket 30 31 mu sync.Mutex 32 } 33 34 func newAdaptiveRateLimit() *adaptiveRateLimit { 35 now := sdk.NowTime() 36 return &adaptiveRateLimit{ 37 smooth: 0.8, 38 beta: 0.7, 39 scaleConstant: 0.4, 40 41 minFillRate: 0.5, 42 43 lastTxRateBucket: math.Floor(timeFloat64Seconds(now)), 44 lastThrottleTime: now, 45 46 tokenBucket: newAdaptiveTokenBucket(0), 47 } 48 } 49 50 func (a *adaptiveRateLimit) Enable(v bool) { 51 a.mu.Lock() 52 defer a.mu.Unlock() 53 54 a.tokenBucketEnabled = v 55 } 56 57 func (a *adaptiveRateLimit) AcquireToken(amount uint) ( 58 tokenAcquired bool, waitTryAgain time.Duration, 59 ) { 60 a.mu.Lock() 61 defer a.mu.Unlock() 62 63 if !a.tokenBucketEnabled { 64 return true, 0 65 } 66 67 a.tokenBucketRefill() 68 69 available, ok := a.tokenBucket.Retrieve(float64(amount)) 70 if !ok { 71 waitDur := float64Seconds((float64(amount) - available) / a.fillRate) 72 return false, waitDur 73 } 74 75 return true, 0 76 } 77 78 func (a *adaptiveRateLimit) Update(throttled bool) { 79 a.mu.Lock() 80 defer a.mu.Unlock() 81 82 a.updateMeasuredRate() 83 84 if throttled { 85 rateToUse := a.measuredTxRate 86 if a.tokenBucketEnabled { 87 rateToUse = math.Min(a.measuredTxRate, a.fillRate) 88 } 89 90 a.lastMaxRate = rateToUse 91 a.calculateTimeWindow() 92 a.lastThrottleTime = sdk.NowTime() 93 a.calculatedRate = a.cubicThrottle(rateToUse) 94 a.tokenBucketEnabled = true 95 } else { 96 a.calculateTimeWindow() 97 a.calculatedRate = a.cubicSuccess(sdk.NowTime()) 98 } 99 100 newRate := math.Min(a.calculatedRate, 2*a.measuredTxRate) 101 a.tokenBucketUpdateRate(newRate) 102 } 103 104 func (a *adaptiveRateLimit) cubicSuccess(t time.Time) float64 { 105 dt := secondsFloat64(t.Sub(a.lastThrottleTime)) 106 return (a.scaleConstant * math.Pow(dt-a.timeWindow, 3)) + a.lastMaxRate 107 } 108 109 func (a *adaptiveRateLimit) cubicThrottle(rateToUse float64) float64 { 110 return rateToUse * a.beta 111 } 112 113 func (a *adaptiveRateLimit) calculateTimeWindow() { 114 a.timeWindow = math.Pow((a.lastMaxRate*(1.-a.beta))/a.scaleConstant, 1./3.) 115 } 116 117 func (a *adaptiveRateLimit) tokenBucketUpdateRate(newRPS float64) { 118 a.tokenBucketRefill() 119 a.fillRate = math.Max(newRPS, a.minFillRate) 120 a.tokenBucket.Resize(newRPS) 121 } 122 123 func (a *adaptiveRateLimit) updateMeasuredRate() { 124 now := sdk.NowTime() 125 timeBucket := math.Floor(timeFloat64Seconds(now)*2.) / 2. 126 a.requestCount++ 127 128 if timeBucket > a.lastTxRateBucket { 129 currentRate := float64(a.requestCount) / (timeBucket - a.lastTxRateBucket) 130 a.measuredTxRate = (currentRate * a.smooth) + (a.measuredTxRate * (1. - a.smooth)) 131 a.requestCount = 0 132 a.lastTxRateBucket = timeBucket 133 } 134 } 135 136 func (a *adaptiveRateLimit) tokenBucketRefill() { 137 now := sdk.NowTime() 138 if a.lastRefilled.IsZero() { 139 a.lastRefilled = now 140 return 141 } 142 143 fillAmount := secondsFloat64(now.Sub(a.lastRefilled)) * a.fillRate 144 a.tokenBucket.Refund(fillAmount) 145 a.lastRefilled = now 146 } 147 148 func float64Seconds(v float64) time.Duration { 149 return time.Duration(v * float64(time.Second)) 150 } 151 152 func secondsFloat64(v time.Duration) float64 { 153 return float64(v) / float64(time.Second) 154 } 155 156 func timeFloat64Seconds(v time.Time) float64 { 157 return float64(v.UnixNano()) / float64(time.Second) 158 }