Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 178 additions & 129 deletions api/deployment/v1/message.pb.go

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions api/matchingservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ func HasDeploymentVersion(deployments *persistencespb.DeploymentData, v *deploym

// Check for the presence of the version in the new DeploymentData format.
if deploymentData, ok := deployments.GetDeploymentsData()[v.GetDeploymentName()]; ok {
return deploymentData.GetVersions()[v.GetBuildId()] != nil
vd := deploymentData.GetVersions()[v.GetBuildId()]
return vd != nil && !vd.GetDeleted()
}

return false
Expand Down
135 changes: 135 additions & 0 deletions common/worker_versioning/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,141 @@ func TestCalculateTaskQueueVersioningInfo(t *testing.T) {
},
},
}},
// Tests with deleted versions
{name: "new format: current version marked as deleted should be ignored", wantCurrent: nil,
data: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v1.GetBuildId()},
CurrentVersionChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v1.GetBuildId(): {Deleted: true},
},
},
},
}},
{name: "new format: ramping version marked as deleted should be ignored", wantRamping: nil,
data: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
RampingDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v2.GetBuildId()},
RampingVersionPercentage: 50,
RampingVersionPercentageChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v2.GetBuildId(): {Deleted: true},
},
},
},
}},
{name: "new format: current deleted, ramping not deleted -> only ramping returned", wantCurrent: nil, wantRamping: v2,
data: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v1.GetBuildId()},
CurrentVersionChangedTime: t1,
RampingDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v2.GetBuildId()},
RampingVersionPercentage: 30,
RampingVersionPercentageChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v1.GetBuildId(): {Deleted: true},
v2.GetBuildId(): {Deleted: false},
},
},
},
}},
{name: "new format: ramping deleted, current not deleted -> only current returned", wantCurrent: v1, wantRamping: nil,
data: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v1.GetBuildId()},
CurrentVersionChangedTime: t1,
RampingDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v2.GetBuildId()},
RampingVersionPercentage: 30,
RampingVersionPercentageChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v1.GetBuildId(): {Deleted: false},
v2.GetBuildId(): {Deleted: true},
},
},
},
}},
{name: "new format: both current and ramping deleted -> both nil", wantCurrent: nil, wantRamping: nil,
data: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v1.GetBuildId()},
CurrentVersionChangedTime: t1,
RampingDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v2.GetBuildId()},
RampingVersionPercentage: 30,
RampingVersionPercentageChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v1.GetBuildId(): {Deleted: true},
v2.GetBuildId(): {Deleted: true},
},
},
},
}},
{name: "mixed: new current deleted falls back to old current", wantCurrent: v1,
data: &persistencespb.DeploymentData{
Versions: []*deploymentspb.DeploymentVersionData{
{Version: v1, CurrentSinceTime: t1, RoutingUpdateTime: t1},
},
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v2.GetBuildId()},
CurrentVersionChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v2.GetBuildId(): {Deleted: true},
},
},
},
}},
{name: "mixed: new ramping deleted falls back to old ramping", wantRamping: v1,
data: &persistencespb.DeploymentData{
Versions: []*deploymentspb.DeploymentVersionData{
{Version: v1, RampingSinceTime: t1, RoutingUpdateTime: t1, RampPercentage: 40},
},
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
RampingDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v2.GetBuildId()},
RampingVersionPercentage: 50,
RampingVersionPercentageChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v2.GetBuildId(): {Deleted: true},
},
},
},
}},
{name: "new format: version exists but marked as deleted alongside non-deleted version", wantCurrent: v2,
data: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
"foo": {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "foo", BuildId: v2.GetBuildId()},
CurrentVersionChangedTime: t2,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
v1.GetBuildId(): {Deleted: true},
v2.GetBuildId(): {Deleted: false},
v3.GetBuildId(): {Deleted: true},
},
},
},
}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions config/dynamicconfig/development-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@

### Worker Versioning Replay Test configs
### uncomment and set the right deploymentWorkflowVersion
# matching.deploymentWorkflowVersion:
# - value: 1 # put the integer value of the workflow version you want to target (enum DeploymentWorkflowVersion)
# matching.PollerHistoryTTL:
# - value: 1s
# matching.wv.VersionDrainageStatusVisibilityGracePeriod:
# - value: 5s
# matching.wv.VersionDrainageStatusRefreshInterval:
# - value: 5s
#matching.deploymentWorkflowVersion:
# - value: 2 # put the integer value of the workflow version you want to target (enum DeploymentWorkflowVersion)
#matching.PollerHistoryTTL:
# - value: 1s
#matching.wv.VersionDrainageStatusVisibilityGracePeriod:
# - value: 5s
#matching.wv.VersionDrainageStatusRefreshInterval:
# - value: 5s
### END of Worker Versioning Replay Test configs

limit.maxIDLength:
Expand Down
19 changes: 18 additions & 1 deletion proto/internal/temporal/server/api/deployment/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,21 @@ message DeploymentVersionData {
}

// Information that a TQ should know about a particular Deployment Version. This info is not part of
// RoutingConfig and hence not protected by the revision number.
// RoutingConfig and hence not protected by the routing config revision number.
// As of Workflow Version `VersionDataRevisionNumber`, version specific data has its own revision
// number, which makes async propagations safer and allows async registration.
message WorkerDeploymentVersionData {
// Incremented everytime version data changes. Updates with lower revision number than what is
// already in the TQ will be ignored to avoid stale writes.
int64 revision_number = 1;
// Last update time. Used for garbage collecting deleted versions from TQ user data.
google.protobuf.Timestamp update_time = 2;
// In order to protect against deletes being overwritten by delayed stale writes, we can't
// immediately delete the version data from task queues. instead, we mark them as deleted while
// keeping the revision number.
// Old enough deleted versions are GCed based on update_time.
bool deleted = 3;

temporal.api.enums.v1.WorkerDeploymentVersionStatus status = 6;
}

