src

Go monorepo.
git clone git://code.dwrz.net/src
Log | Files | Refs

timeout_read_closer.go (2797B)


      1 package http
      2 
      3 import (
      4 	"context"
      5 	"fmt"
      6 	"io"
      7 	"time"
      8 
      9 	"github.com/aws/smithy-go"
     10 	"github.com/aws/smithy-go/middleware"
     11 	smithyhttp "github.com/aws/smithy-go/transport/http"
     12 )
     13 
     14 type readResult struct {
     15 	n   int
     16 	err error
     17 }
     18 
     19 // ResponseTimeoutError is an error when the reads from the response are
     20 // delayed longer than the timeout the read was configured for.
     21 type ResponseTimeoutError struct {
     22 	TimeoutDur time.Duration
     23 }
     24 
     25 // Timeout returns that the error is was caused by a timeout, and can be
     26 // retried.
     27 func (*ResponseTimeoutError) Timeout() bool { return true }
     28 
     29 func (e *ResponseTimeoutError) Error() string {
     30 	return fmt.Sprintf("read on body reach timeout limit, %v", e.TimeoutDur)
     31 }
     32 
     33 // timeoutReadCloser will handle body reads that take too long.
     34 // We will return a ErrReadTimeout error if a timeout occurs.
     35 type timeoutReadCloser struct {
     36 	reader   io.ReadCloser
     37 	duration time.Duration
     38 }
     39 
     40 // Read will spin off a goroutine to call the reader's Read method. We will
     41 // select on the timer's channel or the read's channel. Whoever completes first
     42 // will be returned.
     43 func (r *timeoutReadCloser) Read(b []byte) (int, error) {
     44 	timer := time.NewTimer(r.duration)
     45 	c := make(chan readResult, 1)
     46 
     47 	go func() {
     48 		n, err := r.reader.Read(b)
     49 		timer.Stop()
     50 		c <- readResult{n: n, err: err}
     51 	}()
     52 
     53 	select {
     54 	case data := <-c:
     55 		return data.n, data.err
     56 	case <-timer.C:
     57 		return 0, &ResponseTimeoutError{TimeoutDur: r.duration}
     58 	}
     59 }
     60 
     61 func (r *timeoutReadCloser) Close() error {
     62 	return r.reader.Close()
     63 }
     64 
     65 // AddResponseReadTimeoutMiddleware adds a middleware to the stack that wraps the
     66 // response body so that a read that takes too long will return an error.
     67 func AddResponseReadTimeoutMiddleware(stack *middleware.Stack, duration time.Duration) error {
     68 	return stack.Deserialize.Add(&readTimeout{duration: duration}, middleware.After)
     69 }
     70 
     71 // readTimeout wraps the response body with a timeoutReadCloser
     72 type readTimeout struct {
     73 	duration time.Duration
     74 }
     75 
     76 // ID returns the id of the middleware
     77 func (*readTimeout) ID() string {
     78 	return "ReadResponseTimeout"
     79 }
     80 
     81 // HandleDeserialize implements the DeserializeMiddleware interface
     82 func (m *readTimeout) HandleDeserialize(
     83 	ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler,
     84 ) (
     85 	out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
     86 ) {
     87 	out, metadata, err = next.HandleDeserialize(ctx, in)
     88 	if err != nil {
     89 		return out, metadata, err
     90 	}
     91 
     92 	response, ok := out.RawResponse.(*smithyhttp.Response)
     93 	if !ok {
     94 		return out, metadata, &smithy.DeserializationError{Err: fmt.Errorf("unknown transport type %T", out.RawResponse)}
     95 	}
     96 
     97 	response.Body = &timeoutReadCloser{
     98 		reader:   response.Body,
     99 		duration: m.duration,
    100 	}
    101 	out.RawResponse = response
    102 
    103 	return out, metadata, err
    104 }