diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 0aaa2cbf6..59495071c 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -689,17 +689,9 @@ class TaskBackend { zone = versionState.zone!; instance = versionState.instance!; - - // Remove instanceName, zone, secretToken, and set attempts = 0 - state.versions![version] = PackageVersionStateInfo( - scheduled: versionState.scheduled, + state.versions![version] = versionState.complete( docs: hasDocIndexHtml, pana: summary != null, - finished: true, - attempts: 0, - instance: null, // version is no-longer running on this instance - secretToken: null, // TODO: Consider retaining this for idempotency - zone: null, ); // Determine if something else was running on the instance @@ -1002,13 +994,12 @@ class TaskBackend { await for (final state in _db.tasks.listAllForCurrentRuntime()) { final zone = taskWorkerCloudCompute.zones.first; // ignore: invalid_use_of_visible_for_testing_member - final updated = await updatePackageStateWithPendingVersions( + final payload = await updatePackageStateWithPendingVersions( _db, state.package, zone, taskWorkerCloudCompute.generateInstanceName(), ); - final payload = updated?.$1; if (payload == null) continue; await processPayload(payload); } @@ -1418,7 +1409,6 @@ final class _TaskDataAccess { Future restorePreviousVersionsState( String packageName, String instanceName, - Map previousVersionsMap, ) async { await withRetryTransaction(_db, (tx) async { final s = await tx.tasks.lookupOrNull(packageName); @@ -1429,7 +1419,7 @@ final class _TaskDataAccess { s.versions!.addEntries( s.versions!.entries .where((e) => e.value.instance == instanceName) - .map((e) => MapEntry(e.key, previousVersionsMap[e.key]!)), + .map((e) => MapEntry(e.key, e.value.resetAfterFailedAttempt())), ); s.pendingAt = derivePendingAt( versions: s.versions!, diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index 1ccd3bba4..fc689e7a6 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:convert' show json; +import 'dart:math'; import 'package:clock/clock.dart'; import 'package:json_annotation/json_annotation.dart'; @@ -249,7 +250,7 @@ List derivePendingVersions({ } /// State of a given `version` within a [PackageState]. -@JsonSerializable() +@JsonSerializable(includeIfNull: false) class PackageVersionStateInfo { PackageVersionStatus get status { if (attempts == 0 && scheduled == initialTimestamp) { @@ -319,6 +320,9 @@ class PackageVersionStateInfo { /// comparison. Please use [isAuthorized] for validating a request. final String? secretToken; + /// The previous scheduled timestamp (if we are currently in an active schedule). + final DateTime? previousScheduled; + /// Return true, if [token] matches [secretToken] and it has not expired. /// /// This does a fixed-time comparison to mitigate timing attacks. @@ -347,6 +351,7 @@ class PackageVersionStateInfo { this.docs = false, this.pana = false, this.finished = false, + this.previousScheduled, }); factory PackageVersionStateInfo.fromJson(Map m) => @@ -364,6 +369,53 @@ class PackageVersionStateInfo { 'secretToken: $secretToken', ].join(', ') + ')'; + + // Remove instanceName, zone, secretToken, and set attempts = 0 + PackageVersionStateInfo complete({required bool pana, required bool docs}) { + return PackageVersionStateInfo( + scheduled: scheduled, + attempts: 0, + docs: docs, + pana: pana, + finished: true, + zone: null, + instance: null, // version is no-longer running on this instance + secretToken: null, // TODO: Consider retaining this for idempotency + previousScheduled: null, + ); + } + + /// Derives a new version state with scheduling information. + PackageVersionStateInfo scheduleNew({ + required String zone, + required String instanceName, + }) { + return PackageVersionStateInfo( + scheduled: clock.now(), + attempts: attempts + 1, + zone: zone, + instance: instanceName, + secretToken: createUuid(), + finished: finished, + docs: docs, + pana: pana, + previousScheduled: scheduled, + ); + } + + /// Reverts the status of the last scheduling attempt, which has presumably failed. + PackageVersionStateInfo resetAfterFailedAttempt() { + return PackageVersionStateInfo( + scheduled: previousScheduled ?? initialTimestamp, + attempts: max(0, attempts - 1), + zone: null, + instance: null, + secretToken: null, + finished: finished, + docs: docs, + pana: pana, + ); + } } /// A [db.Property] encoding a Map from version to [PackageVersionStateInfo] as JSON. diff --git a/app/lib/task/models.g.dart b/app/lib/task/models.g.dart index d1b3d8816..9854fbc07 100644 --- a/app/lib/task/models.g.dart +++ b/app/lib/task/models.g.dart @@ -17,6 +17,9 @@ PackageVersionStateInfo _$PackageVersionStateInfoFromJson( docs: json['docs'] as bool? ?? false, pana: json['pana'] as bool? ?? false, finished: json['finished'] as bool? ?? false, + previousScheduled: json['previousScheduled'] == null + ? null + : DateTime.parse(json['previousScheduled'] as String), ); Map _$PackageVersionStateInfoToJson( @@ -27,9 +30,10 @@ Map _$PackageVersionStateInfoToJson( 'finished': instance.finished, 'scheduled': instance.scheduled.toIso8601String(), 'attempts': instance.attempts, - 'zone': instance.zone, - 'instance': instance.instance, - 'secretToken': instance.secretToken, + 'zone': ?instance.zone, + 'instance': ?instance.instance, + 'secretToken': ?instance.secretToken, + 'previousScheduled': ?instance.previousScheduled?.toIso8601String(), }; PackageStateInfo _$PackageStateInfoFromJson(Map json) => diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index 4ec22c2e4..6fd5f4f83 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -10,7 +10,6 @@ import 'package:meta/meta.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; -import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/models.dart'; @@ -101,13 +100,12 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( final instanceName = compute.generateInstanceName(); final zone = pickZone(); - final updated = await updatePackageStateWithPendingVersions( + final payload = await updatePackageStateWithPendingVersions( db, selected.package, zone, instanceName, ); - final payload = updated?.$1; if (payload == null) { return; } @@ -174,15 +172,13 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( banZone(zone, minutes: 15); } if (rollbackPackageState) { - final oldVersionsMap = updated?.$2 ?? const {}; - // Restore the state of the PackageState for versions that were + // Restire the state of the PackageState for versions that were // suppose to run on the instance we just failed to create. // If this doesn't work, we'll eventually retry. Hence, correctness // does not hinge on this transaction being successful. await db.tasks.restorePreviousVersionsState( selected.package, instanceName, - oldVersionsMap, ); } } @@ -221,11 +217,8 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( /// Updates the package state with versions that are already pending or /// will be pending soon. -/// -/// Returns the payload and the old status of the state info version map @visibleForTesting -Future<(Payload, Map)?> -updatePackageStateWithPendingVersions( +Future updatePackageStateWithPendingVersions( DatastoreDB db, String package, String zone, @@ -237,7 +230,6 @@ updatePackageStateWithPendingVersions( // presumably the package was deleted. return null; } - final oldVersionsMap = {...?s.versions}; final now = clock.now(); final pendingVersions = derivePendingVersions( @@ -253,14 +245,7 @@ updatePackageStateWithPendingVersions( // Update PackageState s.versions!.addAll({ for (final v in pendingVersions.map((v) => v.toString())) - v: PackageVersionStateInfo( - scheduled: now, - attempts: s.versions![v]!.attempts + 1, - zone: zone, - instance: instanceName, - secretToken: createUuid(), - finished: s.versions![v]!.finished, - ), + v: s.versions![v]!.scheduleNew(zone: zone, instanceName: instanceName), }); s.pendingAt = derivePendingAt( versions: s.versions!, @@ -279,6 +264,6 @@ updatePackageStateWithPendingVersions( ), ), ); - return (payload, oldVersionsMap); + return payload; }); }