src

Go monorepo.
git clone git://code.dwrz.net/src
Log | Files | Refs

metrics.go (11385B)


      1 // Package metrics implements metrics gathering for SDK development purposes.
      2 //
      3 // This package is designated as private and is intended for use only by the
      4 // AWS client runtime. The exported API therein is not considered stable and
      5 // is subject to breaking changes without notice.
      6 package metrics
      7 
      8 import (
      9 	"context"
     10 	"encoding/json"
     11 	"fmt"
     12 	"sync"
     13 	"time"
     14 
     15 	"github.com/aws/smithy-go/middleware"
     16 )
     17 
        // Metric keys. Each constant below is the string key naming one metric.
        // NOTE(review): none of these keys is referenced elsewhere in this file;
        // presumably MetricPublisher implementations use them — verify against callers.
     18 const (
     19 	// ServiceIDKey is the key for the service ID metric.
     20 	ServiceIDKey = "ServiceId"
     21 	// OperationNameKey is the key for the operation name metric.
     22 	OperationNameKey = "OperationName"
     23 	// ClientRequestIDKey is the key for the client request ID metric.
     24 	ClientRequestIDKey = "ClientRequestId"
     25 	// APICallDurationKey is the key for the API call duration metric.
     26 	APICallDurationKey = "ApiCallDuration"
     27 	// APICallSuccessfulKey is the key for the API call successful metric.
     28 	APICallSuccessfulKey = "ApiCallSuccessful"
     29 	// MarshallingDurationKey is the key for the marshalling duration metric.
     30 	MarshallingDurationKey = "MarshallingDuration"
     31 	// InThroughputKey is the key for the input throughput metric.
     32 	InThroughputKey = "InThroughput"
     33 	// OutThroughputKey is the key for the output throughput metric.
     34 	OutThroughputKey = "OutThroughput"
     35 	// RetryCountKey is the key for the retry count metric.
     36 	RetryCountKey = "RetryCount"
     37 	// HTTPStatusCodeKey is the key for the HTTP status code metric.
     38 	HTTPStatusCodeKey = "HttpStatusCode"
     39 	// AWSExtendedRequestIDKey is the key for the AWS extended request ID metric.
     40 	AWSExtendedRequestIDKey = "AwsExtendedRequestId"
     41 	// AWSRequestIDKey is the key for the AWS request ID metric.
     42 	AWSRequestIDKey = "AwsRequestId"
     43 	// BackoffDelayDurationKey is the key for the backoff delay duration metric.
     44 	BackoffDelayDurationKey = "BackoffDelayDuration"
     45 	// StreamThroughputKey is the key for the stream throughput metric.
        	// Its value is "Throughput", without the "Stream" prefix of the identifier.
     46 	StreamThroughputKey = "Throughput"
     47 	// ConcurrencyAcquireDurationKey is the key for the concurrency acquire duration metric.
     48 	ConcurrencyAcquireDurationKey = "ConcurrencyAcquireDuration"
     49 	// PendingConcurrencyAcquiresKey is the key for the pending concurrency acquires metric.
     50 	PendingConcurrencyAcquiresKey = "PendingConcurrencyAcquires"
     51 	// SigningDurationKey is the key for the signing duration metric.
     52 	SigningDurationKey = "SigningDuration"
     53 	// UnmarshallingDurationKey is the key for the unmarshalling duration metric.
     54 	UnmarshallingDurationKey = "UnmarshallingDuration"
     55 	// TimeToFirstByteKey is the key for the time to first byte metric.
     56 	TimeToFirstByteKey = "TimeToFirstByte"
     57 	// ServiceCallDurationKey is the key for the service call duration metric.
     58 	ServiceCallDurationKey = "ServiceCallDuration"
     59 	// EndpointResolutionDurationKey is the key for the endpoint resolution duration metric.
     60 	EndpointResolutionDurationKey = "EndpointResolutionDuration"
     61 	// AttemptNumberKey is the key for the attempt number metric.
     62 	AttemptNumberKey = "AttemptNumber"
     63 	// MaxConcurrencyKey is the key for the max concurrency metric.
     64 	MaxConcurrencyKey = "MaxConcurrency"
     65 	// AvailableConcurrencyKey is the key for the available concurrency metric.
     66 	AvailableConcurrencyKey = "AvailableConcurrency"
     67 )
     68 
     69 // MetricPublisher is the interface implemented by custom metric publishers.
     70 // PostRequestMetrics will be invoked by the MetricCollection middleware to post request metrics.
     71 // PostStreamMetrics will be invoked by ReadCloserWithMetrics to post stream metrics.
        // Both methods receive a *MetricData and report failure through the returned error.
     72 type MetricPublisher interface {
     73 	PostRequestMetrics(*MetricData) error
     74 	PostStreamMetrics(*MetricData) error
     75 }
     76 
     77 // Serializer is the interface implemented by custom serializers.
     78 // Serialize transforms any input object into its corresponding string representation,
        // or returns an error when the object cannot be serialized.
     79 type Serializer interface {
     80 	Serialize(obj interface{}) (string, error)
     81 }
     82 
     83 // DefaultSerializer is an implementation of the Serializer interface.
     84 type DefaultSerializer struct{}
     85 
     86 // Serialize uses the default JSON serializer to obtain the string representation of an object.
     87 func (DefaultSerializer) Serialize(obj interface{}) (string, error) {
     88 	bytes, err := json.Marshal(obj)
     89 	if err != nil {
     90 		return "", err
     91 	}
     92 	return string(bytes), nil
     93 }
     94 
        // metricContextKey is the key type under which InitMetricContext stores the
        // *MetricContext as a middleware stack value; Context looks it up with the same key.
     95 type metricContextKey struct{}
     96 
     97 // MetricContext contains fields to store metric-related information.
        // Its fields are unexported and exposed through the ConnectionCounter, Publisher
        // and Data accessor methods. InitMetricContext is the only place in this file
        // that constructs one.
     98 type MetricContext struct {
     99 	connectionCounter *SharedConnectionCounter
    100 	publisher         MetricPublisher
    101 	data              *MetricData
    102 }
    103 
    104 // MetricData stores the collected metric data.
        //
        // ComputeRequestMetrics fills in the derived fields: APICallDuration (from
        // RequestStartTime/RequestEndTime), MarshallingDuration (from
        // SerializeStartTime/SerializeEndTime), EndpointResolutionDuration (from
        // ResolveEndpointStartTime/ResolveEndpointEndTime), RetryCount (from the number
        // of Attempts), StatusCode (copied from the latest attempt) and, only when
        // Success == 1, InThroughput and OutThroughput.
        //
        // NOTE(review): the remaining fields are not assigned anywhere in this file;
        // presumably the collecting middleware sets them — verify against callers.
    105 type MetricData struct {
    106 	RequestStartTime           time.Time
    107 	RequestEndTime             time.Time
    108 	APICallDuration            time.Duration
    109 	SerializeStartTime         time.Time
    110 	SerializeEndTime           time.Time
    111 	MarshallingDuration        time.Duration
    112 	ResolveEndpointStartTime   time.Time
    113 	ResolveEndpointEndTime     time.Time
    114 	EndpointResolutionDuration time.Duration
    115 	InThroughput               float64
    116 	OutThroughput              float64
    117 	RetryCount                 int
    118 	Success                    uint8
    119 	StatusCode                 int
    120 	ClientRequestID            string
    121 	ServiceID                  string
    122 	OperationName              string
    123 	PartitionID                string
    124 	Region                     string
    125 	RequestContentLength       int64
    126 	Stream                     StreamMetrics
    127 	Attempts                   []AttemptMetrics
    128 }
    129 
    130 // StreamMetrics stores metrics related to streaming data.
        // It is held by value in MetricData.Stream.
        // NOTE(review): nothing visible in this file reads or writes these fields, so
        // their units are not established here (ReadBytes presumably counts bytes and
        // Throughput is presumably derived from ReadBytes and ReadDuration — TODO confirm).
    131 type StreamMetrics struct {
    132 	ReadDuration time.Duration
    133 	ReadBytes    int64
    134 	Throughput   float64
    135 }
    136 
    137 // AttemptMetrics stores metrics related to individual attempts.
        //
        // Entries are appended to MetricData.Attempts by MetricData.NewAttempt.
        // MetricData.ComputeRequestMetrics derives the duration fields from timestamp
        // pairs: ServiceCallDuration (ServiceCallStart to ServiceCallEnd), TimeToFirstByte
        // (ServiceCallStart to FirstByteTime), ConcurrencyAcquireDuration
        // (ConnRequestedTime to ConnObtainedTime), SigningDuration (SignStartTime to
        // SignEndTime) and UnMarshallingDuration (DeserializeStartTime to
        // DeserializeEndTime). It also reads StatusCode and ResponseContentLength of
        // the latest attempt.
        //
        // NOTE(review): the other fields are not assigned anywhere in this file;
        // presumably the collecting middleware sets them — verify against callers.
    138 type AttemptMetrics struct {
    139 	ServiceCallStart           time.Time
    140 	ServiceCallEnd             time.Time
    141 	ServiceCallDuration        time.Duration
    142 	FirstByteTime              time.Time
    143 	TimeToFirstByte            time.Duration
    144 	ConnRequestedTime          time.Time
    145 	ConnObtainedTime           time.Time
    146 	ConcurrencyAcquireDuration time.Duration
    147 	CredentialFetchStartTime   time.Time
    148 	CredentialFetchEndTime     time.Time
    149 	SignStartTime              time.Time
    150 	SignEndTime                time.Time
    151 	SigningDuration            time.Duration
    152 	DeserializeStartTime       time.Time
    153 	DeserializeEndTime         time.Time
    154 	UnMarshallingDuration      time.Duration
    155 	RetryDelay                 time.Duration
    156 	ResponseContentLength      int64
    157 	StatusCode                 int
    158 	RequestID                  string
    159 	ExtendedRequestID          string
    160 	HTTPClient                 string
    161 	MaxConcurrency             int
    162 	PendingConnectionAcquires  int
    163 	AvailableConcurrency       int
    164 	ActiveRequests             int
    165 	ReusedConnection           bool
    166 }
    167 
    168 // Data returns the MetricData associated with the MetricContext.
        // The stored pointer itself is returned, not a copy, so changes made through it
        // are visible to every holder of this MetricContext. The receiver must be
        // non-nil; note that Context returns nil when no metric context is present.
    169 func (mc *MetricContext) Data() *MetricData {
    170 	return mc.data
    171 }
    172 
    173 // ConnectionCounter returns the SharedConnectionCounter associated with the MetricContext.
        // The result is whatever counter was passed to InitMetricContext, which may be nil.
        // The receiver must be non-nil; note that Context returns nil when no metric
        // context is present.
    174 func (mc *MetricContext) ConnectionCounter() *SharedConnectionCounter {
    175 	return mc.connectionCounter
    176 }
    177 
    178 // Publisher returns the MetricPublisher associated with the MetricContext.
        // The result is whatever publisher was passed to InitMetricContext, which may be nil.
        // The receiver must be non-nil; note that Context returns nil when no metric
        // context is present.
    179 func (mc *MetricContext) Publisher() MetricPublisher {
    180 	return mc.publisher
    181 }
    182 
    183 // ComputeRequestMetrics calculates and populates derived metrics based on the collected data.
    184 func (md *MetricData) ComputeRequestMetrics() {
    185 
    186 	for idx := range md.Attempts {
    187 		attempt := &md.Attempts[idx]
    188 		attempt.ConcurrencyAcquireDuration = attempt.ConnObtainedTime.Sub(attempt.ConnRequestedTime)
    189 		attempt.SigningDuration = attempt.SignEndTime.Sub(attempt.SignStartTime)
    190 		attempt.UnMarshallingDuration = attempt.DeserializeEndTime.Sub(attempt.DeserializeStartTime)
    191 		attempt.TimeToFirstByte = attempt.FirstByteTime.Sub(attempt.ServiceCallStart)
    192 		attempt.ServiceCallDuration = attempt.ServiceCallEnd.Sub(attempt.ServiceCallStart)
    193 	}
    194 
    195 	md.APICallDuration = md.RequestEndTime.Sub(md.RequestStartTime)
    196 	md.MarshallingDuration = md.SerializeEndTime.Sub(md.SerializeStartTime)
    197 	md.EndpointResolutionDuration = md.ResolveEndpointEndTime.Sub(md.ResolveEndpointStartTime)
    198 
    199 	md.RetryCount = len(md.Attempts) - 1
    200 
    201 	latestAttempt, err := md.LatestAttempt()
    202 
    203 	if err != nil {
    204 		fmt.Printf("error retrieving attempts data due to: %s. Skipping Throughput metrics", err.Error())
    205 	} else {
    206 
    207 		md.StatusCode = latestAttempt.StatusCode
    208 
    209 		if md.Success == 1 {
    210 			if latestAttempt.ResponseContentLength > 0 && latestAttempt.ServiceCallDuration > 0 {
    211 				md.InThroughput = float64(latestAttempt.ResponseContentLength) / latestAttempt.ServiceCallDuration.Seconds()
    212 			}
    213 			if md.RequestContentLength > 0 && latestAttempt.ServiceCallDuration > 0 {
    214 				md.OutThroughput = float64(md.RequestContentLength) / latestAttempt.ServiceCallDuration.Seconds()
    215 			}
    216 		}
    217 	}
    218 }
    219 
    220 // LatestAttempt returns the latest attempt metrics.
    221 // It returns an error if no attempts are initialized.
    222 func (md *MetricData) LatestAttempt() (*AttemptMetrics, error) {
    223 	if md.Attempts == nil || len(md.Attempts) == 0 {
    224 		return nil, fmt.Errorf("no attempts initialized. NewAttempt() should be called first")
    225 	}
    226 	return &md.Attempts[len(md.Attempts)-1], nil
    227 }
    228 
    229 // NewAttempt initializes new attempt metrics.
    230 func (md *MetricData) NewAttempt() {
    231 	if md.Attempts == nil {
    232 		md.Attempts = []AttemptMetrics{}
    233 	}
    234 	md.Attempts = append(md.Attempts, AttemptMetrics{})
    235 }
    236 
    237 // SharedConnectionCounter is a counter shared across API calls.
    238 type SharedConnectionCounter struct {
    239 	mu sync.Mutex
    240 
    241 	activeRequests           int
    242 	pendingConnectionAcquire int
    243 }
    244 
    245 // ActiveRequests returns the count of active requests.
    246 func (cc *SharedConnectionCounter) ActiveRequests() int {
    247 	cc.mu.Lock()
    248 	defer cc.mu.Unlock()
    249 
    250 	return cc.activeRequests
    251 }
    252 
    253 // PendingConnectionAcquire returns the count of pending connection acquires.
    254 func (cc *SharedConnectionCounter) PendingConnectionAcquire() int {
    255 	cc.mu.Lock()
    256 	defer cc.mu.Unlock()
    257 
    258 	return cc.pendingConnectionAcquire
    259 }
    260 
    261 // AddActiveRequest increments the count of active requests.
    262 func (cc *SharedConnectionCounter) AddActiveRequest() {
    263 	cc.mu.Lock()
    264 	defer cc.mu.Unlock()
    265 
    266 	cc.activeRequests++
    267 }
    268 
    269 // RemoveActiveRequest decrements the count of active requests.
    270 func (cc *SharedConnectionCounter) RemoveActiveRequest() {
    271 	cc.mu.Lock()
    272 	defer cc.mu.Unlock()
    273 
    274 	cc.activeRequests--
    275 }
    276 
    277 // AddPendingConnectionAcquire increments the count of pending connection acquires.
    278 func (cc *SharedConnectionCounter) AddPendingConnectionAcquire() {
    279 	cc.mu.Lock()
    280 	defer cc.mu.Unlock()
    281 
    282 	cc.pendingConnectionAcquire++
    283 }
    284 
    285 // RemovePendingConnectionAcquire decrements the count of pending connection acquires.
    286 func (cc *SharedConnectionCounter) RemovePendingConnectionAcquire() {
    287 	cc.mu.Lock()
    288 	defer cc.mu.Unlock()
    289 
    290 	cc.pendingConnectionAcquire--
    291 }
    292 
    293 // InitMetricContext initializes the metric context with the provided counter and publisher.
    294 // It returns the updated context.
    295 func InitMetricContext(
    296 	ctx context.Context, counter *SharedConnectionCounter, publisher MetricPublisher,
    297 ) context.Context {
    298 	if middleware.GetStackValue(ctx, metricContextKey{}) == nil {
    299 		ctx = middleware.WithStackValue(ctx, metricContextKey{}, &MetricContext{
    300 			connectionCounter: counter,
    301 			publisher:         publisher,
    302 			data: &MetricData{
    303 				Attempts: []AttemptMetrics{},
    304 				Stream:   StreamMetrics{},
    305 			},
    306 		})
    307 	}
    308 	return ctx
    309 }
    310 
    311 // Context returns the metric context from the given context.
    312 // It returns nil if the metric context is not found.
    313 func Context(ctx context.Context) *MetricContext {
    314 	mctx := middleware.GetStackValue(ctx, metricContextKey{})
    315 	if mctx == nil {
    316 		return nil
    317 	}
    318 	return mctx.(*MetricContext)
    319 }