-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add standalone activity metrics #8759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: heartbeat
Are you sure you want to change the base?
Conversation
| Token *tokenspb.Task | ||
| Request R | ||
| // RequestWithContext wraps a request context specific metadata. | ||
| type RequestWithContext[R any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using @dandavison naming. Open to better ideas.
|
cursor review |
| // RequestWithContext wraps a request context specific metadata. | ||
| type RequestWithContext[R any] struct { | ||
| Request R | ||
| Token *tokenspb.Task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is specific to certain worker APIs isn't it. So I think it's starting to feel a bit hacky to share the same wrapper across multiple APIs that only need some things.
| Token *tokenspb.Task | ||
| MetricsHandler metrics.Handler | ||
| NamespaceName namespace.Name | ||
| BreakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it definitely appropriate to pass this; DC can be queried where needed, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to pass this in.
| type WithToken[R any] struct { | ||
| Token *tokenspb.Task | ||
| Request R | ||
| // RequestWithContext wraps a request context specific metadata. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // RequestWithContext wraps a request context specific metadata. | |
| // RequestWithContext wraps a request with context-specific metadata. |
| // recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any | ||
| // terminal state transitions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite follow the "and originating from any terminal state transitions" bit.
For the first bit, would "records metrics for an activity attempt" be clearer?
| event.handler, | ||
| event.breakdownMetricsByTaskQueue, | ||
| event.operationTag, | ||
| event.timeoutType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm finding it odd that we're referencing GetStartedTime time in a transition to Scheduled.
I don't think attempt.GetStartedTime().AsTime() is guaranteed to be zero here, right? So then we'll be emitting ActivityStartToCloseLatency which would be wrong?
| metricsHandler, | ||
| breakdownMetricsByTaskQueue, | ||
| operationTag, | ||
| timeoutType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this not mean that recordOnAttemptedMetrics() gets called twice, since it is called when a retry transitions to SCHEDULED?
|
|
||
| // recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any | ||
| // terminal state transitions. | ||
| func (a *Activity) recordOnAttemptedMetrics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not urgent but when we get time I'd like us to think about a consistent and sensible mapping of events to method names. I am already finding methods named "handleX" and "recordX" confusingly similar, and now we're adding "recordX" with a second meaning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggesting the verb emit here.
| Token *tokenspb.Task | ||
| Request R | ||
| // RequestWithContext wraps a request context specific metadata. | ||
| type RequestWithContext[R any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would create a struct per request and avoid this generic wrapper.
bergundy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall this looks really good. I would consider restructuring the code as I suggested to make it easier to follow.
|
|
||
| // recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any | ||
| // terminal state transitions. | ||
| func (a *Activity) recordOnAttemptedMetrics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggesting the verb emit here.
| ) { | ||
| taskQueueFamily := a.GetTaskQueue().GetName() | ||
|
|
||
| handler := metrics.GetPerTaskQueueFamilyScope( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest passing in a fully baked metrics handler into the application logic.
You can build the metrics handler in the API handlers or task executors if needed, it will save you from carrying all of these parameters around.
| Details: details, | ||
| }) | ||
|
|
||
| recordPayloadSize(details.Size(), req.MetricsHandler, req.NamespaceName.String(), metrics.HistoryRecordActivityTaskHeartbeatScope) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need these payload size metrics IMHO, we added them to understand the implications of putting payloads in mutable state for standalone activities.
| breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), | ||
| metrics.OperationTag(operationTag), | ||
| metrics.ActivityTypeTag(a.GetActivityType().GetName()), | ||
| // metrics.VersioningBehaviorTag(versioningBehavior), TODO add when we have versioning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't not emit this tag here, all metrics have to have the same tags no matter where they're omitted from.
| scheduleToCloseLatency := time.Since(a.GetScheduledTime().AsTime()) | ||
| metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) | ||
|
|
||
| switch operationTag { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be much easier to follow this code if you emitted the appropriate metric where it is relevant.
| type timeoutTaskExecutorOptions struct { | ||
| fx.In | ||
|
|
||
| Dc *dynamicconfig.Collection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the config struct that Dan added in a separate PR and initialize it in fx instead of using the collection directly.
| Attempt: attempt.GetCount(), | ||
| }) | ||
|
|
||
| recordPayloadSize(event.inputSize, event.handler, event.namespace.String(), metrics.HistoryRecordActivityTaskStartedScope) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, not required to record this IMHO.
What changed?
Add standalone activity metrics
Why?
Need standalone activity metrics for observability purposes
How did you test it?
Note
Adds comprehensive metrics and payload-size recording for standalone activities, introducing contextual request wrappers and wiring metrics through state transitions, timeout executors, and history APIs.
RequestWithContextcarryingToken,MetricsHandler,NamespaceName, andBreakdownMetricsByTaskQueueand replaces prior request wrappers.ActivityStartToCloseLatency,ActivityScheduleToCloseLatency, success/fail/cancel/timeout counters, and per-timeout tags.recordPayloadSize(...)to emit payload sizes for input, heartbeat details, results, and failures.chasm/lib/activity/activity_tasks.go):timeoutTaskExecutorOptions(dynamic config, metrics, namespace registry); resolve namespace and emit timeout metrics during schedule/start/close/heartbeat timeouts and retries.metrics.Handlerandnamespace.Registry;StartActivityExecutionemits input payload size on scheduling.RequestWithContext(token, metrics handler, namespace, breakdown setting) into chasm component calls.Written by Cursor Bugbot for commit 02c6aba. This will update automatically on new commits. Configure here.