src

Go monorepo.
git clone git://code.dwrz.net/src
Log | Files | Refs

middleware.go (11400B)


      1 package retry
      2 
      3 import (
      4 	"context"
      5 	"fmt"
      6 	"github.com/aws/aws-sdk-go-v2/aws/middleware/private/metrics"
      7 	"strconv"
      8 	"strings"
      9 	"time"
     10 
     11 	"github.com/aws/aws-sdk-go-v2/aws"
     12 	awsmiddle "github.com/aws/aws-sdk-go-v2/aws/middleware"
     13 	"github.com/aws/aws-sdk-go-v2/internal/sdk"
     14 	"github.com/aws/smithy-go/logging"
     15 	smithymiddle "github.com/aws/smithy-go/middleware"
     16 	"github.com/aws/smithy-go/transport/http"
     17 )
     18 
     19 // RequestCloner is a function that can take an input request type and clone
     20 // the request for use in a subsequent retry attempt.
     21 type RequestCloner func(interface{}) interface{}
     22 
     23 type retryMetadata struct {
     24 	AttemptNum       int
     25 	AttemptTime      time.Time
     26 	MaxAttempts      int
     27 	AttemptClockSkew time.Duration
     28 }
     29 
     30 // Attempt is a Smithy Finalize middleware that handles retry attempts using
     31 // the provided Retryer implementation.
     32 type Attempt struct {
     33 	// Enable the logging of retry attempts performed by the SDK. This will
     34 	// include logging retry attempts, unretryable errors, and when max
     35 	// attempts are reached.
     36 	LogAttempts bool
     37 
     38 	retryer       aws.RetryerV2
     39 	requestCloner RequestCloner
     40 }
     41 
     42 // NewAttemptMiddleware returns a new Attempt retry middleware.
     43 func NewAttemptMiddleware(retryer aws.Retryer, requestCloner RequestCloner, optFns ...func(*Attempt)) *Attempt {
     44 	m := &Attempt{
     45 		retryer:       wrapAsRetryerV2(retryer),
     46 		requestCloner: requestCloner,
     47 	}
     48 	for _, fn := range optFns {
     49 		fn(m)
     50 	}
     51 	return m
     52 }
     53 
     54 // ID returns the middleware identifier
     55 func (r *Attempt) ID() string { return "Retry" }
     56 
     57 func (r Attempt) logf(logger logging.Logger, classification logging.Classification, format string, v ...interface{}) {
     58 	if !r.LogAttempts {
     59 		return
     60 	}
     61 	logger.Logf(classification, format, v...)
     62 }
     63 
     64 // HandleFinalize utilizes the provider Retryer implementation to attempt
     65 // retries over the next handler
     66 func (r *Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
     67 	out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
     68 ) {
     69 	var attemptNum int
     70 	var attemptClockSkew time.Duration
     71 	var attemptResults AttemptResults
     72 
     73 	maxAttempts := r.retryer.MaxAttempts()
     74 	releaseRetryToken := nopRelease
     75 
     76 	for {
     77 		attemptNum++
     78 		attemptInput := in
     79 		attemptInput.Request = r.requestCloner(attemptInput.Request)
     80 
     81 		// Record the metadata for the for attempt being started.
     82 		attemptCtx := setRetryMetadata(ctx, retryMetadata{
     83 			AttemptNum:       attemptNum,
     84 			AttemptTime:      sdk.NowTime().UTC(),
     85 			MaxAttempts:      maxAttempts,
     86 			AttemptClockSkew: attemptClockSkew,
     87 		})
     88 
     89 		var attemptResult AttemptResult
     90 		out, attemptResult, releaseRetryToken, err = r.handleAttempt(attemptCtx, attemptInput, releaseRetryToken, next)
     91 		attemptClockSkew, _ = awsmiddle.GetAttemptSkew(attemptResult.ResponseMetadata)
     92 
     93 		// AttemptResult Retried states that the attempt was not successful, and
     94 		// should be retried.
     95 		shouldRetry := attemptResult.Retried
     96 
     97 		// Add attempt metadata to list of all attempt metadata
     98 		attemptResults.Results = append(attemptResults.Results, attemptResult)
     99 
    100 		if !shouldRetry {
    101 			// Ensure the last response's metadata is used as the bases for result
    102 			// metadata returned by the stack. The Slice of attempt results
    103 			// will be added to this cloned metadata.
    104 			metadata = attemptResult.ResponseMetadata.Clone()
    105 
    106 			break
    107 		}
    108 	}
    109 
    110 	addAttemptResults(&metadata, attemptResults)
    111 	return out, metadata, err
    112 }
    113 
    114 // handleAttempt handles an individual request attempt.
    115 func (r *Attempt) handleAttempt(
    116 	ctx context.Context, in smithymiddle.FinalizeInput, releaseRetryToken func(error) error, next smithymiddle.FinalizeHandler,
    117 ) (
    118 	out smithymiddle.FinalizeOutput, attemptResult AttemptResult, _ func(error) error, err error,
    119 ) {
    120 	defer func() {
    121 		attemptResult.Err = err
    122 	}()
    123 
    124 	// Short circuit if this attempt never can succeed because the context is
    125 	// canceled. This reduces the chance of token pools being modified for
    126 	// attempts that will not be made
    127 	select {
    128 	case <-ctx.Done():
    129 		return out, attemptResult, nopRelease, ctx.Err()
    130 	default:
    131 	}
    132 
    133 	//------------------------------
    134 	// Get Attempt Token
    135 	//------------------------------
    136 	releaseAttemptToken, err := r.retryer.GetAttemptToken(ctx)
    137 	if err != nil {
    138 		return out, attemptResult, nopRelease, fmt.Errorf(
    139 			"failed to get retry Send token, %w", err)
    140 	}
    141 
    142 	//------------------------------
    143 	// Send Attempt
    144 	//------------------------------
    145 	logger := smithymiddle.GetLogger(ctx)
    146 	service, operation := awsmiddle.GetServiceID(ctx), awsmiddle.GetOperationName(ctx)
    147 	retryMetadata, _ := getRetryMetadata(ctx)
    148 	attemptNum := retryMetadata.AttemptNum
    149 	maxAttempts := retryMetadata.MaxAttempts
    150 
    151 	// Following attempts must ensure the request payload stream starts in a
    152 	// rewound state.
    153 	if attemptNum > 1 {
    154 		if rewindable, ok := in.Request.(interface{ RewindStream() error }); ok {
    155 			if rewindErr := rewindable.RewindStream(); rewindErr != nil {
    156 				return out, attemptResult, nopRelease, fmt.Errorf(
    157 					"failed to rewind transport stream for retry, %w", rewindErr)
    158 			}
    159 		}
    160 
    161 		r.logf(logger, logging.Debug, "retrying request %s/%s, attempt %d",
    162 			service, operation, attemptNum)
    163 	}
    164 
    165 	var metadata smithymiddle.Metadata
    166 	out, metadata, err = next.HandleFinalize(ctx, in)
    167 	attemptResult.ResponseMetadata = metadata
    168 
    169 	//------------------------------
    170 	// Bookkeeping
    171 	//------------------------------
    172 	// Release the retry token based on the state of the attempt's error (if any).
    173 	if releaseError := releaseRetryToken(err); releaseError != nil && err != nil {
    174 		return out, attemptResult, nopRelease, fmt.Errorf(
    175 			"failed to release retry token after request error, %w", err)
    176 	}
    177 	// Release the attempt token based on the state of the attempt's error (if any).
    178 	if releaseError := releaseAttemptToken(err); releaseError != nil && err != nil {
    179 		return out, attemptResult, nopRelease, fmt.Errorf(
    180 			"failed to release initial token after request error, %w", err)
    181 	}
    182 	// If there was no error making the attempt, nothing further to do. There
    183 	// will be nothing to retry.
    184 	if err == nil {
    185 		return out, attemptResult, nopRelease, err
    186 	}
    187 
    188 	//------------------------------
    189 	// Is Retryable and Should Retry
    190 	//------------------------------
    191 	// If the attempt failed with an unretryable error, nothing further to do
    192 	// but return, and inform the caller about the terminal failure.
    193 	retryable := r.retryer.IsErrorRetryable(err)
    194 	if !retryable {
    195 		r.logf(logger, logging.Debug, "request failed with unretryable error %v", err)
    196 		return out, attemptResult, nopRelease, err
    197 	}
    198 
    199 	// set retryable to true
    200 	attemptResult.Retryable = true
    201 
    202 	// Once the maximum number of attempts have been exhausted there is nothing
    203 	// further to do other than inform the caller about the terminal failure.
    204 	if maxAttempts > 0 && attemptNum >= maxAttempts {
    205 		r.logf(logger, logging.Debug, "max retry attempts exhausted, max %d", maxAttempts)
    206 		err = &MaxAttemptsError{
    207 			Attempt: attemptNum,
    208 			Err:     err,
    209 		}
    210 		return out, attemptResult, nopRelease, err
    211 	}
    212 
    213 	//------------------------------
    214 	// Get Retry (aka Retry Quota) Token
    215 	//------------------------------
    216 	// Get a retry token that will be released after the
    217 	releaseRetryToken, retryTokenErr := r.retryer.GetRetryToken(ctx, err)
    218 	if retryTokenErr != nil {
    219 		return out, attemptResult, nopRelease, retryTokenErr
    220 	}
    221 
    222 	//------------------------------
    223 	// Retry Delay and Sleep
    224 	//------------------------------
    225 	// Get the retry delay before another attempt can be made, and sleep for
    226 	// that time. Potentially early exist if the sleep is canceled via the
    227 	// context.
    228 	retryDelay, reqErr := r.retryer.RetryDelay(attemptNum, err)
    229 	mctx := metrics.Context(ctx)
    230 	if mctx != nil {
    231 		attempt, err := mctx.Data().LatestAttempt()
    232 		if err != nil {
    233 			attempt.RetryDelay = retryDelay
    234 		}
    235 	}
    236 	if reqErr != nil {
    237 		return out, attemptResult, releaseRetryToken, reqErr
    238 	}
    239 	if reqErr = sdk.SleepWithContext(ctx, retryDelay); reqErr != nil {
    240 		err = &aws.RequestCanceledError{Err: reqErr}
    241 		return out, attemptResult, releaseRetryToken, err
    242 	}
    243 
    244 	// The request should be re-attempted.
    245 	attemptResult.Retried = true
    246 
    247 	return out, attemptResult, releaseRetryToken, err
    248 }
    249 
    250 // MetricsHeader attaches SDK request metric header for retries to the transport
    251 type MetricsHeader struct{}
    252 
    253 // ID returns the middleware identifier
    254 func (r *MetricsHeader) ID() string {
    255 	return "RetryMetricsHeader"
    256 }
    257 
    258 // HandleFinalize attaches the SDK request metric header to the transport layer
    259 func (r MetricsHeader) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
    260 	out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
    261 ) {
    262 	retryMetadata, _ := getRetryMetadata(ctx)
    263 
    264 	const retryMetricHeader = "Amz-Sdk-Request"
    265 	var parts []string
    266 
    267 	parts = append(parts, "attempt="+strconv.Itoa(retryMetadata.AttemptNum))
    268 	if retryMetadata.MaxAttempts != 0 {
    269 		parts = append(parts, "max="+strconv.Itoa(retryMetadata.MaxAttempts))
    270 	}
    271 
    272 	var ttl time.Time
    273 	if deadline, ok := ctx.Deadline(); ok {
    274 		ttl = deadline
    275 	}
    276 
    277 	// Only append the TTL if it can be determined.
    278 	if !ttl.IsZero() && retryMetadata.AttemptClockSkew > 0 {
    279 		const unixTimeFormat = "20060102T150405Z"
    280 		ttl = ttl.Add(retryMetadata.AttemptClockSkew)
    281 		parts = append(parts, "ttl="+ttl.Format(unixTimeFormat))
    282 	}
    283 
    284 	switch req := in.Request.(type) {
    285 	case *http.Request:
    286 		req.Header[retryMetricHeader] = append(req.Header[retryMetricHeader][:0], strings.Join(parts, "; "))
    287 	default:
    288 		return out, metadata, fmt.Errorf("unknown transport type %T", req)
    289 	}
    290 
    291 	return next.HandleFinalize(ctx, in)
    292 }
    293 
    294 type retryMetadataKey struct{}
    295 
    296 // getRetryMetadata retrieves retryMetadata from the context and a bool
    297 // indicating if it was set.
    298 //
    299 // Scoped to stack values. Use github.com/aws/smithy-go/middleware#ClearStackValues
    300 // to clear all stack values.
    301 func getRetryMetadata(ctx context.Context) (metadata retryMetadata, ok bool) {
    302 	metadata, ok = smithymiddle.GetStackValue(ctx, retryMetadataKey{}).(retryMetadata)
    303 	return metadata, ok
    304 }
    305 
    306 // setRetryMetadata sets the retryMetadata on the context.
    307 //
    308 // Scoped to stack values. Use github.com/aws/smithy-go/middleware#ClearStackValues
    309 // to clear all stack values.
    310 func setRetryMetadata(ctx context.Context, metadata retryMetadata) context.Context {
    311 	return smithymiddle.WithStackValue(ctx, retryMetadataKey{}, metadata)
    312 }
    313 
    314 // AddRetryMiddlewaresOptions is the set of options that can be passed to
    315 // AddRetryMiddlewares for configuring retry associated middleware.
    316 type AddRetryMiddlewaresOptions struct {
    317 	Retryer aws.Retryer
    318 
    319 	// Enable the logging of retry attempts performed by the SDK. This will
    320 	// include logging retry attempts, unretryable errors, and when max
    321 	// attempts are reached.
    322 	LogRetryAttempts bool
    323 }
    324 
    325 // AddRetryMiddlewares adds retry middleware to operation middleware stack
    326 func AddRetryMiddlewares(stack *smithymiddle.Stack, options AddRetryMiddlewaresOptions) error {
    327 	attempt := NewAttemptMiddleware(options.Retryer, http.RequestCloner, func(middleware *Attempt) {
    328 		middleware.LogAttempts = options.LogRetryAttempts
    329 	})
    330 
    331 	// index retry to before signing, if signing exists
    332 	if err := stack.Finalize.Insert(attempt, "Signing", smithymiddle.Before); err != nil {
    333 		return err
    334 	}
    335 
    336 	if err := stack.Finalize.Insert(&MetricsHeader{}, attempt.ID(), smithymiddle.After); err != nil {
    337 		return err
    338 	}
    339 	return nil
    340 }