Standalone activity heartbeating #8730
base: standalone-activity
Cleanup Add test of cross-execution token usage Fix test Change DeserializeComponentRef to return error on empty input Handle empty token same as absent wait policy Remove chasm engine from tests Cleanup Reinstate ignored opts Refactor: ErrorAs Refactor: NotFound ErrorAs Revert "Fix CHASM not found errors" This reverts commit d3c709c. Commute NotFound error in activity handler errors.New Cleanup Revert irrelevant changes Changes from code review
This reverts commit a92b69e.
This reverts commit f322ea18257a1e485d839f2af98c689c73c0f110.
| func (a *Activity) HandleStarted( | ||
| ctx chasm.MutableContext, | ||
| request *historyservice.RecordActivityTaskStartedRequest, | ||
| ) (*historyservice.RecordActivityTaskStartedResponse, error) { |
ignore: formatting only
Looks like @fretz12 and you have different ideas on how to format these signatures. I personally like the style in this PR better.
I like this formatting better too, but ideally we should have the linter take care of it.
I'm running gofumpt locally, it seems to do a good job... maybe introduce it as part of the lint-code target?
| ctx chasm.Context, | ||
| key chasm.EntityKey, | ||
| response *historyservice.RecordActivityTaskStartedResponse, | ||
| ) error { |
ignore: formatting only
| func (a *Activity) handleCancellationRequested( | ||
| ctx chasm.MutableContext, | ||
| req *activitypb.CancelActivityExecutionRequest, | ||
| ) (*activitypb.CancelActivityExecutionResponse, error) { |
ignore: formatting only
| } | ||
|
|
||
| // Validate validates a HeartbeatTimeoutTask. | ||
| func (e *heartbeatTimeoutTaskExecutor) Validate( |
You will want to validate that the schedule time in the attributes is still relevant here, not in the Execute function. It should be a deterministic function of the last heartbeat time and the attempt start time (hbDeadline below should be equal to the schedule time).
| hbTimeout := activity.GetHeartbeatTimeout().AsDuration() | ||
| attemptStartTime := attempt.GetStartedTime().AsTime() | ||
| lastHbTime := lastHb.GetRecordedTime().AsTime() // could be from a previous attempt | ||
| // No heartbeats in the attempt so far is equivalent to a heartbeat having been sent at attempt | ||
| // start time. | ||
| hbDeadline := util.MaxTime(lastHbTime, attemptStartTime).Add(hbTimeout) | ||
|
|
||
| if ctx.Now(activity).Before(hbDeadline) { | ||
| // Deadline has not expired; schedule a new task. | ||
| ctx.AddTask( | ||
| activity, | ||
| chasm.TaskAttributes{ | ||
| ScheduledTime: hbDeadline, | ||
| }, | ||
| &activitypb.HeartbeatTimeoutTask{ | ||
| Attempt: attempt.GetCount(), | ||
| }, | ||
| ) | ||
| return nil | ||
| } |
This shouldn't happen here. Validate in the Validate function. Schedule a new task any time a heartbeat is received.
chasm/lib/activity/library.go
Outdated
| l.activityDispatchTaskExecutor, | ||
| l.activityDispatchTaskExecutor, | ||
| ), | ||
| // TODO(dan): why are the task names "FooTimer" but "FooTimeoutTask" in the struct names? |
We agreed not to add Task to the task names. Not sure if we want to use TaskExecutor vs. just Executor for the struct names but that is not critical and can change easily. The string names cannot be changed easily OTOH because they affect how tasks are represented in persistence.
| Details: input.Request.HeartbeatRequest.GetDetails(), | ||
| }) | ||
| return nil, nil | ||
| return &historyservice.RecordActivityTaskHeartbeatResponse{ |
Just before returning here you want to generate a new heartbeat task if the heartbeat timeout is set.
| require.Error(t, err) | ||
| statusErr := serviceerror.ToStatus(err) | ||
| require.NotNil(t, statusErr) | ||
| require.Equal(t, codes.InvalidArgument, statusErr.Code()) |
Alternatively, you can use require.ErrorAs(err, &invalidArgumentErr)
| ) error { | ||
| if a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_STARTED && | ||
| a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { | ||
| return serviceerror.NewNotFound("activity task not found") |
Wrong error message? Also, NotFound doesn't feel like the right error type, even though I get that it's checking whether a token "matches". Perhaps FailedPrecondition?
| return err | ||
| } | ||
| if token.Attempt != attempt.GetCount() { | ||
| return serviceerror.NewNotFound("activity task not found") |
Same comment as above
| if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { | ||
| return nil, err | ||
| } | ||
| a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ |
@bergundy is there any performance penalty if we create a new field on every heartbeat?
No, no penalty.
chasm/lib/activity/activity.go
Outdated
| a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ | ||
| RecordedTime: timestamppb.New(ctx.Now(a)), | ||
| Details: details, | ||
| Details: input.Request.HeartbeatRequest.GetDetails(), |
| Details: input.Request.HeartbeatRequest.GetDetails(), | |
| Details: input.Request.GetHeartbeatRequest().GetDetails(), |
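The suggestion relies on the nil-receiver-safe getters that protobuf-generated Go code provides. The sketch below hand-writes two small types in that style (Request and HeartbeatRequest are simplified stand-ins, not the generated messages) to show why chaining getters is safer than direct field access when the outer message may be nil.

```go
package main

import "fmt"

// HeartbeatRequest and Request mimic the shape of generated protobuf messages.
type HeartbeatRequest struct{ Details []byte }

// GetDetails tolerates a nil receiver, as generated getters do.
func (r *HeartbeatRequest) GetDetails() []byte {
	if r == nil {
		return nil
	}
	return r.Details
}

type Request struct{ HeartbeatRequest *HeartbeatRequest }

// GetHeartbeatRequest tolerates a nil receiver as well.
func (r *Request) GetHeartbeatRequest() *HeartbeatRequest {
	if r == nil {
		return nil
	}
	return r.HeartbeatRequest
}

// detailsLen chains getters, so it never dereferences a nil pointer.
func detailsLen(r *Request) int {
	return len(r.GetHeartbeatRequest().GetDetails())
}

func main() {
	var missing *Request // direct access missing.HeartbeatRequest would panic
	fmt.Println(detailsLen(missing))
	fmt.Println(detailsLen(&Request{}))
	fmt.Println(detailsLen(&Request{HeartbeatRequest: &HeartbeatRequest{Details: []byte("abc")}}))
}
```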
chasm/lib/activity/activity_tasks.go
Outdated
| func (e *heartbeatTimeoutTaskExecutor) Execute( | ||
| ctx chasm.MutableContext, | ||
| activity *Activity, | ||
| taskAttrs chasm.TaskAttributes, |
nit: taskAttrs, task are unused
| ctx.AddTask( | ||
| activity, | ||
| chasm.TaskAttributes{ | ||
| ScheduledTime: hbDeadline, |
If hbDeadline happens to be attemptStartTime + hbTimeout (i.e., no heartbeat recorded yet), won't the next timer basically pop at the same time as the current one being executed?
- Heartbeats always add a task
- Task is invalid if lastHeartbeat (or activity start) is within timeout
- Otherwise task fails attempt
| &activitypb.HeartbeatTimeoutTask{ | ||
| Attempt: attempt.GetCount(), | ||
| }, | ||
| ) |
Bug: Heartbeat task scheduled even when timeout is zero
The RecordHeartbeat function unconditionally schedules a HeartbeatTimeoutTask regardless of whether a heartbeat timeout is configured. When HeartbeatTimeout is zero or not set, ctx.Now(a).Add(a.GetHeartbeatTimeout().AsDuration()) schedules a task for "now", which is incorrect behavior. The code in TransitionStarted correctly guards this with if heartbeatTimeout := a.GetHeartbeatTimeout().AsDuration(); heartbeatTimeout > 0 before adding the task. The same guard is needed here.
81b2516 to
c8e5800
What changed?
Implement heartbeating for CHASM activities (standalone activity)
Why?
Required feature
How did you test it?
Note
Implements heartbeat recording and timeout handling for standalone (CHASM) activities, including token validation, a heartbeat timer task, API wiring, and tests.
- RecordHeartbeat with task token validation and response (CancelRequested), persisting last heartbeat and scheduling next heartbeatTimer.
- WithToken[T] wrapper and apply token validation to HandleCompleted/Failed/Canceled.
- ValidateActivityTaskToken and use in handlers.
- TransitionStarted; handle TIMEOUT_TYPE_HEARTBEAT in TransitionTimedOut.
- HeartbeatTimeoutTask proto and executor (validate deadline/attempt; reschedule or timeout; retry via TransitionRescheduled).
- fx/library modules.
- RecordActivityTaskHeartbeat, RespondActivityTask{Completed,Failed,Canceled} to call CHASM with WithToken.
- proto/v1/tasks.proto and generated files for HeartbeatTimeoutTask helpers/messages.

Written by Cursor Bugbot for commit 7c0893a.