-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add revision number to Version Data #8722
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
f0e6f25
3310ae7
6803b6d
d38737a
7047f90
1852f3e
a772ffb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2039,8 +2039,28 @@ func (e *matchingEngineImpl) SyncDeploymentUserData( | |
| tqWorkerDeploymentData.Versions = make(map[string]*deploymentspb.WorkerDeploymentVersionData) | ||
| } | ||
| for buildID, versionData := range req.GetUpsertVersionsData() { | ||
| existing := tqWorkerDeploymentData.Versions[buildID] | ||
| // Skip if existing version data has a higher revision number to avoid stale writes. | ||
| // Equal revision number is accepted for now because we may roll back the workflow version | ||
| // and stop incrementing the revision number. | ||
| if existing != nil && existing.GetRevisionNumber() > versionData.GetRevisionNumber() { | ||
| continue | ||
| } | ||
| tqWorkerDeploymentData.Versions[buildID] = versionData | ||
| changed = true | ||
| if versionData.GetDeleted() { | ||
| // Remove the version from the old deployment data format if present. | ||
| //nolint:staticcheck // SA1019 deprecated versions will clean up later | ||
| for idx, oldVersions := range deploymentData.GetVersions() { | ||
| if oldVersions.GetVersion().GetDeploymentName() == req.GetDeploymentName() && | ||
| oldVersions.GetVersion().GetBuildId() == buildID { | ||
| //nolint:staticcheck // SA1019 deprecated versions will clean up later | ||
| deploymentData.Versions = append(deploymentData.Versions[:idx], deploymentData.Versions[idx+1:]...) | ||
| changed = true | ||
| break | ||
| } | ||
| } | ||
| } | ||
ShahabT marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| if removed := removeDeploymentVersions( | ||
|
|
@@ -2085,6 +2105,10 @@ func (e *matchingEngineImpl) SyncDeploymentUserData( | |
| tqWorkerDeploymentData, | ||
| ) | ||
| } | ||
|
|
||
| if cleanupOldDeletedVersions(tqWorkerDeploymentData) { | ||
| changed = true | ||
|
Comment on lines
+2109
to
+2110
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we could just have this function placed inside of |
||
| } | ||
| } | ||
| } | ||
| if !changed { | ||
|
|
@@ -2100,6 +2124,20 @@ func (e *matchingEngineImpl) SyncDeploymentUserData( | |
| return &matchingservice.SyncDeploymentUserDataResponse{Version: version, RoutingConfigChanged: applyUpdatesToRoutingConfig}, nil | ||
| } | ||
|
|
||
| func cleanupOldDeletedVersions(deploymentData *persistencespb.WorkerDeploymentData) bool { | ||
| cleaned := false | ||
| for buildID, versionData := range deploymentData.Versions { | ||
| // TODO: improve this logic to remove even more old versions if we're reaching the limit of max versions per task queue. | ||
| // max versions per task queue is not implemented yet but we should have it because as of now a task queue can be polled from | ||
| // numerous deployment versions (accross different deployments) and that leads to a very large user data size. | ||
|
Comment on lines
+2130
to
+2132
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thinking about this a bit more - I think we should have a safeguard (dynamic config) for this prior to launching this as a GA. If folks use versioning with their CI/CD systems (as is done by our internal saas team) on the same task-queue, what you have placed in the comment can 100% come to life. wdyt?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will follow up with a PR to clean up more TQs when reaching a limit |
||
| if versionData.GetDeleted() && versionData.GetUpdateTime().AsTime().Before(time.Now().Add(-time.Hour*24*30)) { | ||
| delete(deploymentData.Versions, buildID) | ||
| cleaned = true | ||
| } | ||
| } | ||
| return cleaned | ||
| } | ||
|
|
||
| func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent( | ||
| ctx context.Context, | ||
| req *matchingservice.ApplyTaskQueueUserDataReplicationEventRequest, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this has confused me a little....might be worth discussing on call.