Skip to content

Conversation

@dandavison
Copy link
Contributor

@dandavison dandavison commented Dec 1, 2025

What changed?

Implement heartbeating for CHASM activities (standalone activity)

Why?

Required feature

How did you test it?

  • built
  • added new functional test(s)

Note

Implements heartbeat recording and timeout handling for standalone (CHASM) activities, including token validation, a heartbeat timer task, API wiring, and tests.

  • Activity (CHASM library):
    • Add RecordHeartbeat with task token validation and response (CancelRequested), persisting last heartbeat and scheduling next heartbeatTimer.
    • Introduce WithToken[T] wrapper and apply token validation to HandleCompleted/Failed/Canceled.
    • Add ValidateActivityTaskToken and use in handlers.
    • Schedule heartbeat timer on TransitionStarted; handle TIMEOUT_TYPE_HEARTBEAT in TransitionTimedOut.
  • Tasks/Executors:
    • Add HeartbeatTimeoutTask proto and executor (validate deadline/attempt; reschedule or timeout; retry via TransitionRescheduled).
    • Register new task in fx/library modules.
  • History APIs:
    • Wire standalone path for RecordActivityTaskHeartbeat, RespondActivityTask{Completed,Failed,Canceled} to call CHASM with WithToken.
  • Protos/Generated:
    • Extend proto/v1/tasks.proto and generated files for HeartbeatTimeoutTask helpers/messages.
  • Tests:
    • Add heartbeat tests: invalid/malformed tokens, stale token/attempt, cancel-requested flag, details propagation on retry, heartbeat timeout (with/without retry), and keepalive heartbeats.
    • Minor assertion message improvements in existing tests.

Written by Cursor Bugbot for commit 7c0893a. This will update automatically on new commits. Configure here.

Cleanup

Add test of cross-execution token usage

Fix test

Change DeserializeComponentRef to return error on empty input

Handle empty token same as absent wait policy

Remove chasm engine from tests

Cleanup

Reinstate ignored opts

Refactor: ErrorAs

Refactor: NotFound ErrorAs

Revert "Fix CHASM not found errors"

This reverts commit d3c709c.

Commute NotFound error in activity handler

errors.New

Cleanup

Revert irrelevant changes

Changes from code review
func (a *Activity) HandleStarted(
ctx chasm.MutableContext,
request *historyservice.RecordActivityTaskStartedRequest,
) (*historyservice.RecordActivityTaskStartedResponse, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore: formatting only

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like @fretz12 and you have different ideas on how to format these signatures. I personally like the style in this PR better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this formatting better too, but ideally we should have the linter take care of it.
I'm running gofumpt locally, it seems to do a good job... maybe introduce it as part of the lint-code target?

ctx chasm.Context,
key chasm.EntityKey,
response *historyservice.RecordActivityTaskStartedResponse,
) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore: formatting only

