adaptive.go (5395B)
1 package retry 2 3 import ( 4 "context" 5 "fmt" 6 "time" 7 8 "github.com/aws/aws-sdk-go-v2/aws" 9 "github.com/aws/aws-sdk-go-v2/internal/sdk" 10 ) 11 12 const ( 13 // DefaultRequestCost is the cost of a single request from the adaptive 14 // rate limited token bucket. 15 DefaultRequestCost uint = 1 16 ) 17 18 // DefaultThrottles provides the set of errors considered throttle errors that 19 // are checked by default. 20 var DefaultThrottles = []IsErrorThrottle{ 21 ThrottleErrorCode{ 22 Codes: DefaultThrottleErrorCodes, 23 }, 24 } 25 26 // AdaptiveModeOptions provides the functional options for configuring the 27 // adaptive retry mode, and delay behavior. 28 type AdaptiveModeOptions struct { 29 // If the adaptive token bucket is empty, when an attempt will be made 30 // AdaptiveMode will sleep until a token is available. This can occur when 31 // attempts fail with throttle errors. Use this option to disable the sleep 32 // until token is available, and return error immediately. 33 FailOnNoAttemptTokens bool 34 35 // The cost of an attempt from the AdaptiveMode's adaptive token bucket. 36 RequestCost uint 37 38 // Set of strategies to determine if the attempt failed due to a throttle 39 // error. 40 // 41 // It is safe to append to this list in NewAdaptiveMode's functional options. 42 Throttles []IsErrorThrottle 43 44 // Set of options for standard retry mode that AdaptiveMode is built on top 45 // of. AdaptiveMode may apply its own defaults to Standard retry mode that 46 // are different than the defaults of NewStandard. Use these options to 47 // override the default options. 48 StandardOptions []func(*StandardOptions) 49 } 50 51 // AdaptiveMode provides an experimental retry strategy that expands on the 52 // Standard retry strategy, adding client attempt rate limits. The attempt rate 53 // limit is initially unrestricted, but becomes restricted when the attempt 54 // fails with for a throttle error. When restricted AdaptiveMode may need to 55 // sleep before an attempt is made, if too many throttles have been received. 56 // AdaptiveMode's sleep can be canceled with context cancel. Set 57 // AdaptiveModeOptions FailOnNoAttemptTokens to change the behavior from sleep, 58 // to fail fast. 59 // 60 // Eventually unrestricted attempt rate limit will be restored once attempts no 61 // longer are failing due to throttle errors. 62 type AdaptiveMode struct { 63 options AdaptiveModeOptions 64 throttles IsErrorThrottles 65 66 retryer aws.RetryerV2 67 rateLimit *adaptiveRateLimit 68 } 69 70 // NewAdaptiveMode returns an initialized AdaptiveMode retry strategy. 71 func NewAdaptiveMode(optFns ...func(*AdaptiveModeOptions)) *AdaptiveMode { 72 o := AdaptiveModeOptions{ 73 RequestCost: DefaultRequestCost, 74 Throttles: append([]IsErrorThrottle{}, DefaultThrottles...), 75 } 76 for _, fn := range optFns { 77 fn(&o) 78 } 79 80 return &AdaptiveMode{ 81 options: o, 82 throttles: IsErrorThrottles(o.Throttles), 83 retryer: NewStandard(o.StandardOptions...), 84 rateLimit: newAdaptiveRateLimit(), 85 } 86 } 87 88 // IsErrorRetryable returns if the failed attempt is retryable. This check 89 // should determine if the error can be retried, or if the error is 90 // terminal. 91 func (a *AdaptiveMode) IsErrorRetryable(err error) bool { 92 return a.retryer.IsErrorRetryable(err) 93 } 94 95 // MaxAttempts returns the maximum number of attempts that can be made for 96 // an attempt before failing. A value of 0 implies that the attempt should 97 // be retried until it succeeds if the errors are retryable. 98 func (a *AdaptiveMode) MaxAttempts() int { 99 return a.retryer.MaxAttempts() 100 } 101 102 // RetryDelay returns the delay that should be used before retrying the 103 // attempt. Will return error if the if the delay could not be determined. 104 func (a *AdaptiveMode) RetryDelay(attempt int, opErr error) ( 105 time.Duration, error, 106 ) { 107 return a.retryer.RetryDelay(attempt, opErr) 108 } 109 110 // GetRetryToken attempts to deduct the retry cost from the retry token pool. 111 // Returning the token release function, or error. 112 func (a *AdaptiveMode) GetRetryToken(ctx context.Context, opErr error) ( 113 releaseToken func(error) error, err error, 114 ) { 115 return a.retryer.GetRetryToken(ctx, opErr) 116 } 117 118 // GetInitialToken returns the initial attempt token that can increment the 119 // retry token pool if the attempt is successful. 120 // 121 // Deprecated: This method does not provide a way to block using Context, 122 // nor can it return an error. Use RetryerV2, and GetAttemptToken instead. Only 123 // present to implement Retryer interface. 124 func (a *AdaptiveMode) GetInitialToken() (releaseToken func(error) error) { 125 return nopRelease 126 } 127 128 // GetAttemptToken returns the attempt token that can be used to rate limit 129 // attempt calls. Will be used by the SDK's retry package's Attempt 130 // middleware to get an attempt token prior to calling the temp and releasing 131 // the attempt token after the attempt has been made. 132 func (a *AdaptiveMode) GetAttemptToken(ctx context.Context) (func(error) error, error) { 133 for { 134 acquiredToken, waitTryAgain := a.rateLimit.AcquireToken(a.options.RequestCost) 135 if acquiredToken { 136 break 137 } 138 if a.options.FailOnNoAttemptTokens { 139 return nil, fmt.Errorf( 140 "unable to get attempt token, and FailOnNoAttemptTokens enables") 141 } 142 143 if err := sdk.SleepWithContext(ctx, waitTryAgain); err != nil { 144 return nil, fmt.Errorf("failed to wait for token to be available, %w", err) 145 } 146 } 147 148 return a.handleResponse, nil 149 } 150 151 func (a *AdaptiveMode) handleResponse(opErr error) error { 152 throttled := a.throttles.IsErrorThrottle(opErr).Bool() 153 154 a.rateLimit.Update(throttled) 155 return nil 156 }