Skip to content

Conversation

@fretz12
Copy link
Contributor

@fretz12 fretz12 commented Dec 4, 2025

What changed?

Add standalone activity metrics

Why?

Need standalone activity metrics for observability purposes

How did you test it?

  • built
  • run locally and tested manually
  • covered by existing tests
  • added new unit test(s)
  • added new functional test(s)

Note

Adds comprehensive metrics and payload-size recording for standalone activities, introducing contextual request wrappers and wiring metrics through state transitions, timeout executors, and history APIs.

  • Activity component/state machine:
    • Introduces RequestWithContext carrying Token, MetricsHandler, NamespaceName, and BreakdownMetricsByTaskQueue and replaces prior request wrappers.
    • Records metrics on schedule, attempts, and terminal transitions (success, fail, cancel, timeouts), including ActivityStartToCloseLatency, ActivityScheduleToCloseLatency, success/fail/cancel/timeout counters, and per-timeout tags.
    • Adds recordPayloadSize(...) to emit payload sizes for input, heartbeat details, results, and failures.
  • Timeout and dispatch executors (chasm/lib/activity/activity_tasks.go):
    • Add timeoutTaskExecutorOptions (dynamic config, metrics, namespace registry); resolve namespace and emit timeout metrics during schedule/start/close/heartbeat timeouts and retries.
  • Activity handler:
    • Injects metrics.Handler and namespace.Registry; StartActivityExecution emits input payload size on scheduling.
  • History APIs (record/respond activity ops):
    • Pass RequestWithContext (token, metrics handler, namespace, breakdown setting) into chasm component calls.
  • Tests:
    • Update/add unit tests to validate metric emissions and payload-size recording across transitions and timeouts.

Written by Cursor Bugbot for commit 02c6aba. This will update automatically on new commits. Configure here.

Token *tokenspb.Task
Request R
// RequestWithContext wraps a request context specific metadata.
type RequestWithContext[R any] struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using @dandavison naming. Open to better ideas.

@fretz12 fretz12 marked this pull request as ready for review December 4, 2025 23:19
@fretz12 fretz12 requested review from a team as code owners December 4, 2025 23:19
@fretz12
Copy link
Contributor Author

fretz12 commented Dec 4, 2025

cursor review

// RequestWithContext wraps a request context specific metadata.
type RequestWithContext[R any] struct {
Request R
Token *tokenspb.Task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specific to certain worker APIs isn't it. So I think it's starting to feel a bit hacky to share the same wrapper across multiple APIs that only need some things.

Token *tokenspb.Task
MetricsHandler metrics.Handler
NamespaceName namespace.Name
BreakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter
Copy link
Contributor

@dandavison dandavison Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it definitely appropriate to pass this; DC can be queried where needed, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to pass this in.

type WithToken[R any] struct {
Token *tokenspb.Task
Request R
// RequestWithContext wraps a request context specific metadata.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// RequestWithContext wraps a request context specific metadata.
// RequestWithContext wraps a request with context-specific metadata.

Comment on lines +700 to +701
// recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any
// terminal state transitions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite follow the "and originating from any terminal state transitions" bit.

For the first bit, would "records metrics for an activity attempt" be clearer?

event.handler,
event.breakdownMetricsByTaskQueue,
event.operationTag,
event.timeoutType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm finding it odd that we're referencing GetStartedTime time in a transition to Scheduled.

I don't think attempt.GetStartedTime().AsTime() is guaranteed to be zero here, right? So then we'll be emitting ActivityStartToCloseLatency which would be wrong?

metricsHandler,
breakdownMetricsByTaskQueue,
operationTag,
timeoutType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this not mean that recordOnAttemptedMetrics() gets called twice, since it is called when a retry transitions to SCHEDULED?


// recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any
// terminal state transitions.
func (a *Activity) recordOnAttemptedMetrics(
Copy link
Contributor

@dandavison dandavison Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not urgent but when we get time I'd like us to think about a consistent and sensible mapping of events to method names. I am already finding methods named "handleX" and "recordX" confusingly similar, and now we're adding "recordX" with a second meaning.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggesting the verb emit here.

Token *tokenspb.Task
Request R
// RequestWithContext wraps a request context specific metadata.
type RequestWithContext[R any] struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create a struct per request and avoid this generic wrapper.

Copy link
Member

@bergundy bergundy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks really good. I would consider restructuring the code as I suggested to make it easier to follow.


// recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any
// terminal state transitions.
func (a *Activity) recordOnAttemptedMetrics(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggesting the verb emit here.

) {
taskQueueFamily := a.GetTaskQueue().GetName()

handler := metrics.GetPerTaskQueueFamilyScope(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest passing in a fully baked metrics handler into the application logic.

You can build the metrics handler in the API handlers or task executors if needed, it will save you from carrying all of these parameters around.

Details: details,
})

recordPayloadSize(details.Size(), req.MetricsHandler, req.NamespaceName.String(), metrics.HistoryRecordActivityTaskHeartbeatScope)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need these payload size metrics IMHO, we added them to understand the implications of putting payloads in mutable state for standalone activities.

breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY),
metrics.OperationTag(operationTag),
metrics.ActivityTypeTag(a.GetActivityType().GetName()),
// metrics.VersioningBehaviorTag(versioningBehavior), TODO add when we have versioning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't not emit this tag here, all metrics have to have the same tags no matter where they're omitted from.

scheduleToCloseLatency := time.Since(a.GetScheduledTime().AsTime())
metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency)

switch operationTag {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be much easier to follow this code if you emitted the appropriate metric where it is relevant.

type timeoutTaskExecutorOptions struct {
fx.In

Dc *dynamicconfig.Collection
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the config struct that Dan added in a separate PR and initialize it in fx instead of using the collection directly.

Attempt: attempt.GetCount(),
})

recordPayloadSize(event.inputSize, event.handler, event.namespace.String(), metrics.HistoryRecordActivityTaskStartedScope)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, not required to record this IMHO.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants