metrics.go (11385B)
1 // Package metrics implements metrics gathering for SDK development purposes. 2 // 3 // This package is designated as private and is intended for use only by the 4 // AWS client runtime. The exported API therein is not considered stable and 5 // is subject to breaking changes without notice. 6 package metrics 7 8 import ( 9 "context" 10 "encoding/json" 11 "fmt" 12 "sync" 13 "time" 14 15 "github.com/aws/smithy-go/middleware" 16 ) 17 18 const ( 19 // ServiceIDKey is the key for the service ID metric. 20 ServiceIDKey = "ServiceId" 21 // OperationNameKey is the key for the operation name metric. 22 OperationNameKey = "OperationName" 23 // ClientRequestIDKey is the key for the client request ID metric. 24 ClientRequestIDKey = "ClientRequestId" 25 // APICallDurationKey is the key for the API call duration metric. 26 APICallDurationKey = "ApiCallDuration" 27 // APICallSuccessfulKey is the key for the API call successful metric. 28 APICallSuccessfulKey = "ApiCallSuccessful" 29 // MarshallingDurationKey is the key for the marshalling duration metric. 30 MarshallingDurationKey = "MarshallingDuration" 31 // InThroughputKey is the key for the input throughput metric. 32 InThroughputKey = "InThroughput" 33 // OutThroughputKey is the key for the output throughput metric. 34 OutThroughputKey = "OutThroughput" 35 // RetryCountKey is the key for the retry count metric. 36 RetryCountKey = "RetryCount" 37 // HTTPStatusCodeKey is the key for the HTTP status code metric. 38 HTTPStatusCodeKey = "HttpStatusCode" 39 // AWSExtendedRequestIDKey is the key for the AWS extended request ID metric. 40 AWSExtendedRequestIDKey = "AwsExtendedRequestId" 41 // AWSRequestIDKey is the key for the AWS request ID metric. 42 AWSRequestIDKey = "AwsRequestId" 43 // BackoffDelayDurationKey is the key for the backoff delay duration metric. 44 BackoffDelayDurationKey = "BackoffDelayDuration" 45 // StreamThroughputKey is the key for the stream throughput metric. 46 StreamThroughputKey = "Throughput" 47 // ConcurrencyAcquireDurationKey is the key for the concurrency acquire duration metric. 48 ConcurrencyAcquireDurationKey = "ConcurrencyAcquireDuration" 49 // PendingConcurrencyAcquiresKey is the key for the pending concurrency acquires metric. 50 PendingConcurrencyAcquiresKey = "PendingConcurrencyAcquires" 51 // SigningDurationKey is the key for the signing duration metric. 52 SigningDurationKey = "SigningDuration" 53 // UnmarshallingDurationKey is the key for the unmarshalling duration metric. 54 UnmarshallingDurationKey = "UnmarshallingDuration" 55 // TimeToFirstByteKey is the key for the time to first byte metric. 56 TimeToFirstByteKey = "TimeToFirstByte" 57 // ServiceCallDurationKey is the key for the service call duration metric. 58 ServiceCallDurationKey = "ServiceCallDuration" 59 // EndpointResolutionDurationKey is the key for the endpoint resolution duration metric. 60 EndpointResolutionDurationKey = "EndpointResolutionDuration" 61 // AttemptNumberKey is the key for the attempt number metric. 62 AttemptNumberKey = "AttemptNumber" 63 // MaxConcurrencyKey is the key for the max concurrency metric. 64 MaxConcurrencyKey = "MaxConcurrency" 65 // AvailableConcurrencyKey is the key for the available concurrency metric. 66 AvailableConcurrencyKey = "AvailableConcurrency" 67 ) 68 69 // MetricPublisher provides the interface to provide custom MetricPublishers. 70 // PostRequestMetrics will be invoked by the MetricCollection middleware to post request. 71 // PostStreamMetrics will be invoked by ReadCloserWithMetrics to post stream metrics. 72 type MetricPublisher interface { 73 PostRequestMetrics(*MetricData) error 74 PostStreamMetrics(*MetricData) error 75 } 76 77 // Serializer provides the interface to provide custom Serializers. 78 // Serialize will transform any input object in its corresponding string representation. 79 type Serializer interface { 80 Serialize(obj interface{}) (string, error) 81 } 82 83 // DefaultSerializer is an implementation of the Serializer interface. 84 type DefaultSerializer struct{} 85 86 // Serialize uses the default JSON serializer to obtain the string representation of an object. 87 func (DefaultSerializer) Serialize(obj interface{}) (string, error) { 88 bytes, err := json.Marshal(obj) 89 if err != nil { 90 return "", err 91 } 92 return string(bytes), nil 93 } 94 95 type metricContextKey struct{} 96 97 // MetricContext contains fields to store metric-related information. 98 type MetricContext struct { 99 connectionCounter *SharedConnectionCounter 100 publisher MetricPublisher 101 data *MetricData 102 } 103 104 // MetricData stores the collected metric data. 105 type MetricData struct { 106 RequestStartTime time.Time 107 RequestEndTime time.Time 108 APICallDuration time.Duration 109 SerializeStartTime time.Time 110 SerializeEndTime time.Time 111 MarshallingDuration time.Duration 112 ResolveEndpointStartTime time.Time 113 ResolveEndpointEndTime time.Time 114 EndpointResolutionDuration time.Duration 115 InThroughput float64 116 OutThroughput float64 117 RetryCount int 118 Success uint8 119 StatusCode int 120 ClientRequestID string 121 ServiceID string 122 OperationName string 123 PartitionID string 124 Region string 125 RequestContentLength int64 126 Stream StreamMetrics 127 Attempts []AttemptMetrics 128 } 129 130 // StreamMetrics stores metrics related to streaming data. 131 type StreamMetrics struct { 132 ReadDuration time.Duration 133 ReadBytes int64 134 Throughput float64 135 } 136 137 // AttemptMetrics stores metrics related to individual attempts. 138 type AttemptMetrics struct { 139 ServiceCallStart time.Time 140 ServiceCallEnd time.Time 141 ServiceCallDuration time.Duration 142 FirstByteTime time.Time 143 TimeToFirstByte time.Duration 144 ConnRequestedTime time.Time 145 ConnObtainedTime time.Time 146 ConcurrencyAcquireDuration time.Duration 147 CredentialFetchStartTime time.Time 148 CredentialFetchEndTime time.Time 149 SignStartTime time.Time 150 SignEndTime time.Time 151 SigningDuration time.Duration 152 DeserializeStartTime time.Time 153 DeserializeEndTime time.Time 154 UnMarshallingDuration time.Duration 155 RetryDelay time.Duration 156 ResponseContentLength int64 157 StatusCode int 158 RequestID string 159 ExtendedRequestID string 160 HTTPClient string 161 MaxConcurrency int 162 PendingConnectionAcquires int 163 AvailableConcurrency int 164 ActiveRequests int 165 ReusedConnection bool 166 } 167 168 // Data returns the MetricData associated with the MetricContext. 169 func (mc *MetricContext) Data() *MetricData { 170 return mc.data 171 } 172 173 // ConnectionCounter returns the SharedConnectionCounter associated with the MetricContext. 174 func (mc *MetricContext) ConnectionCounter() *SharedConnectionCounter { 175 return mc.connectionCounter 176 } 177 178 // Publisher returns the MetricPublisher associated with the MetricContext. 179 func (mc *MetricContext) Publisher() MetricPublisher { 180 return mc.publisher 181 } 182 183 // ComputeRequestMetrics calculates and populates derived metrics based on the collected data. 184 func (md *MetricData) ComputeRequestMetrics() { 185 186 for idx := range md.Attempts { 187 attempt := &md.Attempts[idx] 188 attempt.ConcurrencyAcquireDuration = attempt.ConnObtainedTime.Sub(attempt.ConnRequestedTime) 189 attempt.SigningDuration = attempt.SignEndTime.Sub(attempt.SignStartTime) 190 attempt.UnMarshallingDuration = attempt.DeserializeEndTime.Sub(attempt.DeserializeStartTime) 191 attempt.TimeToFirstByte = attempt.FirstByteTime.Sub(attempt.ServiceCallStart) 192 attempt.ServiceCallDuration = attempt.ServiceCallEnd.Sub(attempt.ServiceCallStart) 193 } 194 195 md.APICallDuration = md.RequestEndTime.Sub(md.RequestStartTime) 196 md.MarshallingDuration = md.SerializeEndTime.Sub(md.SerializeStartTime) 197 md.EndpointResolutionDuration = md.ResolveEndpointEndTime.Sub(md.ResolveEndpointStartTime) 198 199 md.RetryCount = len(md.Attempts) - 1 200 201 latestAttempt, err := md.LatestAttempt() 202 203 if err != nil { 204 fmt.Printf("error retrieving attempts data due to: %s. Skipping Throughput metrics", err.Error()) 205 } else { 206 207 md.StatusCode = latestAttempt.StatusCode 208 209 if md.Success == 1 { 210 if latestAttempt.ResponseContentLength > 0 && latestAttempt.ServiceCallDuration > 0 { 211 md.InThroughput = float64(latestAttempt.ResponseContentLength) / latestAttempt.ServiceCallDuration.Seconds() 212 } 213 if md.RequestContentLength > 0 && latestAttempt.ServiceCallDuration > 0 { 214 md.OutThroughput = float64(md.RequestContentLength) / latestAttempt.ServiceCallDuration.Seconds() 215 } 216 } 217 } 218 } 219 220 // LatestAttempt returns the latest attempt metrics. 221 // It returns an error if no attempts are initialized. 222 func (md *MetricData) LatestAttempt() (*AttemptMetrics, error) { 223 if md.Attempts == nil || len(md.Attempts) == 0 { 224 return nil, fmt.Errorf("no attempts initialized. NewAttempt() should be called first") 225 } 226 return &md.Attempts[len(md.Attempts)-1], nil 227 } 228 229 // NewAttempt initializes new attempt metrics. 230 func (md *MetricData) NewAttempt() { 231 if md.Attempts == nil { 232 md.Attempts = []AttemptMetrics{} 233 } 234 md.Attempts = append(md.Attempts, AttemptMetrics{}) 235 } 236 237 // SharedConnectionCounter is a counter shared across API calls. 238 type SharedConnectionCounter struct { 239 mu sync.Mutex 240 241 activeRequests int 242 pendingConnectionAcquire int 243 } 244 245 // ActiveRequests returns the count of active requests. 246 func (cc *SharedConnectionCounter) ActiveRequests() int { 247 cc.mu.Lock() 248 defer cc.mu.Unlock() 249 250 return cc.activeRequests 251 } 252 253 // PendingConnectionAcquire returns the count of pending connection acquires. 254 func (cc *SharedConnectionCounter) PendingConnectionAcquire() int { 255 cc.mu.Lock() 256 defer cc.mu.Unlock() 257 258 return cc.pendingConnectionAcquire 259 } 260 261 // AddActiveRequest increments the count of active requests. 262 func (cc *SharedConnectionCounter) AddActiveRequest() { 263 cc.mu.Lock() 264 defer cc.mu.Unlock() 265 266 cc.activeRequests++ 267 } 268 269 // RemoveActiveRequest decrements the count of active requests. 270 func (cc *SharedConnectionCounter) RemoveActiveRequest() { 271 cc.mu.Lock() 272 defer cc.mu.Unlock() 273 274 cc.activeRequests-- 275 } 276 277 // AddPendingConnectionAcquire increments the count of pending connection acquires. 278 func (cc *SharedConnectionCounter) AddPendingConnectionAcquire() { 279 cc.mu.Lock() 280 defer cc.mu.Unlock() 281 282 cc.pendingConnectionAcquire++ 283 } 284 285 // RemovePendingConnectionAcquire decrements the count of pending connection acquires. 286 func (cc *SharedConnectionCounter) RemovePendingConnectionAcquire() { 287 cc.mu.Lock() 288 defer cc.mu.Unlock() 289 290 cc.pendingConnectionAcquire-- 291 } 292 293 // InitMetricContext initializes the metric context with the provided counter and publisher. 294 // It returns the updated context. 295 func InitMetricContext( 296 ctx context.Context, counter *SharedConnectionCounter, publisher MetricPublisher, 297 ) context.Context { 298 if middleware.GetStackValue(ctx, metricContextKey{}) == nil { 299 ctx = middleware.WithStackValue(ctx, metricContextKey{}, &MetricContext{ 300 connectionCounter: counter, 301 publisher: publisher, 302 data: &MetricData{ 303 Attempts: []AttemptMetrics{}, 304 Stream: StreamMetrics{}, 305 }, 306 }) 307 } 308 return ctx 309 } 310 311 // Context returns the metric context from the given context. 312 // It returns nil if the metric context is not found. 313 func Context(ctx context.Context) *MetricContext { 314 mctx := middleware.GetStackValue(ctx, metricContextKey{}) 315 if mctx == nil { 316 return nil 317 } 318 return mctx.(*MetricContext) 319 }