middleware.go (11400B)
1 package retry 2 3 import ( 4 "context" 5 "fmt" 6 "github.com/aws/aws-sdk-go-v2/aws/middleware/private/metrics" 7 "strconv" 8 "strings" 9 "time" 10 11 "github.com/aws/aws-sdk-go-v2/aws" 12 awsmiddle "github.com/aws/aws-sdk-go-v2/aws/middleware" 13 "github.com/aws/aws-sdk-go-v2/internal/sdk" 14 "github.com/aws/smithy-go/logging" 15 smithymiddle "github.com/aws/smithy-go/middleware" 16 "github.com/aws/smithy-go/transport/http" 17 ) 18 19 // RequestCloner is a function that can take an input request type and clone 20 // the request for use in a subsequent retry attempt. 21 type RequestCloner func(interface{}) interface{} 22 23 type retryMetadata struct { 24 AttemptNum int 25 AttemptTime time.Time 26 MaxAttempts int 27 AttemptClockSkew time.Duration 28 } 29 30 // Attempt is a Smithy Finalize middleware that handles retry attempts using 31 // the provided Retryer implementation. 32 type Attempt struct { 33 // Enable the logging of retry attempts performed by the SDK. This will 34 // include logging retry attempts, unretryable errors, and when max 35 // attempts are reached. 36 LogAttempts bool 37 38 retryer aws.RetryerV2 39 requestCloner RequestCloner 40 } 41 42 // NewAttemptMiddleware returns a new Attempt retry middleware. 43 func NewAttemptMiddleware(retryer aws.Retryer, requestCloner RequestCloner, optFns ...func(*Attempt)) *Attempt { 44 m := &Attempt{ 45 retryer: wrapAsRetryerV2(retryer), 46 requestCloner: requestCloner, 47 } 48 for _, fn := range optFns { 49 fn(m) 50 } 51 return m 52 } 53 54 // ID returns the middleware identifier 55 func (r *Attempt) ID() string { return "Retry" } 56 57 func (r Attempt) logf(logger logging.Logger, classification logging.Classification, format string, v ...interface{}) { 58 if !r.LogAttempts { 59 return 60 } 61 logger.Logf(classification, format, v...) 62 } 63 64 // HandleFinalize utilizes the provider Retryer implementation to attempt 65 // retries over the next handler 66 func (r *Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) ( 67 out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error, 68 ) { 69 var attemptNum int 70 var attemptClockSkew time.Duration 71 var attemptResults AttemptResults 72 73 maxAttempts := r.retryer.MaxAttempts() 74 releaseRetryToken := nopRelease 75 76 for { 77 attemptNum++ 78 attemptInput := in 79 attemptInput.Request = r.requestCloner(attemptInput.Request) 80 81 // Record the metadata for the for attempt being started. 82 attemptCtx := setRetryMetadata(ctx, retryMetadata{ 83 AttemptNum: attemptNum, 84 AttemptTime: sdk.NowTime().UTC(), 85 MaxAttempts: maxAttempts, 86 AttemptClockSkew: attemptClockSkew, 87 }) 88 89 var attemptResult AttemptResult 90 out, attemptResult, releaseRetryToken, err = r.handleAttempt(attemptCtx, attemptInput, releaseRetryToken, next) 91 attemptClockSkew, _ = awsmiddle.GetAttemptSkew(attemptResult.ResponseMetadata) 92 93 // AttemptResult Retried states that the attempt was not successful, and 94 // should be retried. 95 shouldRetry := attemptResult.Retried 96 97 // Add attempt metadata to list of all attempt metadata 98 attemptResults.Results = append(attemptResults.Results, attemptResult) 99 100 if !shouldRetry { 101 // Ensure the last response's metadata is used as the bases for result 102 // metadata returned by the stack. The Slice of attempt results 103 // will be added to this cloned metadata. 104 metadata = attemptResult.ResponseMetadata.Clone() 105 106 break 107 } 108 } 109 110 addAttemptResults(&metadata, attemptResults) 111 return out, metadata, err 112 } 113 114 // handleAttempt handles an individual request attempt. 115 func (r *Attempt) handleAttempt( 116 ctx context.Context, in smithymiddle.FinalizeInput, releaseRetryToken func(error) error, next smithymiddle.FinalizeHandler, 117 ) ( 118 out smithymiddle.FinalizeOutput, attemptResult AttemptResult, _ func(error) error, err error, 119 ) { 120 defer func() { 121 attemptResult.Err = err 122 }() 123 124 // Short circuit if this attempt never can succeed because the context is 125 // canceled. This reduces the chance of token pools being modified for 126 // attempts that will not be made 127 select { 128 case <-ctx.Done(): 129 return out, attemptResult, nopRelease, ctx.Err() 130 default: 131 } 132 133 //------------------------------ 134 // Get Attempt Token 135 //------------------------------ 136 releaseAttemptToken, err := r.retryer.GetAttemptToken(ctx) 137 if err != nil { 138 return out, attemptResult, nopRelease, fmt.Errorf( 139 "failed to get retry Send token, %w", err) 140 } 141 142 //------------------------------ 143 // Send Attempt 144 //------------------------------ 145 logger := smithymiddle.GetLogger(ctx) 146 service, operation := awsmiddle.GetServiceID(ctx), awsmiddle.GetOperationName(ctx) 147 retryMetadata, _ := getRetryMetadata(ctx) 148 attemptNum := retryMetadata.AttemptNum 149 maxAttempts := retryMetadata.MaxAttempts 150 151 // Following attempts must ensure the request payload stream starts in a 152 // rewound state. 153 if attemptNum > 1 { 154 if rewindable, ok := in.Request.(interface{ RewindStream() error }); ok { 155 if rewindErr := rewindable.RewindStream(); rewindErr != nil { 156 return out, attemptResult, nopRelease, fmt.Errorf( 157 "failed to rewind transport stream for retry, %w", rewindErr) 158 } 159 } 160 161 r.logf(logger, logging.Debug, "retrying request %s/%s, attempt %d", 162 service, operation, attemptNum) 163 } 164 165 var metadata smithymiddle.Metadata 166 out, metadata, err = next.HandleFinalize(ctx, in) 167 attemptResult.ResponseMetadata = metadata 168 169 //------------------------------ 170 // Bookkeeping 171 //------------------------------ 172 // Release the retry token based on the state of the attempt's error (if any). 173 if releaseError := releaseRetryToken(err); releaseError != nil && err != nil { 174 return out, attemptResult, nopRelease, fmt.Errorf( 175 "failed to release retry token after request error, %w", err) 176 } 177 // Release the attempt token based on the state of the attempt's error (if any). 178 if releaseError := releaseAttemptToken(err); releaseError != nil && err != nil { 179 return out, attemptResult, nopRelease, fmt.Errorf( 180 "failed to release initial token after request error, %w", err) 181 } 182 // If there was no error making the attempt, nothing further to do. There 183 // will be nothing to retry. 184 if err == nil { 185 return out, attemptResult, nopRelease, err 186 } 187 188 //------------------------------ 189 // Is Retryable and Should Retry 190 //------------------------------ 191 // If the attempt failed with an unretryable error, nothing further to do 192 // but return, and inform the caller about the terminal failure. 193 retryable := r.retryer.IsErrorRetryable(err) 194 if !retryable { 195 r.logf(logger, logging.Debug, "request failed with unretryable error %v", err) 196 return out, attemptResult, nopRelease, err 197 } 198 199 // set retryable to true 200 attemptResult.Retryable = true 201 202 // Once the maximum number of attempts have been exhausted there is nothing 203 // further to do other than inform the caller about the terminal failure. 204 if maxAttempts > 0 && attemptNum >= maxAttempts { 205 r.logf(logger, logging.Debug, "max retry attempts exhausted, max %d", maxAttempts) 206 err = &MaxAttemptsError{ 207 Attempt: attemptNum, 208 Err: err, 209 } 210 return out, attemptResult, nopRelease, err 211 } 212 213 //------------------------------ 214 // Get Retry (aka Retry Quota) Token 215 //------------------------------ 216 // Get a retry token that will be released after the 217 releaseRetryToken, retryTokenErr := r.retryer.GetRetryToken(ctx, err) 218 if retryTokenErr != nil { 219 return out, attemptResult, nopRelease, retryTokenErr 220 } 221 222 //------------------------------ 223 // Retry Delay and Sleep 224 //------------------------------ 225 // Get the retry delay before another attempt can be made, and sleep for 226 // that time. Potentially early exist if the sleep is canceled via the 227 // context. 228 retryDelay, reqErr := r.retryer.RetryDelay(attemptNum, err) 229 mctx := metrics.Context(ctx) 230 if mctx != nil { 231 attempt, err := mctx.Data().LatestAttempt() 232 if err != nil { 233 attempt.RetryDelay = retryDelay 234 } 235 } 236 if reqErr != nil { 237 return out, attemptResult, releaseRetryToken, reqErr 238 } 239 if reqErr = sdk.SleepWithContext(ctx, retryDelay); reqErr != nil { 240 err = &aws.RequestCanceledError{Err: reqErr} 241 return out, attemptResult, releaseRetryToken, err 242 } 243 244 // The request should be re-attempted. 245 attemptResult.Retried = true 246 247 return out, attemptResult, releaseRetryToken, err 248 } 249 250 // MetricsHeader attaches SDK request metric header for retries to the transport 251 type MetricsHeader struct{} 252 253 // ID returns the middleware identifier 254 func (r *MetricsHeader) ID() string { 255 return "RetryMetricsHeader" 256 } 257 258 // HandleFinalize attaches the SDK request metric header to the transport layer 259 func (r MetricsHeader) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) ( 260 out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error, 261 ) { 262 retryMetadata, _ := getRetryMetadata(ctx) 263 264 const retryMetricHeader = "Amz-Sdk-Request" 265 var parts []string 266 267 parts = append(parts, "attempt="+strconv.Itoa(retryMetadata.AttemptNum)) 268 if retryMetadata.MaxAttempts != 0 { 269 parts = append(parts, "max="+strconv.Itoa(retryMetadata.MaxAttempts)) 270 } 271 272 var ttl time.Time 273 if deadline, ok := ctx.Deadline(); ok { 274 ttl = deadline 275 } 276 277 // Only append the TTL if it can be determined. 278 if !ttl.IsZero() && retryMetadata.AttemptClockSkew > 0 { 279 const unixTimeFormat = "20060102T150405Z" 280 ttl = ttl.Add(retryMetadata.AttemptClockSkew) 281 parts = append(parts, "ttl="+ttl.Format(unixTimeFormat)) 282 } 283 284 switch req := in.Request.(type) { 285 case *http.Request: 286 req.Header[retryMetricHeader] = append(req.Header[retryMetricHeader][:0], strings.Join(parts, "; ")) 287 default: 288 return out, metadata, fmt.Errorf("unknown transport type %T", req) 289 } 290 291 return next.HandleFinalize(ctx, in) 292 } 293 294 type retryMetadataKey struct{} 295 296 // getRetryMetadata retrieves retryMetadata from the context and a bool 297 // indicating if it was set. 298 // 299 // Scoped to stack values. Use github.com/aws/smithy-go/middleware#ClearStackValues 300 // to clear all stack values. 301 func getRetryMetadata(ctx context.Context) (metadata retryMetadata, ok bool) { 302 metadata, ok = smithymiddle.GetStackValue(ctx, retryMetadataKey{}).(retryMetadata) 303 return metadata, ok 304 } 305 306 // setRetryMetadata sets the retryMetadata on the context. 307 // 308 // Scoped to stack values. Use github.com/aws/smithy-go/middleware#ClearStackValues 309 // to clear all stack values. 310 func setRetryMetadata(ctx context.Context, metadata retryMetadata) context.Context { 311 return smithymiddle.WithStackValue(ctx, retryMetadataKey{}, metadata) 312 } 313 314 // AddRetryMiddlewaresOptions is the set of options that can be passed to 315 // AddRetryMiddlewares for configuring retry associated middleware. 316 type AddRetryMiddlewaresOptions struct { 317 Retryer aws.Retryer 318 319 // Enable the logging of retry attempts performed by the SDK. This will 320 // include logging retry attempts, unretryable errors, and when max 321 // attempts are reached. 322 LogRetryAttempts bool 323 } 324 325 // AddRetryMiddlewares adds retry middleware to operation middleware stack 326 func AddRetryMiddlewares(stack *smithymiddle.Stack, options AddRetryMiddlewaresOptions) error { 327 attempt := NewAttemptMiddleware(options.Retryer, http.RequestCloner, func(middleware *Attempt) { 328 middleware.LogAttempts = options.LogRetryAttempts 329 }) 330 331 // index retry to before signing, if signing exists 332 if err := stack.Finalize.Insert(attempt, "Signing", smithymiddle.Before); err != nil { 333 return err 334 } 335 336 if err := stack.Finalize.Insert(&MetricsHeader{}, attempt.ID(), smithymiddle.After); err != nil { 337 return err 338 } 339 return nil 340 }