func (a *Activity) handleCancellationRequested(
ctx chasm.MutableContext,
req *activitypb.CancelActivityExecutionRequest,
) (*activitypb.CancelActivityExecutionResponse, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore: formatting only

func (a *Activity) HandleStarted(
ctx chasm.MutableContext,
request *historyservice.RecordActivityTaskStartedRequest,
) (*historyservice.RecordActivityTaskStartedResponse, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like @fretz12 and you have different ideas on how to format these signatures. I personally like the style in this PR better.

}

// Validate validates a HeartbeatTimeoutTask.
func (e *heartbeatTimeoutTaskExecutor) Validate(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will want to validate that the schedule time in the attributes is still relevant here not in the execute function. It should deterministic function of the last heartbeat time and the attempt start time (hbDeadline below should be equal to the schedule time).

Comment on lines +230 to +249
hbTimeout := activity.GetHeartbeatTimeout().AsDuration()
attemptStartTime := attempt.GetStartedTime().AsTime()
lastHbTime := lastHb.GetRecordedTime().AsTime() // could be from a previous attempt
// No heartbeats in the attempt so far is equivalent to a heartbeat having been sent at attempt
// start time.
hbDeadline := util.MaxTime(lastHbTime, attemptStartTime).Add(hbTimeout)

if ctx.Now(activity).Before(hbDeadline) {
// Deadline has not expired; schedule a new task.
ctx.AddTask(
activity,
chasm.TaskAttributes{
ScheduledTime: hbDeadline,
},
&activitypb.HeartbeatTimeoutTask{
Attempt: attempt.GetCount(),
},
)
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't happen here. Validate in the Validate function. Schedule a new task any time a heartbeat is received.

l.activityDispatchTaskExecutor,
l.activityDispatchTaskExecutor,
),
// TODO(dan): why are the task names "FooTimer" but "FooTimeoutTask" in the struct names?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed not to add Task to the task names. Not sure if we want to use TaskExecutor vs. just Executor for the struct names but that is not critical and can change easily. The string names cannot be changed easily OTOH because they affect how tasks are represented in persistence.

Details: input.Request.HeartbeatRequest.GetDetails(),
})
return nil, nil
return &historyservice.RecordActivityTaskHeartbeatResponse{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just before returning here you want to generate a new heartbeat task if the heartbeat timeout is set.

Comment on lines +1452 to +1455
require.Error(t, err)
statusErr := serviceerror.ToStatus(err)
require.NotNil(t, statusErr)
require.Equal(t, codes.InvalidArgument, statusErr.Code())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, you can use require.ErrorAs(err, &invalidArgumentErr)

) error {
if a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_STARTED &&
a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
return serviceerror.NewNotFound("activity task not found")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong error message? Also NotFound doesn't feel like the right error type, even though I get it's checking if a token "matches". Perhaps FailPrecondition?

return err
}
if token.Attempt != attempt.GetCount() {
return serviceerror.NewNotFound("activity task not found")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above

if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
return nil, err
}
a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bergundy is there any performance pentalty if we create a new field on every hearbeat?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, no penalty.

a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{
RecordedTime: timestamppb.New(ctx.Now(a)),
Details: details,
Details: input.Request.HeartbeatRequest.GetDetails(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Details: input.Request.HeartbeatRequest.GetDetails(),
Details: input.Request.GetHeartbeatRequest().GetDetails(),

func (e *heartbeatTimeoutTaskExecutor) Execute(
ctx chasm.MutableContext,
activity *Activity,
taskAttrs chasm.TaskAttributes,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: taskAttrs, task are unused

ctx.AddTask(
activity,
chasm.TaskAttributes{
ScheduledTime: hbDeadline,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If hbDeadline happens to be attemptStartTime + hbTimeout (i.e., no hearbeat recorded yet), won't the next timer basically pop the same time as the current one being executed?

- Hearbeats always add a task
- Task is invalid if lastHeartbeat (or activity start) is within timeout
- Otherwise task fails attempt
&activitypb.HeartbeatTimeoutTask{
Attempt: attempt.GetCount(),
},
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Heartbeat task scheduled even when timeout is zero

The RecordHeartbeat function unconditionally schedules a HeartbeatTimeoutTask regardless of whether a heartbeat timeout is configured. When HeartbeatTimeout is zero or not set, ctx.Now(a).Add(a.GetHeartbeatTimeout().AsDuration()) schedules a task for "now", which is incorrect behavior. The code in TransitionStarted correctly guards this with if heartbeatTimeout := a.GetHeartbeatTimeout().AsDuration(); heartbeatTimeout > 0 before adding the task. The same guard is needed here.

Fix in Cursor Fix in Web

@dandavison dandavison force-pushed the saa-schedule-to-close-bug-2 branch 3 times, most recently from 81b2516 to c8e5800 Compare December 6, 2025 00:24
Base automatically changed from saa-schedule-to-close-bug-2 to standalone-activity December 6, 2025 00:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants