Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 136 additions & 19 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/tqid"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -54,10 +58,13 @@ type Activity struct {
Store chasm.Field[ActivityStore]
}

// WithToken wraps a request with its deserialized task token.
type WithToken[R any] struct {
Token *tokenspb.Task
Request R
// RequestWithContext wraps a request context specific metadata.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// RequestWithContext wraps a request context specific metadata.
// RequestWithContext wraps a request with context-specific metadata.

type RequestWithContext[R any] struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using @dandavison naming. Open to better ideas.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create a struct per request and avoid this generic wrapper.

Request R
Token *tokenspb.Task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specific to certain worker APIs isn't it. So I think it's starting to feel a bit hacky to share the same wrapper across multiple APIs that only need some things.

MetricsHandler metrics.Handler
NamespaceName namespace.Name
BreakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter
Copy link
Contributor

@dandavison dandavison Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it definitely appropriate to pass this; DC can be queried where needed, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to pass this in.

}

func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
Expand Down Expand Up @@ -237,14 +244,14 @@ func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx ch
// HandleCompleted updates the activity on activity completion.
func (a *Activity) HandleCompleted(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskCompletedRequest],
req RequestWithContext[*historyservice.RespondActivityTaskCompletedRequest],
) (*historyservice.RespondActivityTaskCompletedResponse, error) {
// TODO(dan): add test coverage for this validation
if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil {
return nil, err
}

if err := TransitionCompleted.Apply(a, ctx, input.Request); err != nil {
if err := TransitionCompleted.Apply(a, ctx, req); err != nil {
return nil, err
}

Expand All @@ -255,14 +262,14 @@ func (a *Activity) HandleCompleted(
// for retry instead.
func (a *Activity) HandleFailed(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskFailedRequest],
req RequestWithContext[*historyservice.RespondActivityTaskFailedRequest],
) (*historyservice.RespondActivityTaskFailedResponse, error) {
// TODO(dan): add test coverage for this validation
if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil {
return nil, err
}

failure := input.Request.GetFailedRequest().GetFailure()
failure := req.Request.GetFailedRequest().GetFailure()

shouldRetry, retryInterval, err := a.shouldRetryOnFailure(ctx, failure)
if err != nil {
Expand All @@ -271,8 +278,12 @@ func (a *Activity) HandleFailed(

if shouldRetry {
if err := TransitionRescheduled.Apply(a, ctx, rescheduleEvent{
retryInterval: retryInterval,
failure: failure,
retryInterval: retryInterval,
failure: failure,
handler: req.MetricsHandler,
namespace: req.NamespaceName,
breakdownMetricsByTaskQueue: req.BreakdownMetricsByTaskQueue,
operationTag: metrics.HistoryRespondActivityTaskFailedScope,
}); err != nil {
return nil, err
}
Expand All @@ -281,7 +292,7 @@ func (a *Activity) HandleFailed(
}

// No more retries, transition to failed state
if err := TransitionFailed.Apply(a, ctx, input.Request); err != nil {
if err := TransitionFailed.Apply(a, ctx, req); err != nil {
return nil, err
}

Expand All @@ -291,14 +302,14 @@ func (a *Activity) HandleFailed(
// HandleCanceled updates the activity on activity canceled.
func (a *Activity) HandleCanceled(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskCanceledRequest],
req RequestWithContext[*historyservice.RespondActivityTaskCanceledRequest],
) (*historyservice.RespondActivityTaskCanceledResponse, error) {
// TODO(dan): add test coverage for this validation
if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil {
return nil, err
}

if err := TransitionCanceled.Apply(a, ctx, input.Request); err != nil {
if err := TransitionCanceled.Apply(a, ctx, req); err != nil {
return nil, err
}

Expand Down Expand Up @@ -509,15 +520,21 @@ func createHeartbeatTimeoutFailure() *failurepb.Failure {
// RecordHeartbeat records a heartbeat for the activity.
func (a *Activity) RecordHeartbeat(
ctx chasm.MutableContext,
input WithToken[*historyservice.RecordActivityTaskHeartbeatRequest],
req RequestWithContext[*historyservice.RecordActivityTaskHeartbeatRequest],
) (*historyservice.RecordActivityTaskHeartbeatResponse, error) {
if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil {
return nil, err
}

details := req.Request.HeartbeatRequest.GetDetails()

a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{
RecordedTime: timestamppb.New(ctx.Now(a)),
Details: input.Request.HeartbeatRequest.GetDetails(),
Details: details,
})

recordPayloadSize(details.Size(), req.MetricsHandler, req.NamespaceName.String(), metrics.HistoryRecordActivityTaskHeartbeatScope)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need these payload size metrics IMHO, we added them to understand the implications of putting payloads in mutable state for standalone activities.


return &historyservice.RecordActivityTaskHeartbeatResponse{
CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
// TODO(dan): ActivityPaused, ActivityReset
Expand Down Expand Up @@ -679,3 +696,103 @@ func (a *Activity) buildPollActivityExecutionResponse(
FrontendResponse: response,
}, nil
}

// recordOnAttemptedMetrics records metrics for attempted activities, including retries and originating from any
// terminal state transitions.
Comment on lines +700 to +701
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite follow the "and originating from any terminal state transitions" bit.

For the first bit, would "records metrics for an activity attempt" be clearer?

func (a *Activity) recordOnAttemptedMetrics(
Copy link
Contributor

@dandavison dandavison Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not urgent but when we get time I'd like us to think about a consistent and sensible mapping of events to method names. I am already finding methods named "handleX" and "recordX" confusingly similar, and now we're adding "recordX" with a second meaning.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggesting the verb emit here.

startedTime time.Time,
namespaceName string,
metricsHandler metrics.Handler,
breakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter,
operationTag string,
timeoutType enumspb.TimeoutType,
) {
taskQueueFamily := a.GetTaskQueue().GetName()

handler := metrics.GetPerTaskQueueFamilyScope(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest passing in a fully baked metrics handler into the application logic.

You can build the metrics handler in the API handlers or task executors if needed, it will save you from carrying all of these parameters around.

metricsHandler,
namespaceName,
tqid.UnsafeTaskQueueFamily(namespaceName, taskQueueFamily),
breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY),
metrics.OperationTag(operationTag),
metrics.ActivityTypeTag(a.GetActivityType().GetName()),
// metrics.VersioningBehaviorTag(versioningBehavior), TODO add when we have versioning
)

if !startedTime.IsZero() {
latency := time.Since(startedTime)
metrics.ActivityStartToCloseLatency.With(handler).Record(latency)
}

switch operationTag {
case metrics.HistoryRespondActivityTaskFailedScope:
metrics.ActivityTaskFail.With(handler).Record(1)
case metrics.TimerActiveTaskActivityTimeoutScope:
timeoutTag := metrics.StringTag("timeout_type", timeoutType.String())
metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag)
default:
// Ignore
}
}

// recordOnClosedMetrics records metrics on transition to a terminal state. It always calls recordOnAttemptedMetrics to
// record metrics for the attempted activity as this transition is also an attempt..
func (a *Activity) recordOnClosedMetrics(
startedTime time.Time,
namespaceName string,
metricsHandler metrics.Handler,
breakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter,
operationTag string,
timeoutType enumspb.TimeoutType,
) {
a.recordOnAttemptedMetrics(
startedTime,
namespaceName,
metricsHandler,
breakdownMetricsByTaskQueue,
operationTag,
timeoutType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this not mean that recordOnAttemptedMetrics() gets called twice, since it is called when a retry transitions to SCHEDULED?


taskQueueFamily := a.GetTaskQueue().GetName()

handler := metrics.GetPerTaskQueueFamilyScope(
metricsHandler,
namespaceName,
tqid.UnsafeTaskQueueFamily(namespaceName, taskQueueFamily),
breakdownMetricsByTaskQueue(namespaceName, taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY),
metrics.OperationTag(operationTag),
metrics.ActivityTypeTag(a.GetActivityType().GetName()),
// metrics.VersioningBehaviorTag(versioningBehavior), TODO add when we have versioning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't not emit this tag here, all metrics have to have the same tags no matter where they're omitted from.

)

scheduleToCloseLatency := time.Since(a.GetScheduledTime().AsTime())
metrics.ActivityScheduleToCloseLatency.With(metricsHandler).Record(scheduleToCloseLatency)

switch operationTag {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be much easier to follow this code if you emitted the appropriate metric where it is relevant.

case metrics.HistoryRespondActivityTaskCompletedScope:
metrics.ActivitySuccess.With(handler).Record(1)
case metrics.HistoryRespondActivityTaskFailedScope:
metrics.ActivityFail.With(handler).Record(1)
case metrics.HistoryRespondActivityTaskCanceledScope:
metrics.ActivityCancel.With(handler).Record(1)
case metrics.TimerActiveTaskActivityTimeoutScope:
timeoutTag := metrics.StringTag("timeout_type", timeoutType.String())
metrics.ActivityTimeout.With(handler).Record(1, timeoutTag)
default:
// Ignore
}
}

func recordPayloadSize(
payloadSize int,
handler metrics.Handler,
namespaceName string,
operationTag string,
) {
if payloadSize > 0 {
metrics.ActivityPayloadSize.With(handler).Record(
int64(payloadSize),
metrics.OperationTag(operationTag),
metrics.NamespaceTag(namespaceName))
}
}
Loading
Loading