From 516247d5e94d2cdac460bac012787130ead1de94 Mon Sep 17 00:00:00 2001 From: Fred Tzeng Date: Tue, 9 Dec 2025 15:06:02 -0800 Subject: [PATCH 1/3] Add standalone activity metrics --- chasm/lib/activity/activity.go | 173 +++++++++++++-- chasm/lib/activity/activity_tasks.go | 173 +++++++++++++-- chasm/lib/activity/config.go | 10 +- chasm/lib/activity/handler.go | 42 +++- chasm/lib/activity/library.go | 3 +- chasm/lib/activity/statemachine.go | 71 ++++-- chasm/lib/activity/statemachine_test.go | 204 ++++++++++++++++-- .../api/respondactivitytaskcanceled/api.go | 25 ++- .../api/respondactivitytaskcompleted/api.go | 25 ++- .../api/respondactivitytaskfailed/api.go | 25 ++- 10 files changed, 658 insertions(+), 93 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index f8cda1a7f1d..b96bd47e769 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -20,6 +20,7 @@ import ( "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/payload" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -61,6 +62,33 @@ type WithToken[R any] struct { Request R } +// RespondCompletedReqWrapper wraps the RespondActivityTaskCompletedRequest with context-specific data. +type RespondCompletedReqWrapper struct { + Request *historyservice.RespondActivityTaskCompletedRequest + Token *tokenspb.Task + MetricsHandler metrics.Handler +} + +// RespondFailedReqWrapper wraps the RespondActivityTaskFailedRequest with context-specific data. +type RespondFailedReqWrapper struct { + Request *historyservice.RespondActivityTaskFailedRequest + Token *tokenspb.Task + MetricsHandler metrics.Handler +} + +// RespondCancelledReqWrapper wraps the RespondActivityTaskCanceledRequest with context-specific data. 
+type RespondCancelledReqWrapper struct { + Request *historyservice.RespondActivityTaskCanceledRequest + Token *tokenspb.Task + MetricsHandler metrics.Handler +} + +// RequestCancelActivityReqWrapper wraps the RequestCancelActivityExecutionRequest with context-specific data. +type RequestCancelActivityReqWrapper struct { + request *activitypb.RequestCancelActivityExecutionRequest + metricsHandler metrics.Handler +} + func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState { switch a.Status { case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: @@ -137,6 +165,20 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s }, nil } +// MetricsHandlerBuilderParams contains parameters for building a metrics handler for activity operations. +type MetricsHandlerBuilderParams struct { + ActivityType string + TaskQueueName string +} + +// GetMetricsHandlerParams retrieves parameters for building a metrics handler for activity operations. +func (a *Activity) GetMetricsHandlerParams(_ chasm.Context, _ any) (MetricsHandlerBuilderParams, error) { + return MetricsHandlerBuilderParams{ + ActivityType: a.GetActivityType().GetName(), + TaskQueueName: a.GetTaskQueue().GetName(), + }, nil +} + // HandleStarted updates the activity on recording activity task started and populates the response. func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) ( *historyservice.RecordActivityTaskStartedResponse, error, @@ -199,14 +241,14 @@ func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx ch // HandleCompleted updates the activity on activity completion. 
func (a *Activity) HandleCompleted( ctx chasm.MutableContext, - input WithToken[*historyservice.RespondActivityTaskCompletedRequest], + req RespondCompletedReqWrapper, ) (*historyservice.RespondActivityTaskCompletedResponse, error) { // TODO(dan): add test coverage for this validation - if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { + if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil { return nil, err } - if err := TransitionCompleted.Apply(a, ctx, input.Request); err != nil { + if err := TransitionCompleted.Apply(a, ctx, req); err != nil { return nil, err } @@ -217,14 +259,14 @@ func (a *Activity) HandleCompleted( // for retry instead. func (a *Activity) HandleFailed( ctx chasm.MutableContext, - input WithToken[*historyservice.RespondActivityTaskFailedRequest], + req RespondFailedReqWrapper, ) (*historyservice.RespondActivityTaskFailedResponse, error) { // TODO(dan): add test coverage for this validation - if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { + if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil { return nil, err } - failure := input.Request.GetFailedRequest().GetFailure() + failure := req.Request.GetFailedRequest().GetFailure() shouldRetry, retryInterval, err := a.shouldRetryOnFailure(ctx, failure) if err != nil { @@ -239,11 +281,13 @@ func (a *Activity) HandleFailed( return nil, err } + a.emitOnAttemptFailedMetrics(ctx, req.MetricsHandler) + return &historyservice.RespondActivityTaskFailedResponse{}, nil } // No more retries, transition to failed state - if err := TransitionFailed.Apply(a, ctx, input.Request); err != nil { + if err := TransitionFailed.Apply(a, ctx, req); err != nil { return nil, err } @@ -253,14 +297,17 @@ func (a *Activity) HandleFailed( // HandleCanceled updates the activity on activity canceled. 
func (a *Activity) HandleCanceled( ctx chasm.MutableContext, - input WithToken[*historyservice.RespondActivityTaskCanceledRequest], + req RespondCancelledReqWrapper, ) (*historyservice.RespondActivityTaskCanceledResponse, error) { // TODO(dan): add test coverage for this validation - if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { + if err := ValidateActivityTaskToken(ctx, a, req.Token); err != nil { return nil, err } - if err := TransitionCanceled.Apply(a, ctx, input.Request.GetCancelRequest().GetDetails()); err != nil { + if err := TransitionCanceled.Apply(a, ctx, cancelEvent{ + details: req.Request.GetCancelRequest().GetDetails(), + handler: req.MetricsHandler, + }); err != nil { return nil, err } @@ -288,10 +335,11 @@ func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activityp return heartbeat } -func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *activitypb.RequestCancelActivityExecutionRequest) ( +func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, reqWrapper RequestCancelActivityReqWrapper) ( *activitypb.RequestCancelActivityExecutionResponse, error, ) { - newReqID := req.GetFrontendRequest().GetRequestId() + req := reqWrapper.request.GetFrontendRequest() + newReqID := req.GetRequestId() existingReqID := a.GetCancelState().GetRequestId() // If already in cancel requested state, fail if request ID is different, else no-op @@ -307,18 +355,21 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *ac // If in scheduled state, cancel immediately right after marking cancel requested isCancelImmediately := a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED - if err := TransitionCancelRequested.Apply(a, ctx, req.GetFrontendRequest()); err != nil { + if err := TransitionCancelRequested.Apply(a, ctx, req); err != nil { return nil, err } if isCancelImmediately { details := &commonpb.Payloads{ Payloads: []*commonpb.Payload{ - 
payload.EncodeString(req.GetFrontendRequest().GetReason()), + payload.EncodeString(req.GetReason()), }, } - err := TransitionCanceled.Apply(a, ctx, details) + err := TransitionCanceled.Apply(a, ctx, cancelEvent{ + details: details, + handler: reqWrapper.metricsHandler, + }) if err != nil { return nil, err } @@ -458,15 +509,16 @@ func createHeartbeatTimeoutFailure() *failurepb.Failure { // RecordHeartbeat records a heartbeat for the activity. func (a *Activity) RecordHeartbeat( ctx chasm.MutableContext, - input WithToken[*historyservice.RecordActivityTaskHeartbeatRequest], + req WithToken[*historyservice.RecordActivityTaskHeartbeatRequest], ) (*historyservice.RecordActivityTaskHeartbeatResponse, error) { - err := ValidateActivityTaskToken(ctx, a, input.Token) + err := ValidateActivityTaskToken(ctx, a, req.Token) if err != nil { return nil, err } + a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ RecordedTime: timestamppb.New(ctx.Now(a)), - Details: input.Request.GetHeartbeatRequest().GetDetails(), + Details: req.Request.HeartbeatRequest.GetDetails(), }) ctx.AddTask( a, @@ -635,3 +687,88 @@ func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore { } return a } + +func (a *Activity) emitOnAttemptTimedOutMetrics(ctx chasm.Context, handler metrics.Handler, timeoutType enumspb.TimeoutType) { + // TODO ignore err for now as it won't be there after rebase on main + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + latency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(latency) + + timeoutTag := metrics.StringTag("timeout_type", timeoutType.String()) + metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag) +} + +func (a *Activity) emitOnAttemptFailedMetrics(ctx chasm.Context, handler metrics.Handler) { + // TODO ignore err for now as it won't be there after rebase on main + attempt := a.LastAttempt.Get(ctx) + startedTime := 
attempt.GetStartedTime().AsTime() + + latency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(latency) + + metrics.ActivityTaskFail.With(handler).Record(1) +} + +func (a *Activity) emitOnCompletedMetrics(ctx chasm.Context, handler metrics.Handler) { + // TODO ignore err for now as it won't be there after rebase on main + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + metrics.ActivitySuccess.With(handler).Record(1) +} + +func (a *Activity) emitOnFailedMetrics(ctx chasm.Context, handler metrics.Handler) { + // TODO ignore err for now as it won't be there after rebase on main + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + metrics.ActivityTaskFail.With(handler).Record(1) + metrics.ActivityFail.With(handler).Record(1) +} + +func (a *Activity) emitOnCanceledMetrics(ctx chasm.Context, handler metrics.Handler) { + // TODO ignore err for now as it won't be there after rebase on main + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + // Cancel can happen before start, so guard against zero time + if !startedTime.IsZero() { + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + } + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + 
metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + metrics.ActivityCancel.With(handler).Record(1) +} + +func (a *Activity) emitOnTimedOutMetrics(ctx chasm.Context, handler metrics.Handler, timeoutType enumspb.TimeoutType) { + // TODO ignore err for now as it won't be there after rebase on main + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + timeoutTag := metrics.StringTag("timeout_type", timeoutType.String()) + metrics.ActivityTaskTimeout.With(handler).Record(1, timeoutTag) + metrics.ActivityTimeout.With(handler).Record(1, timeoutTag) +} diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index 8d585ff9eb2..e0340195a67 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -6,7 +6,10 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/tqid" "go.temporal.io/server/common/util" "go.uber.org/fx" ) @@ -59,10 +62,22 @@ func (e *activityDispatchTaskExecutor) Execute( return err } -type scheduleToStartTimeoutTaskExecutor struct{} +type timeoutTaskExecutorOptions struct { + fx.In + + Config *Config + MetricsHandler metrics.Handler + NamespaceRegistry namespace.Registry +} -func newScheduleToStartTimeoutTaskExecutor() *scheduleToStartTimeoutTaskExecutor { - return &scheduleToStartTimeoutTaskExecutor{} +type scheduleToStartTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} + +func 
newScheduleToStartTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *scheduleToStartTimeoutTaskExecutor { + return &scheduleToStartTimeoutTaskExecutor{ + opts, + } } func (e *scheduleToStartTimeoutTaskExecutor) Validate( @@ -81,13 +96,41 @@ func (e *scheduleToStartTimeoutTaskExecutor) Execute( _ chasm.TaskAttributes, _ *activitypb.ScheduleToStartTimeoutTask, ) error { - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START) + breakdownMetricsByTaskQueue := e.opts.Config.BreakdownMetricsByTaskQueue + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + taskQueueFamily := activity.GetTaskQueue().GetName() + + metricsHandler := metrics.GetPerTaskQueueFamilyScope( + e.opts.MetricsHandler, + namespaceName.String(), + tqid.UnsafeTaskQueueFamily(namespaceName.String(), taskQueueFamily), + breakdownMetricsByTaskQueue(namespaceName.String(), taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.TimerActiveTaskActivityTimeoutScope), + metrics.ActivityTypeTag(activity.GetActivityType().GetName()), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + + event := timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + metricsHandler: metricsHandler, + } + + return TransitionTimedOut.Apply(activity, ctx, event) } -type scheduleToCloseTimeoutTaskExecutor struct{} +type scheduleToCloseTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} -func newScheduleToCloseTimeoutTaskExecutor() *scheduleToCloseTimeoutTaskExecutor { - return &scheduleToCloseTimeoutTaskExecutor{} +func newScheduleToCloseTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *scheduleToCloseTimeoutTaskExecutor { + return &scheduleToCloseTimeoutTaskExecutor{ + opts, + } } func (e *scheduleToCloseTimeoutTaskExecutor) Validate( @@ -105,13 +148,41 @@ func (e *scheduleToCloseTimeoutTaskExecutor) 
Execute( _ chasm.TaskAttributes, _ *activitypb.ScheduleToCloseTimeoutTask, ) error { - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) + breakdownMetricsByTaskQueue := e.opts.Config.BreakdownMetricsByTaskQueue + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + taskQueueFamily := activity.GetTaskQueue().GetName() + + metricsHandler := metrics.GetPerTaskQueueFamilyScope( + e.opts.MetricsHandler, + namespaceName.String(), + tqid.UnsafeTaskQueueFamily(namespaceName.String(), taskQueueFamily), + breakdownMetricsByTaskQueue(namespaceName.String(), taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.TimerActiveTaskActivityTimeoutScope), + metrics.ActivityTypeTag(activity.GetActivityType().GetName()), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + + event := timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + metricsHandler: metricsHandler, + } + + return TransitionTimedOut.Apply(activity, ctx, event) } -type startToCloseTimeoutTaskExecutor struct{} +type startToCloseTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} -func newStartToCloseTimeoutTaskExecutor() *startToCloseTimeoutTaskExecutor { - return &startToCloseTimeoutTaskExecutor{} +func newStartToCloseTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *startToCloseTimeoutTaskExecutor { + return &startToCloseTimeoutTaskExecutor{ + opts, + } } func (e *startToCloseTimeoutTaskExecutor) Validate( @@ -136,23 +207,57 @@ func (e *startToCloseTimeoutTaskExecutor) Execute( return err } + breakdownMetricsByTaskQueue := e.opts.Config.BreakdownMetricsByTaskQueue + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + taskQueueFamily := activity.GetTaskQueue().GetName() + 
+ metricsHandler := metrics.GetPerTaskQueueFamilyScope( + e.opts.MetricsHandler, + namespaceName.String(), + tqid.UnsafeTaskQueueFamily(namespaceName.String(), taskQueueFamily), + breakdownMetricsByTaskQueue(namespaceName.String(), taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.TimerActiveTaskActivityTimeoutScope), + metrics.ActivityTypeTag(activity.GetActivityType().GetName()), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + // Retry task if we have remaining attempts and time. A retry involves transitioning the activity back to scheduled state. if shouldRetry { - return TransitionRescheduled.Apply(activity, ctx, rescheduleEvent{ + err = TransitionRescheduled.Apply(activity, ctx, rescheduleEvent{ retryInterval: retryInterval, failure: createStartToCloseTimeoutFailure(), + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, }) + if err != nil { + return err + } + + activity.emitOnAttemptTimedOutMetrics(ctx, metricsHandler, enumspb.TIMEOUT_TYPE_START_TO_CLOSE) + + return nil } // Reached maximum attempts, timeout the activity - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_START_TO_CLOSE) + return TransitionTimedOut.Apply(activity, ctx, timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + metricsHandler: metricsHandler, + }) } // HeartbeatTimeoutTask is a pure task that enforces heartbeat timeouts. -type heartbeatTimeoutTaskExecutor struct{} +type heartbeatTimeoutTaskExecutor struct { + opts timeoutTaskExecutorOptions +} -func newHeartbeatTimeoutTaskExecutor() *heartbeatTimeoutTaskExecutor { - return &heartbeatTimeoutTaskExecutor{} +func newHeartbeatTimeoutTaskExecutor(opts timeoutTaskExecutorOptions) *heartbeatTimeoutTaskExecutor { + return &heartbeatTimeoutTaskExecutor{ + opts, + } } // Validate validates a HeartbeatTimeoutTask. 
@@ -208,11 +313,43 @@ func (e *heartbeatTimeoutTaskExecutor) Execute( if err != nil { return err } + + breakdownMetricsByTaskQueue := e.opts.Config.BreakdownMetricsByTaskQueue + nsID := namespace.ID(ctx.ExecutionKey().NamespaceID) + namespaceName, err := e.opts.NamespaceRegistry.GetNamespaceName(nsID) + if err != nil { + return err + } + + taskQueueFamily := activity.GetTaskQueue().GetName() + + metricsHandler := metrics.GetPerTaskQueueFamilyScope( + e.opts.MetricsHandler, + namespaceName.String(), + tqid.UnsafeTaskQueueFamily(namespaceName.String(), taskQueueFamily), + breakdownMetricsByTaskQueue(namespaceName.String(), taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.TimerActiveTaskActivityTimeoutScope), + metrics.ActivityTypeTag(activity.GetActivityType().GetName()), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + if shouldRetry { - return TransitionRescheduled.Apply(activity, ctx, rescheduleEvent{ + err = TransitionRescheduled.Apply(activity, ctx, rescheduleEvent{ retryInterval: retryInterval, failure: createHeartbeatTimeoutFailure(), + timeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, }) + if err != nil { + return err + } + + activity.emitOnAttemptTimedOutMetrics(ctx, metricsHandler, enumspb.TIMEOUT_TYPE_HEARTBEAT) + + return nil } - return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_HEARTBEAT) + + return TransitionTimedOut.Apply(activity, ctx, timeoutEvent{ + timeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + metricsHandler: metricsHandler, + }) } diff --git a/chasm/lib/activity/config.go b/chasm/lib/activity/config.go index cc31eec4983..d7784bd4497 100644 --- a/chasm/lib/activity/config.go +++ b/chasm/lib/activity/config.go @@ -23,13 +23,15 @@ var ( ) type Config struct { - LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter - LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter + BreakdownMetricsByTaskQueue 
dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool] + LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter } func ConfigProvider(dc *dynamicconfig.Collection) *Config { return &Config{ - LongPollTimeout: LongPollTimeout.Get(dc), - LongPollBuffer: LongPollBuffer.Get(dc), + LongPollTimeout: LongPollTimeout.Get(dc), + LongPollBuffer: LongPollBuffer.Get(dc), + BreakdownMetricsByTaskQueue: dynamicconfig.MetricsBreakdownByTaskQueue.Get(dc), } } diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index 73c0c999e11..055868a8e0a 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -11,6 +11,9 @@ import ( "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" "go.temporal.io/server/common/contextutil" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/tqid" ) var ( @@ -28,12 +31,16 @@ var ( type handler struct { activitypb.UnimplementedActivityServiceServer - config *Config + config *Config + metricsHandler metrics.Handler + namespaceRegistry namespace.Registry } -func newHandler(config *Config) *handler { +func newHandler(config *Config, metricsHandler metrics.Handler, namespaceRegistry namespace.Registry) *handler { return &handler{ - config: config, + config: config, + metricsHandler: metricsHandler, + namespaceRegistry: namespaceRegistry, } } @@ -76,7 +83,6 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St chasm.WithRequestID(req.GetFrontendRequest().GetRequestId()), chasm.WithBusinessIDPolicy(reusePolicy, conflictPolicy), ) - if err != nil { return nil, err } @@ -253,11 +259,37 @@ func (h *handler) RequestCancelActivityExecution( RunID: frontendReq.GetRunId(), }) + breakdownMetricsByTaskQueue := h.config.BreakdownMetricsByTaskQueue + namespaceName, err := 
h.namespaceRegistry.GetNamespaceName(namespace.ID(req.GetNamespaceId())) + if err != nil { + return nil, err + } + + metricsHandlerBuilderParams, err := chasm.ReadComponent(ctx, ref, (*Activity).GetMetricsHandlerParams, nil) + if err != nil { + return nil, err + } + + taskQueueFamily := metricsHandlerBuilderParams.TaskQueueName + + metricsHandler := metrics.GetPerTaskQueueFamilyScope( + h.metricsHandler, + namespaceName.String(), + tqid.UnsafeTaskQueueFamily(namespaceName.String(), taskQueueFamily), + breakdownMetricsByTaskQueue(namespaceName.String(), taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.TimerActiveTaskActivityTimeoutScope), + metrics.ActivityTypeTag(metricsHandlerBuilderParams.ActivityType), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + response, _, err = chasm.UpdateComponent( ctx, ref, (*Activity).handleCancellationRequested, - req, + RequestCancelActivityReqWrapper{ + request: req, + metricsHandler: metricsHandler, + }, ) if err != nil { return nil, err diff --git a/chasm/lib/activity/library.go b/chasm/lib/activity/library.go index 280622c55fa..f8bff5bc48a 100644 --- a/chasm/lib/activity/library.go +++ b/chasm/lib/activity/library.go @@ -59,11 +59,12 @@ func (l *library) RegisterServices(server *grpc.Server) { func (l *library) Tasks() []*chasm.RegistrableTask { return []*chasm.RegistrableTask{ - chasm.NewRegistrableSideEffectTask[*Activity, *activitypb.ActivityDispatchTask]( + chasm.NewRegistrableSideEffectTask( "dispatch", l.activityDispatchTaskExecutor, l.activityDispatchTaskExecutor, ), + // TODO(dan): why are the task names "FooTimer" but "FooTimeoutTask" in the struct names? 
chasm.NewRegistrablePureTask( "scheduleToStartTimer", l.scheduleToStartTimeoutTaskExecutor, diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 7766594765d..c3967b3c24c 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -4,13 +4,13 @@ import ( "fmt" "time" - commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" "go.temporal.io/api/workflowservice/v1" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + "go.temporal.io/server/common/metrics" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -75,6 +75,7 @@ var TransitionScheduled = chasm.NewTransition( type rescheduleEvent struct { retryInterval time.Duration failure *failurepb.Failure + timeoutType enumspb.TimeoutType } // TransitionRescheduled affects a transition to Scheduled from Started, which happens on retries. 
The event to pass in @@ -158,17 +159,22 @@ var TransitionCompleted = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, - func(a *Activity, ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCompletedRequest) error { + func(a *Activity, ctx chasm.MutableContext, reqWrapper RespondCompletedReqWrapper) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + req := reqWrapper.Request.GetCompleteRequest() + attempt := a.LastAttempt.Get(ctx) attempt.CompleteTime = timestamppb.New(ctx.Now(a)) - attempt.LastWorkerIdentity = request.GetCompleteRequest().GetIdentity() + attempt.LastWorkerIdentity = req.GetIdentity() outcome := a.Outcome.Get(ctx) outcome.Variant = &activitypb.ActivityOutcome_Successful_{ Successful: &activitypb.ActivityOutcome_Successful{ - Output: request.GetCompleteRequest().GetResult(), + Output: req.GetResult(), }, } + + a.emitOnCompletedMetrics(ctx, reqWrapper.MetricsHandler) + return nil }) }, @@ -181,16 +187,25 @@ var TransitionFailed = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, - func(a *Activity, ctx chasm.MutableContext, req *historyservice.RespondActivityTaskFailedRequest) error { + func(a *Activity, ctx chasm.MutableContext, reqWrapper RespondFailedReqWrapper) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { - if details := req.GetFailedRequest().GetLastHeartbeatDetails(); details != nil { + req := reqWrapper.Request.GetFailedRequest() + + if details := req.GetLastHeartbeatDetails(); details != nil { heartbeat := a.getOrCreateLastHeartbeat(ctx) heartbeat.Details = details heartbeat.RecordedTime = timestamppb.New(ctx.Now(a)) } attempt := a.LastAttempt.Get(ctx) - attempt.LastWorkerIdentity = req.GetFailedRequest().GetIdentity() - return a.recordFailedAttempt(ctx, 0, 
req.GetFailedRequest().GetFailure(), true) + attempt.LastWorkerIdentity = req.GetIdentity() + + if err := a.recordFailedAttempt(ctx, 0, req.GetFailure(), true); err != nil { + return err + } + + a.emitOnFailedMetrics(ctx, reqWrapper.MetricsHandler) + + return nil }) }, ) @@ -241,20 +256,25 @@ var TransitionCancelRequested = chasm.NewTransition( }, ) +type cancelEvent struct { + details *common.Payloads + handler metrics.Handler +} + // TransitionCanceled affects a transition to Canceled status var TransitionCanceled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, - func(a *Activity, ctx chasm.MutableContext, details *commonpb.Payloads) error { + func(a *Activity, ctx chasm.MutableContext, event cancelEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ Message: "Activity canceled", FailureInfo: &failurepb.Failure_CanceledFailureInfo{ CanceledFailureInfo: &failurepb.CanceledFailureInfo{ - Details: details, + Details: event.details, }, }, } @@ -263,12 +283,21 @@ var TransitionCanceled = chasm.NewTransition( Failure: failure, }, } + + a.emitOnCanceledMetrics(ctx, event.handler) + return nil }) }, ) // TransitionTimedOut affects a transition to TimedOut status +type timeoutEvent struct { + metricsHandler metrics.Handler + timeoutType enumspb.TimeoutType +} + +// TransitionTimedOut transitions to TimedOut status var TransitionTimedOut = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, @@ -276,21 +305,31 @@ var TransitionTimedOut = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, - func(a *Activity, ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error { + func(a *Activity, ctx chasm.MutableContext, 
event timeoutEvent) error { + timeoutType := event.timeoutType + return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + var err error switch timeoutType { case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE: - return a.recordScheduleToStartOrCloseTimeoutFailure(ctx, timeoutType) + err = a.recordScheduleToStartOrCloseTimeoutFailure(ctx, timeoutType) case enumspb.TIMEOUT_TYPE_START_TO_CLOSE: failure := createStartToCloseTimeoutFailure() - return a.recordFailedAttempt(ctx, 0, failure, true) + err = a.recordFailedAttempt(ctx, 0, failure, true) case enumspb.TIMEOUT_TYPE_HEARTBEAT: failure := createHeartbeatTimeoutFailure() - return a.recordFailedAttempt(ctx, 0, failure, true) + err = a.recordFailedAttempt(ctx, 0, failure, true) default: - return fmt.Errorf("unhandled activity timeout: %v", timeoutType) + err = fmt.Errorf("unhandled activity timeout: %v", timeoutType) + } + if err != nil { + return err } + + a.emitOnTimedOutMetrics(ctx, event.metricsHandler, timeoutType) + + return nil }) }, ) diff --git a/chasm/lib/activity/statemachine_test.go b/chasm/lib/activity/statemachine_test.go index 2730f120040..99c098e22f9 100644 --- a/chasm/lib/activity/statemachine_test.go +++ b/chasm/lib/activity/statemachine_test.go @@ -8,13 +8,17 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/testing/protorequire" + "go.uber.org/mock/gomock" "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) var ( @@ -80,17 +84,23 @@ func TestTransitionScheduled(t 
*testing.T) { } attemptState := &activitypb.ActivityAttemptState{Count: tc.startingAttemptCount} outcome := &activitypb.ActivityOutcome{} + input := payloads.EncodeString("test-input") activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(tc.scheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(tc.scheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), + RequestData: chasm.NewDataField(ctx, &activitypb.ActivityRequestData{ + Input: input, + }), } err := TransitionScheduled.Apply(activity, ctx, nil) @@ -130,6 +140,9 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval time.Duration retryPolicy *commonpb.RetryPolicy scheduleToStartTimeout time.Duration + operationTag string + counterMetric string + timeoutType enumspb.TimeoutType }{ { name: "second attempt - timeout recorded", @@ -141,6 +154,9 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval: 2 * time.Second, retryPolicy: defaultRetryPolicy, scheduleToStartTimeout: defaultScheduleToStartTimeout, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, }, { name: "third attempt - timeout recorded", @@ -152,6 +168,9 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval: 4 * time.Second, retryPolicy: defaultRetryPolicy, scheduleToStartTimeout: defaultScheduleToStartTimeout, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, }, { name: 
"no schedule to start timeout", @@ -162,6 +181,37 @@ func TestTransitionRescheduled(t *testing.T) { expectedRetryInterval: 2 * time.Second, retryPolicy: defaultRetryPolicy, scheduleToStartTimeout: 0, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + }, + { + name: "heartbeat timeout - timeout recorded", + startingAttemptCount: 1, + expectedTasks: []chasm.MockTask{ + {Payload: &activitypb.ScheduleToStartTimeoutTask{}}, + {Payload: &activitypb.ActivityDispatchTask{}}, + }, + expectedRetryInterval: 2 * time.Second, + retryPolicy: defaultRetryPolicy, + scheduleToStartTimeout: defaultScheduleToStartTimeout, + operationTag: metrics.TimerActiveTaskActivityTimeoutScope, + counterMetric: metrics.ActivityTaskTimeout.Name(), + timeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + }, + + { + name: "reschedule from failure", + startingAttemptCount: 1, + expectedTasks: []chasm.MockTask{ + {Payload: &activitypb.ScheduleToStartTimeoutTask{}}, + {Payload: &activitypb.ActivityDispatchTask{}}, + }, + expectedRetryInterval: 2 * time.Second, + retryPolicy: defaultRetryPolicy, + scheduleToStartTimeout: defaultScheduleToStartTimeout, + operationTag: metrics.HistoryRespondActivityTaskFailedScope, + counterMetric: metrics.ActivityTaskFail.Name(), }, } @@ -174,20 +224,25 @@ func TestTransitionRescheduled(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(tc.scheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, 
outcome), } - err := TransitionRescheduled.Apply(activity, ctx, rescheduleEvent{ + event := rescheduleEvent{ retryInterval: tc.expectedRetryInterval, failure: createStartToCloseTimeoutFailure(), - }) + timeoutType: tc.timeoutType, + } + + err := TransitionRescheduled.Apply(activity, ctx, event) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activity.Status) require.Equal(t, tc.startingAttemptCount+1, attemptState.Count) @@ -225,8 +280,10 @@ func TestTransitionRescheduled(t *testing.T) { func TestTransitionStarted(t *testing.T) { ctx := &chasm.MockMutableContext{} - ctx.HandleNow = func(chasm.Component) time.Time { return defaultTime } - attemptState := &activitypb.ActivityAttemptState{Count: 1} + attemptState := &activitypb.ActivityAttemptState{ + Count: 1, + StartedTime: timestamppb.New(defaultTime), + } outcome := &activitypb.ActivityOutcome{} activity := &Activity{ @@ -284,6 +341,12 @@ func TestTransitionTimedout(t *testing.T) { timeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, attemptCount: 5, }, + { + name: "heartbeat timeout", + startStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + timeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + attemptCount: 2, + }, } for _, tc := range testCases { @@ -294,17 +357,45 @@ func TestTransitionTimedout(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: tc.startStatus, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), } - err := TransitionTimedOut.Apply(activity, ctx, tc.timeoutType) + controller := gomock.NewController(t) + 
metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + + timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + timeoutTag := metrics.StringTag("timeout_type", tc.timeoutType.String()) + + counterTimeout := metrics.NewMockCounterIface(controller) + counterTimeout.EXPECT().Record(int64(1), timeoutTag).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityTimeout.Name()).Return(counterTimeout) + + counterTaskTimeout := metrics.NewMockCounterIface(controller) + counterTaskTimeout.EXPECT().Record(int64(1), timeoutTag).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityTaskTimeout.Name()).Return(counterTaskTimeout) + + event := timeoutEvent{ + timeoutType: tc.timeoutType, + metricsHandler: metricsHandler, + } + + err := TransitionTimedOut.Apply(activity, ctx, event) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, activity.Status) require.Equal(t, tc.attemptCount, attemptState.Count) @@ -317,17 +408,15 @@ func TestTransitionTimedout(t *testing.T) { require.Nil(t, attemptState.GetCompleteTime()) require.NotNil(t, outcome.GetFailed().GetFailure()) // do something - case enumspb.TIMEOUT_TYPE_START_TO_CLOSE: + case enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + enumspb.TIMEOUT_TYPE_HEARTBEAT: // Timeout failure is recorded in both attempt state and outcome. TransitionTimedOut should only be called when there // are no more retries. Retries go through TransitionRescheduled. 
require.NotNil(t, attemptState.GetLastFailureDetails().GetFailure()) require.NotNil(t, attemptState.GetLastFailureDetails().GetTime()) require.NotNil(t, attemptState.GetCompleteTime()) require.Nil(t, attemptState.GetCurrentRetryInterval()) - - failure, ok := outcome.GetVariant().(*activitypb.ActivityOutcome_Failed_) - require.True(t, ok, "expected variant to be of type Failed") - require.Nil(t, failure.Failed, "expected outcome.Failed to be nil since failure is recorded in attempt state") + require.Nil(t, outcome.GetVariant(), "expected outcome variant to be nil since failure is recorded in attempt state") default: t.Fatalf("unexpected timeout type: %v", tc.timeoutType) @@ -346,11 +435,13 @@ func TestTransitionCompleted(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), @@ -358,12 +449,32 @@ func TestTransitionCompleted(t *testing.T) { payload := payloads.EncodeString("Done") - err := TransitionCompleted.Apply(activity, ctx, &historyservice.RespondActivityTaskCompletedRequest{ - CompleteRequest: &workflowservice.RespondActivityTaskCompletedRequest{ - Result: payload, - Identity: "worker", + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + 
+ timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + counterSuccess := metrics.NewMockCounterIface(controller) + counterSuccess.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivitySuccess.Name()).Return(counterSuccess) + + req := RespondCompletedReqWrapper{ + Request: &historyservice.RespondActivityTaskCompletedRequest{ + CompleteRequest: &workflowservice.RespondActivityTaskCompletedRequest{ + Result: payload, + Identity: "worker", + }, }, - }) + MetricsHandler: metricsHandler, + } + + err := TransitionCompleted.Apply(activity, ctx, req) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, activity.Status) require.EqualValues(t, 1, attemptState.Count) @@ -381,11 +492,13 @@ func TestTransitionFailed(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), LastHeartbeat: chasm.NewDataField(ctx, heartbeatState), @@ -401,13 +514,38 @@ func TestTransitionFailed(t *testing.T) { }}, } - err := TransitionFailed.Apply(activity, ctx, &historyservice.RespondActivityTaskFailedRequest{ - FailedRequest: &workflowservice.RespondActivityTaskFailedRequest{ - Failure: failure, - LastHeartbeatDetails: heartbeatDetails, - Identity: "worker", + controller := gomock.NewController(t) + metricsHandler := 
metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + + timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + counterFail := metrics.NewMockCounterIface(controller) + counterFail.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityFail.Name()).Return(counterFail) + + counterTaskFail := metrics.NewMockCounterIface(controller) + counterTaskFail.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityTaskFail.Name()).Return(counterTaskFail) + + req := RespondFailedReqWrapper{ + Request: &historyservice.RespondActivityTaskFailedRequest{ + FailedRequest: &workflowservice.RespondActivityTaskFailedRequest{ + Failure: failure, + LastHeartbeatDetails: heartbeatDetails, + Identity: "worker", + }, }, - }) + MetricsHandler: metricsHandler, + } + + err := TransitionFailed.Apply(activity, ctx, req) + require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, activity.Status) require.EqualValues(t, 1, attemptState.Count) @@ -499,17 +637,39 @@ func TestTransitionCanceled(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: 
"test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), } - err := TransitionCanceled.Apply(activity, ctx, payloads.EncodeString("Details")) + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + + timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + counterCancel := metrics.NewMockCounterIface(controller) + counterCancel.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityCancel.Name()).Return(counterCancel) + + event := cancelEvent{ + details: payloads.EncodeString("Details"), + handler: metricsHandler, + } + + err := TransitionCanceled.Apply(activity, ctx, event) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, activity.Status) diff --git a/service/history/api/respondactivitytaskcanceled/api.go b/service/history/api/respondactivitytaskcanceled/api.go index a66dfc9a566..f5bd9c5444a 100644 --- a/service/history/api/respondactivitytaskcanceled/api.go +++ b/service/history/api/respondactivitytaskcanceled/api.go @@ -13,6 +13,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/tasktoken" + "go.temporal.io/server/common/tqid" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/consts" historyi "go.temporal.io/server/service/history/interfaces" @@ -40,13 +41,31 @@ func Invoke( // Handle standalone activity if component ref is present in the token if 
componentRef := token.GetComponentRef(); len(componentRef) > 0 { + metricsHandlerBuilderParams, err := chasm.ReadComponent(ctx, componentRef, (*activity.Activity).GetMetricsHandlerParams, nil) + if err != nil { + return nil, err + } + + taskQueueName := metricsHandlerBuilderParams.TaskQueueName + + handler := metrics.GetPerTaskQueueFamilyScope( + shard.GetMetricsHandler(), + namespace.String(), + tqid.UnsafeTaskQueueFamily(req.GetNamespaceId(), taskQueueName), + shard.GetConfig().BreakdownMetricsByTaskQueue(namespace.String(), taskQueueName, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.HistoryRespondActivityTaskCanceledScope), + metrics.ActivityTypeTag(metricsHandlerBuilderParams.ActivityType), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + response, _, err := chasm.UpdateComponent( ctx, componentRef, (*activity.Activity).HandleCanceled, - activity.WithToken[*historyservice.RespondActivityTaskCanceledRequest]{ - Token: token, - Request: req, + activity.RespondCancelledReqWrapper{ + Request: req, + Token: token, + MetricsHandler: handler, }, ) diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index 689a0077b88..151faa08bc7 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -13,6 +13,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/tasktoken" + "go.temporal.io/server/common/tqid" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/consts" historyi "go.temporal.io/server/service/history/interfaces" @@ -40,13 +41,31 @@ func Invoke( // Handle standalone activity if component ref is present in the token if componentRef := token.GetComponentRef(); len(componentRef) > 0 { + metricsHandlerBuilderParams, err := chasm.ReadComponent(ctx, componentRef, 
(*activity.Activity).GetMetricsHandlerParams, nil) + if err != nil { + return nil, err + } + + taskQueueName := metricsHandlerBuilderParams.TaskQueueName + + handler := metrics.GetPerTaskQueueFamilyScope( + shard.GetMetricsHandler(), + namespace.String(), + tqid.UnsafeTaskQueueFamily(req.GetNamespaceId(), taskQueueName), + shard.GetConfig().BreakdownMetricsByTaskQueue(namespace.String(), taskQueueName, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope), + metrics.ActivityTypeTag(metricsHandlerBuilderParams.ActivityType), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + response, _, err := chasm.UpdateComponent( ctx, componentRef, (*activity.Activity).HandleCompleted, - activity.WithToken[*historyservice.RespondActivityTaskCompletedRequest]{ - Token: token, - Request: req, + activity.RespondCompletedReqWrapper{ + Request: req, + Token: token, + MetricsHandler: handler, }, ) diff --git a/service/history/api/respondactivitytaskfailed/api.go b/service/history/api/respondactivitytaskfailed/api.go index d19c8759bd5..dda0438b9a6 100644 --- a/service/history/api/respondactivitytaskfailed/api.go +++ b/service/history/api/respondactivitytaskfailed/api.go @@ -14,6 +14,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/tasktoken" + "go.temporal.io/server/common/tqid" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/consts" historyi "go.temporal.io/server/service/history/interfaces" @@ -41,13 +42,31 @@ func Invoke( // Handle standalone activity if component ref is present in the token if componentRef := token.GetComponentRef(); len(componentRef) > 0 { + metricsHandlerBuilderParams, err := chasm.ReadComponent(ctx, componentRef, (*activity.Activity).GetMetricsHandlerParams, nil) + if err != nil { + return nil, err + } + + taskQueueName := metricsHandlerBuilderParams.TaskQueueName + + 
handler := metrics.GetPerTaskQueueFamilyScope( + shard.GetMetricsHandler(), + namespace.String(), + tqid.UnsafeTaskQueueFamily(req.GetNamespaceId(), taskQueueName), + shard.GetConfig().BreakdownMetricsByTaskQueue(namespace.String(), taskQueueName, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope), + metrics.ActivityTypeTag(metricsHandlerBuilderParams.ActivityType), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + response, _, err := chasm.UpdateComponent( ctx, componentRef, (*activity.Activity).HandleFailed, - activity.WithToken[*historyservice.RespondActivityTaskFailedRequest]{ - Token: token, - Request: req, + activity.RespondFailedReqWrapper{ + Request: req, + Token: token, + MetricsHandler: handler, }, ) From e01e7fabeb46adfb423f1ea419cb85c35435386b Mon Sep 17 00:00:00 2001 From: Fred Tzeng Date: Tue, 9 Dec 2025 15:10:32 -0800 Subject: [PATCH 2/3] Fix lint. --- chasm/lib/activity/activity_tasks.go | 4 ++-- chasm/lib/activity/statemachine.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index e0340195a67..ba9bb731dbe 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -277,8 +277,8 @@ func (e *heartbeatTimeoutTaskExecutor) Validate( // On the i-th execution of this function, we look back into the past and determine whether the // last heartbeat was received after hb_i. If so, we reject this timeout task. Otherwise, the // Execute function runs and we fail the attempt. - if !(activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || - activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED) { + if activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && + activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { return false, nil } // Task attempt must still match current attempt. 
diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index c3967b3c24c..343f49f423b 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" "go.temporal.io/api/workflowservice/v1" @@ -257,7 +257,7 @@ var TransitionCancelRequested = chasm.NewTransition( ) type cancelEvent struct { - details *common.Payloads + details *commonpb.Payloads handler metrics.Handler } From d73da0c36ff79ea9866121055494f6760fdb74ee Mon Sep 17 00:00:00 2001 From: Fred Tzeng Date: Tue, 9 Dec 2025 15:24:52 -0800 Subject: [PATCH 3/3] Added termination metrics emit. --- chasm/lib/activity/activity.go | 36 ++++++++++++++++++------- chasm/lib/activity/handler.go | 30 +++++++++++++++++++-- chasm/lib/activity/statemachine.go | 7 +++-- chasm/lib/activity/statemachine_test.go | 32 ++++++++++++++++++---- common/metrics/metric_defs.go | 1 + 5 files changed, 87 insertions(+), 19 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index b96bd47e769..300c1e4262b 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -83,12 +83,18 @@ type RespondCancelledReqWrapper struct { MetricsHandler metrics.Handler } -// RequestCancelActivityReqWrapper wraps the RequestCancelActivityExecutionRequest with context-specific data. -type RequestCancelActivityReqWrapper struct { +// requestCancelActivityReqWrapper wraps the RequestCancelActivityExecutionRequest with context-specific data. +type requestCancelActivityReqWrapper struct { request *activitypb.RequestCancelActivityExecutionRequest metricsHandler metrics.Handler } +// terminateActivityReqWrapper wraps the TerminateActivityExecutionRequest with context-specific data. 
+type terminateActivityReqWrapper struct { + request *activitypb.TerminateActivityExecutionRequest + metricsHandler metrics.Handler +} + func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState { switch a.Status { case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: @@ -314,7 +320,7 @@ func (a *Activity) HandleCanceled( return &historyservice.RespondActivityTaskCanceledResponse{}, nil } -func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) ( +func (a *Activity) handleTerminated(ctx chasm.MutableContext, req terminateActivityReqWrapper) ( *activitypb.TerminateActivityExecutionResponse, error, ) { if err := TransitionTerminated.Apply(a, ctx, req); err != nil { @@ -335,7 +341,7 @@ func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activityp return heartbeat } -func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, reqWrapper RequestCancelActivityReqWrapper) ( +func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, reqWrapper requestCancelActivityReqWrapper) ( *activitypb.RequestCancelActivityExecutionResponse, error, ) { req := reqWrapper.request.GetFrontendRequest() @@ -689,7 +695,6 @@ func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore { } func (a *Activity) emitOnAttemptTimedOutMetrics(ctx chasm.Context, handler metrics.Handler, timeoutType enumspb.TimeoutType) { - // TODO ignore err for now as it won't be there after rebase on main attempt := a.LastAttempt.Get(ctx) startedTime := attempt.GetStartedTime().AsTime() @@ -701,7 +706,6 @@ func (a *Activity) emitOnAttemptTimedOutMetrics(ctx chasm.Context, handler metri } func (a *Activity) emitOnAttemptFailedMetrics(ctx chasm.Context, handler metrics.Handler) { - // TODO ignore err for now as it won't be there after rebase on main attempt := a.LastAttempt.Get(ctx) startedTime := attempt.GetStartedTime().AsTime() @@ -712,7 +716,6 @@ func (a *Activity) emitOnAttemptFailedMetrics(ctx 
chasm.Context, handler metrics } func (a *Activity) emitOnCompletedMetrics(ctx chasm.Context, handler metrics.Handler) { - // TODO ignore err for now as it won't be there after rebase on main attempt := a.LastAttempt.Get(ctx) startedTime := attempt.GetStartedTime().AsTime() @@ -726,7 +729,6 @@ func (a *Activity) emitOnCompletedMetrics(ctx chasm.Context, handler metrics.Han } func (a *Activity) emitOnFailedMetrics(ctx chasm.Context, handler metrics.Handler) { - // TODO ignore err for now as it won't be there after rebase on main attempt := a.LastAttempt.Get(ctx) startedTime := attempt.GetStartedTime().AsTime() @@ -741,7 +743,6 @@ func (a *Activity) emitOnFailedMetrics(ctx chasm.Context, handler metrics.Handle } func (a *Activity) emitOnCanceledMetrics(ctx chasm.Context, handler metrics.Handler) { - // TODO ignore err for now as it won't be there after rebase on main attempt := a.LastAttempt.Get(ctx) startedTime := attempt.GetStartedTime().AsTime() @@ -757,8 +758,23 @@ func (a *Activity) emitOnCanceledMetrics(ctx chasm.Context, handler metrics.Hand metrics.ActivityCancel.With(handler).Record(1) } +func (a *Activity) emitOnTerminatedMetrics(ctx chasm.Context, handler metrics.Handler) { + attempt := a.LastAttempt.Get(ctx) + startedTime := attempt.GetStartedTime().AsTime() + + // Terminate can happen before start, so guard against zero time + if !startedTime.IsZero() { + startToCloseLatency := time.Since(startedTime) + metrics.ActivityStartToCloseLatency.With(handler).Record(startToCloseLatency) + } + + scheduleToCloseLatency := time.Since(a.GetScheduleTime().AsTime()) + metrics.ActivityScheduleToCloseLatency.With(handler).Record(scheduleToCloseLatency) + + metrics.ActivityTerminate.With(handler).Record(1) +} + func (a *Activity) emitOnTimedOutMetrics(ctx chasm.Context, handler metrics.Handler, timeoutType enumspb.TimeoutType) { - // TODO ignore err for now as it won't be there after rebase on main attempt := a.LastAttempt.Get(ctx) startedTime := 
attempt.GetStartedTime().AsTime() diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index 055868a8e0a..9a985a61345 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -232,11 +232,37 @@ func (h *handler) TerminateActivityExecution( RunID: frontendReq.GetRunId(), }) + breakdownMetricsByTaskQueue := h.config.BreakdownMetricsByTaskQueue + namespaceName, err := h.namespaceRegistry.GetNamespaceName(namespace.ID(req.GetNamespaceId())) + if err != nil { + return nil, err + } + + metricsHandlerBuilderParams, err := chasm.ReadComponent(ctx, ref, (*Activity).GetMetricsHandlerParams, nil) + if err != nil { + return nil, err + } + + taskQueueFamily := metricsHandlerBuilderParams.TaskQueueName + + metricsHandler := metrics.GetPerTaskQueueFamilyScope( + h.metricsHandler, + namespaceName.String(), + tqid.UnsafeTaskQueueFamily(namespaceName.String(), taskQueueFamily), + breakdownMetricsByTaskQueue(namespaceName.String(), taskQueueFamily, enumspb.TASK_QUEUE_TYPE_ACTIVITY), + metrics.OperationTag(metrics.TimerActiveTaskActivityTimeoutScope), + metrics.ActivityTypeTag(metricsHandlerBuilderParams.ActivityType), + metrics.VersioningBehaviorTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED), + ) + response, _, err = chasm.UpdateComponent( ctx, ref, (*Activity).handleTerminated, - req, + terminateActivityReqWrapper{ + request: req, + metricsHandler: metricsHandler, + }, ) if err != nil { @@ -286,7 +312,7 @@ func (h *handler) RequestCancelActivityExecution( ctx, ref, (*Activity).handleCancellationRequested, - RequestCancelActivityReqWrapper{ + requestCancelActivityReqWrapper{ request: req, metricsHandler: metricsHandler, }, diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 343f49f423b..362163c9c84 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -218,12 +218,12 @@ var TransitionTerminated = chasm.NewTransition( 
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, - func(a *Activity, ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) error { + func(a *Activity, ctx chasm.MutableContext, reqWrapper terminateActivityReqWrapper) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ // TODO if the reason isn't provided, perhaps set a default reason. Also see if we should prefix with "Activity terminated: " - Message: req.GetFrontendRequest().GetReason(), + Message: reqWrapper.request.GetFrontendRequest().GetReason(), FailureInfo: &failurepb.Failure_TerminatedFailureInfo{}, } outcome.Variant = &activitypb.ActivityOutcome_Failed_{ @@ -231,6 +231,9 @@ var TransitionTerminated = chasm.NewTransition( Failure: failure, }, } + + a.emitOnTerminatedMetrics(ctx, reqWrapper.metricsHandler) + return nil }) }, diff --git a/chasm/lib/activity/statemachine_test.go b/chasm/lib/activity/statemachine_test.go index 99c098e22f9..ec12d602c9b 100644 --- a/chasm/lib/activity/statemachine_test.go +++ b/chasm/lib/activity/statemachine_test.go @@ -569,22 +569,44 @@ func TestTransitionTerminated(t *testing.T) { activity := &Activity{ ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-activity-type"}, RetryPolicy: defaultRetryPolicy, ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, }, LastAttempt: chasm.NewDataField(ctx, attemptState), Outcome: chasm.NewDataField(ctx, outcome), } - err := TransitionTerminated.Apply(activity, ctx, &activitypb.TerminateActivityExecutionRequest{ - FrontendRequest: 
&workflowservice.TerminateActivityExecutionRequest{ - Reason: "Test Termination", - Identity: "terminator", + controller := gomock.NewController(t) + metricsHandler := metrics.NewMockHandler(controller) + + timerStartToCloseLatency := metrics.NewMockTimerIface(controller) + timerStartToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timerStartToCloseLatency) + + timerScheduleToCloseLatency := metrics.NewMockTimerIface(controller) + timerScheduleToCloseLatency.EXPECT().Record(gomock.Any()).Times(1) + metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timerScheduleToCloseLatency) + + counterTerminate := metrics.NewMockCounterIface(controller) + counterTerminate.EXPECT().Record(int64(1)).Times(1) + metricsHandler.EXPECT().Counter(metrics.ActivityTerminate.Name()).Return(counterTerminate) + + reqWrapper := terminateActivityReqWrapper{ + request: &activitypb.TerminateActivityExecutionRequest{ + FrontendRequest: &workflowservice.TerminateActivityExecutionRequest{ + Reason: "Test Termination", + Identity: "terminator", + }, }, - }) + metricsHandler: metricsHandler, + } + + err := TransitionTerminated.Apply(activity, ctx, reqWrapper) require.NoError(t, err) require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, activity.Status) require.EqualValues(t, 1, attemptState.Count) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 354c79d653d..c671766943a 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -862,6 +862,7 @@ var ( ActivityFail = NewCounterDef("activity_fail", WithDescription("Number of activities that failed and won't be retried anymore.")) ActivityTaskFail = NewCounterDef("activity_task_fail", WithDescription("Number of activity task failures (includes retries).")) ActivityCancel = NewCounterDef("activity_cancel") + ActivityTerminate = NewCounterDef("activity_terminate", 
WithDescription("Number of activities that were terminated."))
ActivityTaskTimeout = NewCounterDef("activity_task_timeout", WithDescription("Number of activity task timeouts (including retries).")) ActivityTimeout = NewCounterDef("activity_timeout", WithDescription("Number of terminal activity timeouts.")) ActivityPayloadSize = NewCounterDef("activity_payload_size", WithDescription("Size of activity payloads in bytes."))