-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add standalone activity metrics #8759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: heartbeat
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,10 @@ import ( | |
| "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" | ||
| "go.temporal.io/server/common" | ||
| "go.temporal.io/server/common/backoff" | ||
| "go.temporal.io/server/common/dynamicconfig" | ||
| "go.temporal.io/server/common/metrics" | ||
| "go.temporal.io/server/common/namespace" | ||
| "go.temporal.io/server/common/tqid" | ||
| "google.golang.org/protobuf/types/known/durationpb" | ||
| "google.golang.org/protobuf/types/known/timestamppb" | ||
| ) | ||
|
|
@@ -54,10 +58,13 @@ type Activity struct { | |
| Store chasm.Field[ActivityStore] | ||
| } | ||
|
|
||
| // WithToken wraps a request with its deserialized task token. | ||
| type WithToken[R any] struct { | ||
| Token *tokenspb.Task | ||
| Request R | ||
| // RequestWithContext wraps a request with context-specific metadata. | ||
| type RequestWithContext[R any] struct { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using @dandavison naming. Open to better ideas.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would create a struct per request and avoid this generic wrapper. |
||
| Request R | ||
| Token *tokenspb.Task | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is specific to certain worker APIs, isn't it? So I think it's starting to feel a bit hacky to share the same wrapper across multiple APIs that only need some things. |
||
| MetricsHandler metrics.Handler | ||
| NamespaceName namespace.Name | ||
| BreakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it definitely appropriate to pass this; DC can be queried where needed, right?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You need to pass this in. |
||
| } | ||
|
|
||
| func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState { | ||
|
|
@@ -237,14 +244,14 @@ func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx ch | |
| // HandleCompleted updates the activity on activity completion. | ||
| func (a *Activity) HandleCompleted( | ||
| ctx chasm.MutableContext, | ||
| input WithToken[*historyservice.RespondActivityTaskCompletedRequest], | ||
| req RequestWithContext[*historyservice.RespondActivityTaskCompletedRequest], | ||
| ) (*historyservice.RespondActivityTaskCompletedResponse, error) { | ||
| // TODO(dan): add test coverage for this validation | ||
| if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { | ||
| if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if err := TransitionCompleted.Apply(a, ctx, input.Request); err != nil { | ||
| if err := TransitionCompleted.Apply(a, ctx, req); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
|
|
@@ -255,14 +262,14 @@ func (a *Activity) HandleCompleted( | |
| // for retry instead. | ||
| func (a *Activity) HandleFailed( | ||
| ctx chasm.MutableContext, | ||
| input WithToken[*historyservice.RespondActivityTaskFailedRequest], | ||
| req RequestWithContext[*historyservice.RespondActivityTaskFailedRequest], | ||
| ) (*historyservice.RespondActivityTaskFailedResponse, error) { | ||
| // TODO(dan): add test coverage for this validation | ||
| if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { | ||
| if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| failure := input.Request.GetFailedRequest().GetFailure() | ||
| failure := req.Request.GetFailedRequest().GetFailure() | ||
|
|
||
| shouldRetry, retryInterval, err := a.shouldRetryOnFailure(ctx, failure) | ||
| if err != nil { | ||
|
|
@@ -271,8 +278,12 @@ func (a *Activity) HandleFailed( | |
|
|
||
| if shouldRetry { | ||
| if err := TransitionRescheduled.Apply(a, ctx, rescheduleEvent{ | ||
| retryInterval: retryInterval, | ||
| failure: failure, | ||
| retryInterval: retryInterval, | ||
| failure: failure, | ||
| handler: req.MetricsHandler, | ||
| namespace: req.NamespaceName, | ||
| breakdownMetricsByTaskQueue: req.BreakdownMetricsByTaskQueue, | ||
| operationTag: metrics.HistoryRespondActivityTaskFailedScope, | ||
| }); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -281,7 +292,7 @@ func (a *Activity) HandleFailed( | |
| } | ||
|
|
||
| // No more retries, transition to failed state | ||
| if err := TransitionFailed.Apply(a, ctx, input.Request); err != nil { | ||
| if err := TransitionFailed.Apply(a, ctx, req); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
|
|
@@ -291,14 +302,14 @@ func (a *Activity) HandleFailed( | |
| // HandleCanceled updates the activity on activity canceled. | ||
| func (a *Activity) HandleCanceled( | ||
| ctx chasm.MutableContext, | ||
| input WithToken[*historyservice.RespondActivityTaskCanceledRequest], | ||
| req RequestWithContext[*historyservice.RespondActivityTaskCanceledRequest], | ||
| ) (*historyservice.RespondActivityTaskCanceledResponse, error) { | ||
| // TODO(dan): add test coverage for this validation | ||
| if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { | ||
| if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if err := TransitionCanceled.Apply(a, ctx, input.Request); err != nil { | ||
| if err := TransitionCanceled.Apply(a, ctx, req); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
|
|
@@ -509,15 +520,21 @@ func createHeartbeatTimeoutFailure() *failurepb.Failure { | |
| // RecordHeartbeat records a heartbeat for the activity. | ||
| func (a *Activity) RecordHeartbeat( | ||
| ctx chasm.MutableContext, | ||
| input WithToken[*historyservice.RecordActivityTaskHeartbeatRequest], | ||
| req RequestWithContext[*historyservice.RecordActivityTaskHeartbeatRequest], | ||
| ) (*historyservice.RecordActivityTaskHeartbeatResponse, error) { | ||
| if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { | ||
| if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| details := req.Request.HeartbeatRequest.GetDetails() | ||
|
|
||
| a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ | ||
| RecordedTime: timestamppb.New(ctx.Now(a)), | ||
| Details: input.Request.HeartbeatRequest.GetDetails(), | ||
| Details: details, | ||
| }) | ||
|
|
||
| recordPayloadSize(details.Size(), req.MetricsHandler, req.NamespaceName.String(), metrics.HistoryRecordActivityTaskHeartbeatScope) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't need these payload size metrics IMHO; we added them to understand the implications of putting payloads in mutable state for standalone activities. |
||
|
|
||
| return &historyservice.RecordActivityTaskHeartbeatResponse{ | ||
| CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, | ||
| // TODO(dan): ActivityPaused, ActivityReset | ||
|
|
@@ -679,3 +696,103 @@ func (a *Activity) buildPollActivityExecutionResponse( | |
| FrontendResponse: response, | ||
| }, nil | ||
| } | ||
|
|
||
| // recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any | ||
| // terminal state transitions. | ||
|
Comment on lines
+700
to
+701
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't quite follow the "and originating from any terminal state transitions" bit. For the first bit, would "records metrics for an activity attempt" be clearer? |
||
| func (a *Activity) recordOnAttemptedMetrics( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's not urgent, but when we get time I'd like us to think about a consistent and sensible mapping of events to method names. I am already finding methods named "handleX" and "recordX" confusingly similar, and now we're adding "recordX" with a second meaning.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Suggesting the verb |
||
| startedTime time.Time, | ||
| namespaceName string, | ||
| metricsHandler metrics.Handler, | ||
| breakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter, | ||
| operationTag string, | ||
| timeoutType enumspb.TimeoutType, | ||
| ) { | ||
| taskQueueFamily := a.GetTaskQueue().GetName() | ||
|
|
||
| handler := metrics.GetPerTaskQueueFamilyScope( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would suggest passing a fully baked metrics handler into the application logic. You can build the metrics handler in the API handlers or task executors if needed; it will save you from carrying all of these parameters around. |
||
| metricsHandler, | ||
| namespaceName, | ||
| tqid.UnsafeTaskQueueFamily(namespaceName, taskQueueFamily), | ||
| breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), | ||
| metrics.OperationTag(operationTag), | ||
| metrics.ActivityTypeTag(a.GetActivityType().GetName()), | ||
| // metrics.VersioningBehaviorTag(versioningBehavior), TODO add when we have versioning | ||
| ) | ||
|
|
||
| if !startedTime.IsZero() { | ||
| latency := time.Since(startedTime) | ||
| metrics.ActivityStartToCloseLatency.With(handler).Record(latency) | ||
| } | ||
|
|
||
| switch operationTag { | ||
| case metrics.HistoryRespondActivityTaskFailedScope: | ||
| metrics.ActivityTaskFail.With(handler).Record(1) | ||
| case metrics.TimerActiveTaskActivityTimeoutScope: | ||
| timeoutTag := metrics.StringTag("timeout_type", timeoutType.String()) | ||
| metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag) | ||
| default: | ||
| // Ignore | ||
| } | ||
| } | ||
|
|
||
| // recordOnClosedMetrics records metrics on transition to a terminal state. It always calls recordOnAttemptedMetrics to | ||
| // record metrics for the attempted activity as this transition is also an attempt. | ||
| func (a *Activity) recordOnClosedMetrics( | ||
| startedTime time.Time, | ||
| namespaceName string, | ||
| metricsHandler metrics.Handler, | ||
| breakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter, | ||
| operationTag string, | ||
| timeoutType enumspb.TimeoutType, | ||
| ) { | ||
| a.recordOnAttemptedMetrics( | ||
| startedTime, | ||
| namespaceName, | ||
| metricsHandler, | ||
| breakdownMetricsByTaskQueue, | ||
| operationTag, | ||
| timeoutType) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this not mean that |
||
|
|
||
| taskQueueFamily := a.GetTaskQueue().GetName() | ||
|
|
||
| handler := metrics.GetPerTaskQueueFamilyScope( | ||
| metricsHandler, | ||
| namespaceName, | ||
| tqid.UnsafeTaskQueueFamily(namespaceName, taskQueueFamily), | ||
| breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), | ||
| metrics.OperationTag(operationTag), | ||
| metrics.ActivityTypeTag(a.GetActivityType().GetName()), | ||
| // metrics.VersioningBehaviorTag(versioningBehavior), TODO add when we have versioning | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You can't not emit this tag here; all metrics have to have the same tags no matter where they're emitted from. |
||
| ) | ||
|
|
||
| scheduleToCloseLatency := time.Since(a.GetScheduledTime().AsTime()) | ||
| metrics.ActivityScheduleToCloseLatency.With(metricsHandler).Record(scheduleToCloseLatency) | ||
fretz12 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| switch operationTag { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it would be much easier to follow this code if you emitted the appropriate metric where it is relevant. |
||
| case metrics.HistoryRespondActivityTaskCompletedScope: | ||
| metrics.ActivitySuccess.With(handler).Record(1) | ||
| case metrics.HistoryRespondActivityTaskFailedScope: | ||
| metrics.ActivityFail.With(handler).Record(1) | ||
| case metrics.HistoryRespondActivityTaskCanceledScope: | ||
| metrics.ActivityCancel.With(handler).Record(1) | ||
| case metrics.TimerActiveTaskActivityTimeoutScope: | ||
| timeoutTag := metrics.StringTag("timeout_type", timeoutType.String()) | ||
| metrics.ActivityTimeout.With(handler).Record(1, timeoutTag) | ||
| default: | ||
| // Ignore | ||
| } | ||
| } | ||
|
|
||
| func recordPayloadSize( | ||
| payloadSize int, | ||
| handler metrics.Handler, | ||
| namespaceName string, | ||
| operationTag string, | ||
| ) { | ||
| if payloadSize > 0 { | ||
| metrics.ActivityPayloadSize.With(handler).Record( | ||
| int64(payloadSize), | ||
| metrics.OperationTag(operationTag), | ||
| metrics.NamespaceTag(namespaceName)) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.