code.dwrz.net

Go monorepo.
Log | Files | Refs

adaptive.go (5395B)


      1 package retry
      2 
      3 import (
      4 	"context"
      5 	"fmt"
      6 	"time"
      7 
      8 	"github.com/aws/aws-sdk-go-v2/aws"
      9 	"github.com/aws/aws-sdk-go-v2/internal/sdk"
     10 )
     11 
     12 const (
     13 	// DefaultRequestCost is the cost of a single request from the adaptive
     14 	// rate limited token bucket.
     15 	DefaultRequestCost uint = 1
     16 )
     17 
     18 // DefaultThrottles provides the set of errors considered throttle errors that
     19 // are checked by default.
     20 var DefaultThrottles = []IsErrorThrottle{
     21 	ThrottleErrorCode{
     22 		Codes: DefaultThrottleErrorCodes,
     23 	},
     24 }
     25 
// AdaptiveModeOptions provides the functional options for configuring the
// adaptive retry mode, and delay behavior. NewAdaptiveMode fills in the
// defaults and then hands a pointer to each functional option to modify.
type AdaptiveModeOptions struct {
	// FailOnNoAttemptTokens controls what happens when an attempt token
	// cannot be acquired from the adaptive token bucket, which can occur when
	// attempts fail with throttle errors. When false, GetAttemptToken sleeps
	// until a token is available. When true, GetAttemptToken returns an error
	// immediately instead of sleeping.
	FailOnNoAttemptTokens bool

	// RequestCost is the cost of an attempt from the AdaptiveMode's adaptive
	// token bucket. NewAdaptiveMode defaults it to DefaultRequestCost.
	RequestCost uint

	// Throttles is the set of strategies used to determine if the attempt
	// failed due to a throttle error. NewAdaptiveMode seeds it with a copy of
	// DefaultThrottles.
	//
	// It is safe to append to this list in NewAdaptiveMode's functional options.
	Throttles []IsErrorThrottle

	// StandardOptions is the set of options for the standard retry mode that
	// AdaptiveMode is built on top of; they are passed to NewStandard.
	// AdaptiveMode may apply its own defaults to Standard retry mode that
	// are different than the defaults of NewStandard. Use these options to
	// override the default options.
	StandardOptions []func(*StandardOptions)
}
     50 
// AdaptiveMode provides an experimental retry strategy that expands on the
// Standard retry strategy, adding client attempt rate limits. The attempt rate
// limit is initially unrestricted, but becomes restricted when an attempt
// fails with a throttle error. When restricted, AdaptiveMode may need to
// sleep before an attempt is made, if too many throttles have been received.
// AdaptiveMode's sleep can be canceled with context cancel. Set
// AdaptiveModeOptions FailOnNoAttemptTokens to change the behavior from sleep,
// to fail fast.
//
// Eventually the unrestricted attempt rate limit will be restored once
// attempts are no longer failing due to throttle errors.
type AdaptiveMode struct {
	// options holds the resolved configuration produced by NewAdaptiveMode.
	options AdaptiveModeOptions
	// throttles is options.Throttles converted to IsErrorThrottles; it is
	// consulted after each attempt to classify the attempt's error.
	throttles IsErrorThrottles

	// retryer is the Standard retryer that the retry decisions, delays, and
	// retry tokens are delegated to.
	retryer aws.RetryerV2
	// rateLimit is the adaptive rate limiter that attempt tokens are acquired
	// from and that is updated with each attempt's throttle outcome.
	rateLimit *adaptiveRateLimit
}
     69 
     70 // NewAdaptiveMode returns an initialized AdaptiveMode retry strategy.
     71 func NewAdaptiveMode(optFns ...func(*AdaptiveModeOptions)) *AdaptiveMode {
     72 	o := AdaptiveModeOptions{
     73 		RequestCost: DefaultRequestCost,
     74 		Throttles:   append([]IsErrorThrottle{}, DefaultThrottles...),
     75 	}
     76 	for _, fn := range optFns {
     77 		fn(&o)
     78 	}
     79 
     80 	return &AdaptiveMode{
     81 		options:   o,
     82 		throttles: IsErrorThrottles(o.Throttles),
     83 		retryer:   NewStandard(o.StandardOptions...),
     84 		rateLimit: newAdaptiveRateLimit(),
     85 	}
     86 }
     87 
     88 // IsErrorRetryable returns if the failed attempt is retryable. This check
     89 // should determine if the error can be retried, or if the error is
     90 // terminal.
     91 func (a *AdaptiveMode) IsErrorRetryable(err error) bool {
     92 	return a.retryer.IsErrorRetryable(err)
     93 }
     94 
     95 // MaxAttempts returns the maximum number of attempts that can be made for
     96 // an attempt before failing. A value of 0 implies that the attempt should
     97 // be retried until it succeeds if the errors are retryable.
     98 func (a *AdaptiveMode) MaxAttempts() int {
     99 	return a.retryer.MaxAttempts()
    100 }
    101 
    102 // RetryDelay returns the delay that should be used before retrying the
    103 // attempt. Will return error if the if the delay could not be determined.
    104 func (a *AdaptiveMode) RetryDelay(attempt int, opErr error) (
    105 	time.Duration, error,
    106 ) {
    107 	return a.retryer.RetryDelay(attempt, opErr)
    108 }
    109 
    110 // GetRetryToken attempts to deduct the retry cost from the retry token pool.
    111 // Returning the token release function, or error.
    112 func (a *AdaptiveMode) GetRetryToken(ctx context.Context, opErr error) (
    113 	releaseToken func(error) error, err error,
    114 ) {
    115 	return a.retryer.GetRetryToken(ctx, opErr)
    116 }
    117 
    118 // GetInitialToken returns the initial attempt token that can increment the
    119 // retry token pool if the attempt is successful.
    120 //
    121 // Deprecated: This method does not provide a way to block using Context,
    122 // nor can it return an error. Use RetryerV2, and GetAttemptToken instead. Only
    123 // present to implement Retryer interface.
    124 func (a *AdaptiveMode) GetInitialToken() (releaseToken func(error) error) {
    125 	return nopRelease
    126 }
    127 
    128 // GetAttemptToken returns the attempt token that can be used to rate limit
    129 // attempt calls. Will be used by the SDK's retry package's Attempt
    130 // middleware to get an attempt token prior to calling the temp and releasing
    131 // the attempt token after the attempt has been made.
    132 func (a *AdaptiveMode) GetAttemptToken(ctx context.Context) (func(error) error, error) {
    133 	for {
    134 		acquiredToken, waitTryAgain := a.rateLimit.AcquireToken(a.options.RequestCost)
    135 		if acquiredToken {
    136 			break
    137 		}
    138 		if a.options.FailOnNoAttemptTokens {
    139 			return nil, fmt.Errorf(
    140 				"unable to get attempt token, and FailOnNoAttemptTokens enables")
    141 		}
    142 
    143 		if err := sdk.SleepWithContext(ctx, waitTryAgain); err != nil {
    144 			return nil, fmt.Errorf("failed to wait for token to be available, %w", err)
    145 		}
    146 	}
    147 
    148 	return a.handleResponse, nil
    149 }
    150 
    151 func (a *AdaptiveMode) handleResponse(opErr error) error {
    152 	throttled := a.throttles.IsErrorThrottle(opErr).Bool()
    153 
    154 	a.rateLimit.Update(throttled)
    155 	return nil
    156 }