Expand Down Expand Up @@ -114,6 +127,10 @@ message VersionLocalState {

// Status of the Worker Deployment Version.
temporal.api.enums.v1.WorkerDeploymentVersionStatus status = 14;

// Incremented everytime version data synced to TQ changes. Updates with lower revision number
// than what is already in the TQ will be ignored to avoid stale writes during async operations.
int64 revision_number = 15;
}

// Data specific to a task queue, from the perspective of a worker deployment version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,20 +370,20 @@ message SyncDeploymentUserDataRequest {
// Ignored by the task queue if new revision number is not greater that what it has.
temporal.api.deployment.v1.RoutingConfig update_routing_config = 10;
// Optional map of build id to upsert version data.
// Ignored if `update_routing_config` is present and has an outdated revision number.
// (-- api-linter: core::0203::required=disabled
// aip.dev/not-precedent: Not following Google API format --)
map<string, temporal.server.api.deployment.v1.WorkerDeploymentVersionData> upsert_versions_data = 11;
// List of build ids to forget from task queue.
// Ignored if `update_routing_config` is present and has an outdated revision number.
// Deprecated. Use upsert_versions_data with deleted=true.
repeated string forget_versions = 12;
}

message SyncDeploymentUserDataResponse {
// New task queue user data version. Can be used to wait for propagation.
int64 version = 1;
// If the routing config changed after applying this operation. Compared base on revision number.
bool routing_config_changed = 2;
// Deprecated. using this is not totaly safe in case of retries.
bool routing_config_changed = 2 [deprecated = true];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has confused me a little....might be worth discussing on call.

}

message ApplyTaskQueueUserDataReplicationEventRequest {
Expand Down
38 changes: 38 additions & 0 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2039,8 +2039,28 @@ func (e *matchingEngineImpl) SyncDeploymentUserData(
tqWorkerDeploymentData.Versions = make(map[string]*deploymentspb.WorkerDeploymentVersionData)
}
for buildID, versionData := range req.GetUpsertVersionsData() {
existing := tqWorkerDeploymentData.Versions[buildID]
// Skip if existing version data has a higher revision number to avoid stale writes.
// Equal revision number is accepted for now because we may roll back the workflow version
// and stop incrementing the revision number.
if existing != nil && existing.GetRevisionNumber() > versionData.GetRevisionNumber() {
continue
}
tqWorkerDeploymentData.Versions[buildID] = versionData
changed = true
if versionData.GetDeleted() {
// Remove the version from the old deployment data format if present.
//nolint:staticcheck // SA1019 deprecated versions will clean up later
for idx, oldVersions := range deploymentData.GetVersions() {
if oldVersions.GetVersion().GetDeploymentName() == req.GetDeploymentName() &&
oldVersions.GetVersion().GetBuildId() == buildID {
//nolint:staticcheck // SA1019 deprecated versions will clean up later
deploymentData.Versions = append(deploymentData.Versions[:idx], deploymentData.Versions[idx+1:]...)
changed = true
break
}
}
}
}

if removed := removeDeploymentVersions(
Expand Down Expand Up @@ -2085,6 +2105,10 @@ func (e *matchingEngineImpl) SyncDeploymentUserData(
tqWorkerDeploymentData,
)
}

if cleanupOldDeletedVersions(tqWorkerDeploymentData) {
changed = true
Comment on lines +2109 to +2110
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could just have this function placed inside of removeDeploymentVersions and call it something like removeAndCleanUpDeploymentVersions. It shall then have the contained logic of deleting old versions, marking the version in the new format as deleted and also Garbage Collecting any versions if present.

}
}
}
if !changed {
Expand All @@ -2100,6 +2124,20 @@ func (e *matchingEngineImpl) SyncDeploymentUserData(
return &matchingservice.SyncDeploymentUserDataResponse{Version: version, RoutingConfigChanged: applyUpdatesToRoutingConfig}, nil
}

func cleanupOldDeletedVersions(deploymentData *persistencespb.WorkerDeploymentData) bool {
cleaned := false
for buildID, versionData := range deploymentData.Versions {
// TODO: improve this logic to remove even more old versions if we're reaching the limit of max versions per task queue.
// max versions per task queue is not implemented yet but we should have it because as of now a task queue can be polled from
// numerous deployment versions (accross different deployments) and that leads to a very large user data size.
Comment on lines +2130 to +2132
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about this a bit more - I think we should have a safeguard (dynamic config) for this prior to launching this as a GA. If folks use versioning with their CI/CD systems (as is done by our internal saas team) on the same task-queue, what you have placed in the comment can 100% come to life.

wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will follow up with a PR to clean up more TQs when reaching a limit

if versionData.GetDeleted() && versionData.GetUpdateTime().AsTime().Before(time.Now().Add(-time.Hour*24*30)) {
delete(deploymentData.Versions, buildID)
cleaned = true
}
}
return cleaned
}

func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent(
ctx context.Context,
req *matchingservice.ApplyTaskQueueUserDataReplicationEventRequest,
Expand Down
Loading
Loading