diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md
index ab0353202..4521f3ca7 100644
--- a/docs/reference/cluster_manifest.md
+++ b/docs/reference/cluster_manifest.md
@@ -48,6 +48,10 @@ Those parameters are grouped under the `metadata` top-level key.
 Labels that are set here but not listed as `inherited_labels` in the operator parameters are ignored.

+* **annotations**
+  A map of annotations to add to the `postgresql` resource. The operator reacts to certain annotations to trigger specific actions.
+  * `postgres-operator.zalando.org/action: restore-in-place`: When this annotation is set to this value, the operator triggers an automated in-place restore of the cluster. The restore requires a valid `clone` section in the manifest with a target `timestamp`. See the [user guide](../user.md#automated-restore-in-place-point-in-time-recovery) for more details.
+
 ## Top-level parameters

 These parameters are grouped directly under the `spec` key in the manifest.
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index 7e7cbeaf0..8a12cf08a 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -9,6 +9,7 @@ configuration. Variable names are underscore-separated words.

 ### ConfigMaps-based
+
 The configuration is supplied in a key-value configmap, defined by the `CONFIG_MAP_NAME` environment variable. Non-scalar values, i.e. lists or maps, are encoded in the value strings using
@@ -25,6 +26,7 @@ operator CRD, all the CRD defaults are provided in the
 [operator's default configuration manifest](https://github.com/zalando/postgres-operator/blob/master/manifests/postgresql-operator-default-configuration.yaml)

 ### CRD-based configuration
+
 The configuration is stored in a custom YAML manifest. The manifest is an instance of the custom resource definition (CRD) called `OperatorConfiguration`. The operator registers this CRD during the
@@ -171,6 +173,9 @@ Those are top-level keys, containing both leaf keys and groups.
 * **repair_period**
   period between consecutive repair requests. The default is `5m`.

+* **pitr_backup_retention**
+  retention time for the PITR (Point-In-Time Recovery) state ConfigMaps created during an in-place restore. The operator cleans up state ConfigMaps older than the configured retention. The value is a [duration string](https://pkg.go.dev/time#ParseDuration), e.g. `"24h"` or `"168h"` (7 days). The default is `168h`.
+
 * **set_memory_request_to_limit**
   Set `memory_request` to `memory_limit` for all Postgres clusters (the default value is also increased but configured `max_memory_request` can not be
@@ -918,6 +923,7 @@ key.
 ```yaml
 teams_api_role_configuration: "log_statement:all,search_path:'data,public'"
 ```
+
 The default is `"log_statement:all"`

 * **enable_team_superuser**
diff --git a/docs/user.md b/docs/user.md
index c1a7c7d45..86f35d8f0 100644
--- a/docs/user.md
+++ b/docs/user.md
@@ -891,6 +891,45 @@ original UID, making it possible retry restoring. However, it is probably
 better to create a temporary clone for experimenting or finding out to which point you should restore.

+## Automated Restore in Place (Point-in-Time Recovery)
+
+The operator supports automated in-place restores, allowing you to restore a database to a specific point in time without changing connection strings on the application side. This feature orchestrates the deletion of the current cluster and the creation of a new one from a backup.
+
+:warning: This is a destructive operation.
The existing cluster's StatefulSet and pods will be deleted as part of the process. Ensure you have a reliable backup strategy and have tested the restore process in a non-production environment. + +To trigger an in-place restore, you need to add a special annotation and a `clone` section to your `postgresql` manifest: + +* **Annotate the manifest**: Add the `postgres-operator.zalando.org/action: restore-in-place` annotation to the `metadata` section. +* **Specify the recovery target**: Add a `clone` section to the `spec`, providing the `cluster` name and the `timestamp` for the point-in-time recovery. The `cluster` name **must** be the same as the `metadata.name` of the cluster you are restoring. The `timestamp` must be in RFC 3339 format and point to a time in the past for which you have WAL archives. + +Here is an example manifest snippet: + +```yaml +apiVersion: "acid.zalan.do/v1" +kind: postgresql +metadata: + name: acid-minimal-cluster + annotations: + postgres-operator.zalando.org/action: restore-in-place +spec: + # ... other cluster parameters + clone: + cluster: "acid-minimal-cluster" # Must match metadata.name + uid: "" + timestamp: "2022-04-01T10:11:12+00:00" + # ... other cluster parameters +``` + +When you apply this manifest, the operator will: +* See the `restore-in-place` annotation and begin the restore workflow. +* Store the restore request and the new cluster definition in a temporary `ConfigMap`. +* Delete the existing `postgresql` custom resource, which triggers the deletion of the associated StatefulSet and pods. +* Wait for the old cluster to be fully terminated. +* Create a new `postgresql` resource with a new UID but the same name. +* The new cluster will bootstrap from the latest base backup prior to the given `timestamp` and replay WAL files to recover to the specified point in time. + +The process is asynchronous. You can monitor the operator logs and the state of the `postgresql` resource to follow the progress. Once the new cluster is up and running, your applications can reconnect. + ## Setting up a standby cluster Standby cluster is a [Patroni feature](https://github.com/zalando/patroni/blob/master/docs/replica_bootstrap.rst#standby-cluster) @@ -1291,3 +1330,4 @@ As of now, the operator does not sync the pooler deployment automatically which means that changes in the pod template are not caught. You need to toggle `enableConnectionPooler` to set environment variables, volumes, secret mounts and securityContext required for TLS support in the pooler pod. 
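The user-guide section added above notes that the restore runs asynchronously and should be monitored. As a purely illustrative sketch that is not part of this change, the following client-go snippet reads the state label of the `pitr-state-<cluster>` ConfigMap introduced by this PR; the namespace, cluster name, and kubeconfig lookup are placeholder assumptions:

```go
package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder values; adjust to your environment.
	namespace := "default"
	clusterName := "acid-minimal-cluster"

	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// The operator tracks the restore in a ConfigMap named pitr-state-<cluster>
	// and advances the label below from "pending" to "in-progress" to "finished".
	cm, err := client.CoreV1().ConfigMaps(namespace).Get(context.TODO(),
		fmt.Sprintf("pitr-state-%s", clusterName), metav1.GetOptions{})
	if err != nil {
		// A NotFound error means no restore-in-place is currently tracked for this cluster.
		panic(err)
	}
	fmt.Println(cm.Labels["postgres-operator.zalando.org/pitr-state"])
}
```

The label moves from `pending` to `in-progress` to `finished` as the controller and the new cluster's create path advance the restore.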
+ diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index 6556b333c..0fb224743 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -113,6 +113,9 @@ spec: repair_period: type: string default: "5m" + pitr_backup_retention: + type: string + default: "168h" set_memory_request_to_limit: type: boolean default: false diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 389d9325a..8d03b0df9 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -19,6 +19,7 @@ configuration: min_instances: -1 resync_period: 30m repair_period: 5m + pitr_backup_retention: 168h # set_memory_request_to_limit: false # sidecars: # - image: image:123 diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index cd11b9173..944bd862d 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -266,6 +266,7 @@ type OperatorConfigurationData struct { Workers uint32 `json:"workers,omitempty"` ResyncPeriod Duration `json:"resync_period,omitempty"` RepairPeriod Duration `json:"repair_period,omitempty"` + PitrBackupRetention Duration `json:"pitr_backup_retention,omitempty"` SetMemoryRequestToLimit bool `json:"set_memory_request_to_limit,omitempty"` ShmVolume *bool `json:"enable_shm_volume,omitempty"` SidecarImages map[string]string `json:"sidecar_docker_images,omitempty"` // deprecated in favour of SidecarContainers diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 9cd750e84..f1a9ab2f4 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -3,6 +3,7 @@ package cluster // Postgres CustomResourceDefinition object i.e. 
Spilo import ( + "context" "database/sql" "encoding/json" "fmt" @@ -32,6 +33,7 @@ import ( v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" @@ -431,6 +433,33 @@ func (c *Cluster) Create() (err error) { c.logger.Errorf("could not list resources: %v", err) } + if err := c.updatePITRResources(PitrStateLabelValueFinished); err != nil { + return fmt.Errorf("could not update pitr resources: %v", err) + } + return nil +} + +// update the label to finished for PITR for the given config map +func (c *Cluster) updatePITRResources(state string) error { + cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name) + cmNamespace := c.Namespace + patchPayload := map[string]any{ + "metadata": map[string]any{ + "labels": map[string]string{ + PitrStateLabelKey: state, + }, + }, + } + + data, _ := json.Marshal(patchPayload) + if _, err := c.KubeClient.ConfigMaps(cmNamespace).Patch(context.TODO(), cmName, types.MergePatchType, data, metav1.PatchOptions{}, ""); err != nil { + // If ConfigMap doesn't exist, this is a normal cluster creation (not a restore-in-place) + if k8serrors.IsNotFound(err) { + return nil + } + c.logger.Errorf("restore-in-place: error updating config map label to state: %v", err) + return err + } return nil } @@ -1200,6 +1229,33 @@ func syncResources(a, b *v1.ResourceRequirements) bool { return false } +const ( + PitrStateLabelKey = "postgres-operator.zalando.org/pitr-state" + PitrStateLabelValuePending = "pending" + PitrStateLabelValueInProgress = "in-progress" + PitrStateLabelValueFinished = "finished" + PitrConfigMapNameTemplate = "pitr-state-%s" + PitrSpecDataKey = "spec" +) + +func (c *Cluster) isRestoreInPlace() bool { + cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name) + cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), cmName, metav1.GetOptions{}) + if err != nil { + c.logger.Debugf("restore-in-place: Error while fetching config map: %s before deletion", cmName) + return false + } + + if cm != nil { + if val, ok := cm.Labels[PitrStateLabelKey]; ok { + if val == PitrStateLabelValuePending { + return true + } + } + } + return false +} + // Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). // The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes // DCS, reuses the master's endpoint to store the leader related metadata. 
If we remove the endpoint
@@ -1211,6 +1267,8 @@ func (c *Cluster) Delete() error {
 	defer c.mu.Unlock()
 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
+	isRestoreInPlace := c.isRestoreInPlace()
+	c.logger.Debugf("restore-in-place: deleting the cluster, restore-in-place pending: %v", isRestoreInPlace)
 	if err := c.deleteStreams(); err != nil {
 		anyErrors = true
 		c.logger.Warningf("could not delete event streams: %v", err)
@@ -1231,7 +1289,7 @@ func (c *Cluster) Delete() error {
 		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete statefulset: %v", err)
 	}
-	if c.OpConfig.EnableSecretsDeletion != nil && *c.OpConfig.EnableSecretsDeletion {
+	if c.OpConfig.EnableSecretsDeletion != nil && *c.OpConfig.EnableSecretsDeletion && !isRestoreInPlace {
 		if err := c.deleteSecrets(); err != nil {
 			anyErrors = true
 			c.logger.Warningf("could not delete secrets: %v", err)
@@ -1256,10 +1314,12 @@ func (c *Cluster) Delete() error {
 			}
 		}
-		if err := c.deleteService(role); err != nil {
-			anyErrors = true
-			c.logger.Warningf("could not delete %s service: %v", role, err)
-			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
+		if !isRestoreInPlace {
+			if err := c.deleteService(role); err != nil {
+				anyErrors = true
+				c.logger.Warningf("could not delete %s service: %v", role, err)
+				c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
+			}
 		}
 	}
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go
index 25f61db98..21301834e 100644
--- a/pkg/cluster/cluster_test.go
+++ b/pkg/cluster/cluster_test.go
@@ -24,7 +24,9 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes/fake"
+	k8stesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/record"
 )
@@ -94,6 +96,7 @@ func TestCreate(t *testing.T) {
 	clusterNamespace := "test"
 	client := k8sutil.KubernetesClient{
+		ConfigMapsGetter:   clientSet.CoreV1(),
 		DeploymentsGetter:  clientSet.AppsV1(),
 		CronJobsGetter:     clientSet.BatchV1(),
 		EndpointsGetter:    clientSet.CoreV1(),
@@ -2202,3 +2205,120 @@ func TestGetSwitchoverSchedule(t *testing.T) {
 		})
 	}
 }
+
+func TestUpdatePITRResources(t *testing.T) {
+	clusterName := "test-cluster"
+	clusterNamespace := "default"
+
+	tests := []struct {
+		name          string
+		state         string
+		cmExists      bool
+		patchFails    bool
+		expectedErr   bool
+		expectedLabel string
+	}{
+		{
+			"successful patch - update label to finished",
+			PitrStateLabelValueFinished,
+			true,
+			false,
+			false,
+			PitrStateLabelValueFinished,
+		},
+		{
+			"successful patch - update label to in-progress",
+			PitrStateLabelValueInProgress,
+			true,
+			false,
+			false,
+			PitrStateLabelValueInProgress,
+		},
+		{
+			"config map does not exist - no error",
+			PitrStateLabelValueFinished,
+			false,
+			false,
+			false,
+			"",
+		},
+		{
+			"patch fails with non-NotFound error",
+			PitrStateLabelValueFinished,
+			true,
+			true,
+			true,
+			"",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			clientSet := fake.NewSimpleClientset()
+			acidClientSet := fakeacidv1.NewSimpleClientset()
+
+			if tt.cmExists {
+				cmName := fmt.Sprintf(PitrConfigMapNameTemplate, clusterName)
+				cm := &v1.ConfigMap{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      cmName,
+						Namespace: clusterNamespace,
+						Labels: map[string]string{
+
PitrStateLabelKey: PitrStateLabelValuePending, + }, + }, + } + _, err := clientSet.CoreV1().ConfigMaps(clusterNamespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create configmap: %v", err) + } + } + + if tt.patchFails { + clientSet.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic patch error") + }) + } + + client := k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: clusterNamespace, + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + err := cluster.updatePITRResources(tt.state) + + if err != nil { + if !tt.expectedErr { + t.Fatalf("unexpected error: %v", err) + } + } else if tt.expectedErr { + t.Fatalf("expected error, but got none") + } + + if tt.cmExists && !tt.patchFails && tt.expectedLabel != "" { + cmName := fmt.Sprintf(PitrConfigMapNameTemplate, clusterName) + updatedCm, err := clientSet.CoreV1().ConfigMaps(clusterNamespace).Get(context.TODO(), cmName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("could not get configmap: %v", err) + } + if updatedCm.Labels[PitrStateLabelKey] != tt.expectedLabel { + t.Errorf("expected label %q but got %q", tt.expectedLabel, updatedCm.Labels[PitrStateLabelKey]) + } + } + }) + } +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index ed3eb3d75..1925733de 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -10,6 +10,7 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -301,6 +302,21 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { c.setProcessName("creating %v service", role) serviceSpec := c.generateService(role, &c.Spec) + + // check if the service already exists in case of pitr + svc, err := c.KubeClient.Services(serviceSpec.Namespace).Get(context.TODO(), serviceSpec.Name, metav1.GetOptions{}) + + // service already exists + if err == nil { + c.Services[role] = svc + return svc, nil + } + + if !errors.IsNotFound(err) { + return nil, err + } + + // at last create the service service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return nil, err diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 824a030f4..ebcbb8b03 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -13,6 +13,7 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" @@ -25,6 +26,11 @@ import ( "github.com/zalando/postgres-operator/pkg/util/ringlog" ) +const ( + restoreAnnotationKey = "postgres-operator.zalando.org/action" + restoreAnnotationValue = "restore-in-place" +) + func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() ticker := time.NewTicker(c.opConfig.ResyncPeriod) @@ -35,6 +41,12 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { if err := 
c.clusterListAndSync(); err != nil { c.logger.Errorf("could not list clusters: %v", err) } + if err := c.processPendingRestores(); err != nil { + c.logger.Errorf("could not process pending restores: %v", err) + } + if err := c.cleanupRestores(); err != nil { + c.logger.Errorf("could not cleanup restores: %v", err) + } case <-stopCh: return } @@ -539,6 +551,13 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { pgOld := c.postgresqlCheck(prev) pgNew := c.postgresqlCheck(cur) if pgOld != nil && pgNew != nil { + + if pgNew.Annotations[restoreAnnotationKey] == restoreAnnotationValue { + c.logger.Debugf("restore-in-place: postgresqlUpdate called for cluster %q", pgNew.Name) + c.handleRestoreInPlace(pgOld, pgNew) + return + } + // Avoid the inifinite recursion for status updates if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) { @@ -568,6 +587,236 @@ func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql { return pg } +// validateRestoreInPlace checks if the restore parameters are valid +func (c *Controller) validateRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) error { + c.logger.Debugf("restore-in-place: validating restore parameters for cluster %q", pgNew.Name) + + if pgNew.Spec.Clone == nil { + return fmt.Errorf("'clone' section is missing in the manifest") + } + + // Use ClusterName from CloneDescription + if pgNew.Spec.Clone.ClusterName != pgOld.Name { + return fmt.Errorf("clone cluster name %q does not match the current cluster name %q", pgNew.Spec.Clone.ClusterName, pgOld.Name) + } + + // Use EndTimestamp from CloneDescription + cloneTimestamp, err := time.Parse(time.RFC3339, pgNew.Spec.Clone.EndTimestamp) + if err != nil { + return fmt.Errorf("could not parse clone timestamp %q: %v", pgNew.Spec.Clone.EndTimestamp, err) + } + + if cloneTimestamp.After(time.Now()) { + return fmt.Errorf("clone timestamp %q is in the future", pgNew.Spec.Clone.EndTimestamp) + } + + c.logger.Debugf("restore-in-place: validation successful") + return nil +} + +// handleRestoreInPlace starts an asynchronous point-in-time-restore. +// It creates a ConfigMap to store the state and then deletes the old Postgresql CR. 
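+// The remaining steps run asynchronously: processPendingRestores, invoked from the resync
+// ticker, waits until the old CR is gone, moves the pitr-state ConfigMap label from
+// "pending" to "in-progress", and re-creates the Postgresql resource from the spec stored
+// in the ConfigMap; the new cluster's Create() then marks the ConfigMap as "finished".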
+func (c *Controller) handleRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { + c.logger.Infof("restore-in-place: starting asynchronous restore-in-place for cluster %q", pgNew.Name) + + if err := c.validateRestoreInPlace(pgOld, pgNew); err != nil { + c.logger.Errorf("restore-in-place: validation failed for cluster %q: %v", pgNew.Name, err) + return + } + + // Prepare new spec for the restored cluster + c.logger.Debugf("restore-in-place: preparing new postgresql spec for cluster %q", pgNew.Name) + newPgSpec := pgNew.DeepCopy() + delete(newPgSpec.Annotations, restoreAnnotationKey) + newPgSpec.ResourceVersion = "" + newPgSpec.UID = "" + + specData, err := json.Marshal(newPgSpec) + if err != nil { + c.logger.Errorf("restore-in-place: could not marshal new postgresql spec for cluster %q: %v", newPgSpec.Name, err) + return + } + + // Create or update ConfigMap to store restore state + cmName := fmt.Sprintf(cluster.PitrConfigMapNameTemplate, newPgSpec.Name) + c.logger.Debugf("restore-in-place: creating or updating state ConfigMap %q for cluster %q", cmName, newPgSpec.Name) + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: newPgSpec.Namespace, + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: string(specData), + }, + } + + // Check if ConfigMap already exists + _, err = c.KubeClient.ConfigMaps(cm.Namespace).Get(context.TODO(), cm.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + _, err = c.KubeClient.ConfigMaps(cm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + } + } else { + // If for some reason CM exists, update it + _, err = c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{}) + } + + if err != nil { + c.logger.Errorf("restore-in-place: could not create or update state ConfigMap %q for cluster %q: %v", cmName, newPgSpec.Name, err) + return + } + c.logger.Infof("restore-in-place: state ConfigMap %q created for cluster %q", cmName, newPgSpec.Name) + + // Delete old postgresql CR to trigger cleanup and UID change + c.logger.Debugf("restore-in-place: attempting deletion of postgresql CR %q", pgOld.Name) + err = c.KubeClient.Postgresqls(pgOld.Namespace).Delete(context.TODO(), pgOld.Name, metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + c.logger.Errorf("restore-in-place: could not delete postgresql CR %q: %v", pgOld.Name, err) + return + } + c.logger.Infof("restore-in-place: initiated deletion of postgresql CR %q", pgOld.Name) +} + +func (c *Controller) processPendingRestores() error { + c.logger.Debug("restore-in-place: checking for pending restores") + namespace := c.opConfig.WatchedNamespace + if namespace == "" { + namespace = v1.NamespaceAll + } + + if err := c.processPendingCm(namespace); err != nil { + return err + } + + if err := c.processInProgressCm(namespace); err != nil { + return err + } + + return nil +} + +func (c *Controller) processPendingCm(namespace string) error { + pendingOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValuePending)} + pendingCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), pendingOpts) + if err != nil { + return fmt.Errorf("restore-in-place: could not list pending restore ConfigMaps: %v", err) + } + if len(pendingCmList.Items) > 0 { + c.logger.Debugf("restore-in-place: found %d pending restore(s) to process", len(pendingCmList.Items)) + 
} + + for _, cm := range pendingCmList.Items { + if err := c.processSinglePendingCm(cm); err != nil { + c.logger.Errorf("restore-in-place: could not process pending restore for config map %s: %v", cm.Name, err) + } + } + return nil +} + +func (c *Controller) processSinglePendingCm(cm v1.ConfigMap) error { + c.logger.Debugf("restore-in-place: processing pending ConfigMap %q", cm.Name) + clusterName := strings.TrimPrefix(cm.Name, "pitr-state-") + + _, err := c.KubeClient.Postgresqls(cm.Namespace).Get(context.TODO(), clusterName, metav1.GetOptions{}) + if err == nil { + c.logger.Infof("restore-in-place: pending restore for cluster %q is waiting for old Postgresql CR to be deleted", clusterName) + return nil + } + if !errors.IsNotFound(err) { + return fmt.Errorf("could not check for existence of Postgresql CR %q: %v", clusterName, err) + } + + c.logger.Infof("restore-in-place: old Postgresql CR %q is deleted, moving restore to 'in-progress'", clusterName) + cm.Labels[cluster.PitrStateLabelKey] = cluster.PitrStateLabelValueInProgress + if _, err := c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("could not update ConfigMap %q to 'in-progress': %v", cm.Name, err) + } + return nil +} + +func (c *Controller) processInProgressCm(namespace string) error { + inProgressOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValueInProgress)} + inProgressCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), inProgressOpts) + if err != nil { + return fmt.Errorf("restore-in-place: could not list in-progress restore ConfigMaps: %v", err) + } + if len(inProgressCmList.Items) > 0 { + c.logger.Infof("restore-in-place: found %d in-progress restore(s) to process", len(inProgressCmList.Items)) + } + + for _, cm := range inProgressCmList.Items { + if err := c.processSingleInProgressCm(cm); err != nil { + c.logger.Errorf("restore-in-place: could not process in-progress restore for config map %s: %v", cm.Name, err) + } + } + return nil +} + +func (c *Controller) processSingleInProgressCm(cm v1.ConfigMap) error { + c.logger.Infof("restore-in-place: processing in-progress restore for ConfigMap %q", cm.Name) + + c.logger.Debugf("restore-in-place: unmarshalling spec from ConfigMap %q", cm.Name) + var newPgSpec acidv1.Postgresql + if err := json.Unmarshal([]byte(cm.Data[cluster.PitrSpecDataKey]), &newPgSpec); err != nil { + return fmt.Errorf("could not unmarshal postgresql spec from ConfigMap %q: %v", cm.Name, err) + } + + c.logger.Debugf("restore-in-place: creating new Postgresql CR %q from ConfigMap spec", newPgSpec.Name) + _, err := c.KubeClient.Postgresqls(newPgSpec.Namespace).Create(context.TODO(), &newPgSpec, metav1.CreateOptions{}) + if err != nil { + if errors.IsAlreadyExists(err) { + c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name) + return nil + } else { + return fmt.Errorf("could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err) + } + } + // If err is nil (creation successful) + c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name) + return nil +} + +func (c *Controller) cleanupRestores() error { + c.logger.Debug("cleaning up old restore config maps") + namespace := c.opConfig.WatchedNamespace + if namespace == "" { + namespace = v1.NamespaceAll + } + + cmList, err := 
c.KubeClient.ConfigMaps(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("could not list restore ConfigMaps: %v", err) + } + + retention := c.opConfig.PitrBackupRetention + if retention <= 0 { + c.logger.Debugf("Pitr backup retention is not set, skipping cleanup") + return nil + } + c.logger.Debugf("Pitr backup retention is %s", retention.String()) + + for _, cm := range cmList.Items { + if !strings.HasPrefix(cm.Name, "pitr-state-") { + continue + } + + age := time.Since(cm.CreationTimestamp.Time) + if age > retention { + c.logger.Infof("deleting old restore config map %q, age: %s", cm.Name, age.String()) + err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}) + if err != nil { + c.logger.Errorf("could not delete config map %q: %v", cm.Name, err) + // continue with next cm + } + } + } + + return nil +} + /* Ensures the pod service account and role bindings exists in a namespace before a PG cluster is created there so that a user does not have to deploy diff --git a/pkg/controller/postgresql_test.go b/pkg/controller/postgresql_test.go index 71d23a264..b60949559 100644 --- a/pkg/controller/postgresql_test.go +++ b/pkg/controller/postgresql_test.go @@ -1,14 +1,23 @@ package controller import ( + "context" "fmt" "reflect" + "strings" "testing" "time" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/cluster" + fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" "github.com/zalando/postgres-operator/pkg/spec" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" ) var ( @@ -177,3 +186,1272 @@ func TestMeetsClusterDeleteAnnotations(t *testing.T) { } } } + +func TestCleanupRestores(t *testing.T) { + namespace := "default" + tests := []struct { + name string + configMaps []*v1.ConfigMap + retention time.Duration + remainingConfigMaps int + err error + }{ + { + "no config maps to delete", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: namespace, + CreationTimestamp: metav1.Now(), + }, + }, + }, + 24 * time.Hour, + 1, + nil, + }, + { + "one config map to delete", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: namespace, + CreationTimestamp: metav1.Now(), + }, + }, + }, + 24 * time.Hour, + 1, + nil, + }, + { + "do not delete non-pitr config maps", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: namespace, + CreationTimestamp: metav1.Now(), + }, + }, + }, + 24 * time.Hour, + 2, + nil, + }, + { + "zero retention, do nothing", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + }, + 0, + 1, + nil, + }, + { + "list config maps fails", + []*v1.ConfigMap{}, + 24 * time.Hour, + 0, + fmt.Errorf("synthetic list error"), + }, + 
{ + "delete config map fails", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-to-delete", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + }, + 24 * time.Hour, + 1, + nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + c.opConfig.PitrBackupRetention = tt.retention + c.opConfig.WatchedNamespace = namespace + + client := fake.NewSimpleClientset() + + if tt.name == "list config maps fails" { + client.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic list error") + }) + } + if tt.name == "delete config map fails" { + client.PrependReactor("delete", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic delete error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: client.CoreV1(), + } + + for _, cm := range tt.configMaps { + _, err := c.KubeClient.ConfigMaps(namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.cleanupRestores() + + if err != nil { + if tt.err == nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err.Error()) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.err) + } + } else if tt.err != nil { + t.Fatalf("expected error %q, but got none", tt.err) + } + + if tt.name != "list config maps fails" { + cms, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("Could not list config maps: %v", err) + } + if len(cms.Items) != tt.remainingConfigMaps { + t.Errorf("expected %d config maps, got %d", tt.remainingConfigMaps, len(cms.Items)) + } + } + }) + } +} + +func TestProcessSingleInProgressCm(t *testing.T) { + tests := []struct { + name string + cm v1.ConfigMap + err string + expectedPgName string + expectedPgNamespace string + }{ + { + "json marshal error", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "invalid json", + }, + }, + "could not unmarshal postgresql spec from ConfigMap \"pitr-state-test-1\"", + "", + "", + }, + { + "successful create", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"apiVersion\":\"acid.zalan.do/v1\",\"kind\":\"postgresql\",\"metadata\":{\"name\":\"acid-minimal-cluster\",\"namespace\":\"po\"},\"spec\":{\"teamId\":\"acid\",\"volume\":{\"size\":\"1Gi\"},\"numberOfInstances\":1,\"users\":{\"zalando\":[\"superuser\",\"createdb\"]},\"databases\":{\"foo\":\"zalando\"},\"postgresql\":{\"version\":\"16\"},\"enableLogicalBackup\":false,\"patroni\":{\"pg_hba\":[\"local all all trust\",\"host all all 0.0.0.0/0 md5\",\"host replication all 0.0.0.0/0 md5\"]}}}", + }, + }, + "", + "acid-minimal-cluster", + "po", + }, + { + "postgresql resource already exists", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: 
"{\"apiVersion\":\"acid.zalan.do/v1\",\"kind\":\"postgresql\",\"metadata\":{\"name\":\"acid-minimal-cluster\",\"namespace\":\"po\"},\"spec\":{\"teamId\":\"acid\",\"volume\":{\"size\":\"1Gi\"},\"numberOfInstances\":1,\"users\":{\"zalando\":[\"superuser\",\"createdb\"]},\"databases\":{\"foo\":\"zalando\"},\"postgresql\":{\"version\":\"16\"}}}", + }, + }, + "", + "acid-minimal-cluster", + "po", + }, + { + "spec with missing teamId", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"apiVersion\":\"acid.zalan.do/v1\",\"kind\":\"postgresql\",\"metadata\":{\"name\":\"acid-spec-without-teamid\",\"namespace\":\"po\"},\"spec\":{\"volume\":{\"size\":\"1Gi\"},\"numberOfInstances\":1,\"users\":{\"zalando\":[\"superuser\",\"createdb\"]},\"databases\":{\"foo\":\"zalando\"}}}", + }, + }, + "", + "acid-spec-without-teamid", + "po", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.name == "postgresql resource already exists" { + pg := &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: tt.expectedPgName, + Namespace: tt.expectedPgNamespace, + }, + Spec: acidv1.PostgresSpec{ + TeamID: "some-other-team", + }, + } + _, err := acidClientSet.AcidV1().Postgresqls(tt.expectedPgNamespace).Create(context.TODO(), pg, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not pre-create postgresql resource for test: %v", err) + } + } + + c.KubeClient = k8sutil.KubernetesClient{ + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + err := c.processSingleInProgressCm(tt.cm) + + if err != nil { + if tt.err == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err) { + t.Fatalf("errors does not match, actual err: %v, expected err: %v", err, tt.err) + } + } else if tt.err != "" { + t.Fatalf("expected error containing %q, but got no error", tt.err) + } + + if tt.err == "" && tt.expectedPgName != "" { + pg, err := acidClientSet.AcidV1().Postgresqls(tt.expectedPgNamespace).Get(context.TODO(), tt.expectedPgName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("could not get postgresql resource: %v", err) + } + + switch tt.name { + case "successful create": + if pg.Spec.TeamID != "acid" { + t.Errorf("expected teamId 'acid', got '%s'", pg.Spec.TeamID) + } + case "postgresql resource already exists": + if pg.Spec.TeamID != "some-other-team" { + t.Errorf("expected teamId to be 'some-other-team', but it was overwritten to '%s'", pg.Spec.TeamID) + } + case "spec with missing teamId": + if pg.Spec.TeamID != "" { + t.Errorf("expected teamId to be empty, got '%s'", pg.Spec.TeamID) + } + } + } + }) + } +} + +func TestProcessInProgressCm(t *testing.T) { + tests := []struct { + name string + namespace string + cms []*v1.ConfigMap + err string + expectedPgCreations int + }{ + { + "process one of two in-progress cms", + "po", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"metadata\":{\"name\":\"acid-test-cluster-1\"}}", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: 
metav1.ObjectMeta{ + Name: "pitr-state-test-3", + Namespace: "po", + }, + }, + }, + "", + 1, + }, + { + "list fails", + "po", + []*v1.ConfigMap{}, + "synthetic list error", + 0, + }, + { + "single cm process fails", + "po", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-good", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"metadata\":{\"name\":\"acid-good-cluster\"}}", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-bad", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "invalid-json", + }, + }, + }, + "", + 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.name == "list fails" { + clientSet.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic list error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + for _, cm := range tt.cms { + _, err := c.KubeClient.ConfigMaps(tt.namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.processInProgressCm(tt.namespace) + if err != nil { + if tt.err == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err) { + t.Fatalf("errors does not match, actual err: %v, expected err: %v", err, tt.err) + } + } else if tt.err != "" { + t.Fatalf("expected error containing %q, but got no error", tt.err) + } + + var creations int + for _, action := range acidClientSet.Actions() { + if action.GetVerb() == "create" && action.GetResource().Resource == "postgresqls" { + creations++ + } + } + + if creations != tt.expectedPgCreations { + t.Errorf("expected %d postgresql resources to be created, but found %d", tt.expectedPgCreations, creations) + } + }) + } +} + +func TestProcessSinglePendingCm(t *testing.T) { + tests := []struct { + name string + cm v1.ConfigMap + pgExists bool + getPgFails bool + updateCmFails bool + expectedErr string + expectedLabel string + }{ + { + "postgresql cr still exists", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + true, + false, + false, + "", + cluster.PitrStateLabelValuePending, + }, + { + "get postgresql cr fails", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + false, + true, + false, + "could not check for existence of Postgresql CR", + cluster.PitrStateLabelValuePending, + }, + { + "postgresql cr does not exist, cm update succeeds", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + false, + false, + false, + "", + 
cluster.PitrStateLabelValueInProgress, + }, + { + "postgresql cr does not exist, cm update fails", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + false, + false, + true, + "could not update ConfigMap", + cluster.PitrStateLabelValuePending, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + clientSet := fake.NewSimpleClientset(&tt.cm) + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.pgExists { + pg := &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + } + _, err := acidClientSet.AcidV1().Postgresqls("default").Create(context.TODO(), pg, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create postgresql resource: %v", err) + } + } + + if tt.getPgFails { + acidClientSet.PrependReactor("get", "postgresqls", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic get error") + }) + } + + if tt.updateCmFails { + clientSet.PrependReactor("update", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic update error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + err := c.processSinglePendingCm(tt.cm) + + if err != nil { + if tt.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.expectedErr) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.expectedErr) + } + } else if tt.expectedErr != "" { + t.Fatalf("expected error containing %q, but got no error", tt.expectedErr) + } + + if !tt.updateCmFails { + updatedCm, err := clientSet.CoreV1().ConfigMaps("default").Get(context.TODO(), "pitr-state-test-cluster", metav1.GetOptions{}) + if err != nil { + t.Fatalf("could not get configmap: %v", err) + } + if updatedCm.Labels[cluster.PitrStateLabelKey] != tt.expectedLabel { + t.Errorf("expected label %q but got %q", tt.expectedLabel, updatedCm.Labels[cluster.PitrStateLabelKey]) + } + } + }) + } +} + +func TestProcessPendingCm(t *testing.T) { + tests := []struct { + name string + namespace string + cms []*v1.ConfigMap + listFails bool + err string + expectedProcessedPending int + }{ + { + "process one of two pending cms", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-3", + Namespace: "default", + }, + }, + }, + false, + "", + 1, + }, + { + "no pending cms to process", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + }, + }, + false, + "", + 0, + }, + { + "list fails", + "default", + []*v1.ConfigMap{}, + true, + "could not list pending restore ConfigMaps", + 0, + }, + 
{ + "process multiple pending cms", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + }, + false, + "", + 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.listFails { + clientSet.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic list error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + for _, cm := range tt.cms { + _, err := c.KubeClient.ConfigMaps(tt.namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.processPendingCm(tt.namespace) + if err != nil { + if tt.err == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err) { + t.Fatalf("errors does not match, actual err: %v, expected err: %v", err, tt.err) + } + } else if tt.err != "" { + t.Fatalf("expected error containing %q, but got no error", tt.err) + } + + if !tt.listFails { + var pendingProcessed int + for _, action := range acidClientSet.Actions() { + if action.GetVerb() == "get" && action.GetResource().Resource == "postgresqls" { + pendingProcessed++ + } + } + + if pendingProcessed != tt.expectedProcessedPending { + t.Errorf("expected %d pending cms to be processed, but found %d", tt.expectedProcessedPending, pendingProcessed) + } + } + }) + } +} + +func TestProcessPendingRestores(t *testing.T) { + tests := []struct { + name string + watchedNamespace string + cms []*v1.ConfigMap + pendingCmListFails bool + inProgressCmListFails bool + expectedErr string + expectedPendingProcessed int + expectedInProgressCreate int + }{ + { + "process both pending and in-progress cms with watched namespace", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"metadata\":{\"name\":\"acid-test-cluster\"}}", + }, + }, + }, + false, + false, + "", + 1, + 1, + }, + { + "use all namespaces when watched namespace is empty", + "", + []*v1.ConfigMap{}, + false, + false, + "", + 0, + 0, + }, + { + "processPendingCm fails", + "default", + []*v1.ConfigMap{}, + true, + false, + "could not list pending restore ConfigMaps", + 0, + 0, + }, + { + "processInProgressCm fails", + "default", + []*v1.ConfigMap{}, + false, + true, + "could not list in-progress restore ConfigMaps", + 0, + 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + c.opConfig.WatchedNamespace = tt.watchedNamespace + clientSet 
:= fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + listCallCount := 0 + if tt.pendingCmListFails || tt.inProgressCmListFails { + clientSet.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + listCallCount++ + if tt.pendingCmListFails && listCallCount == 1 { + return true, nil, fmt.Errorf("synthetic list error") + } + if tt.inProgressCmListFails && listCallCount == 2 { + return true, nil, fmt.Errorf("synthetic list error") + } + return false, nil, nil + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + namespace := tt.watchedNamespace + if namespace == "" { + namespace = "default" + } + for _, cm := range tt.cms { + _, err := c.KubeClient.ConfigMaps(namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.processPendingRestores() + if err != nil { + if tt.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.expectedErr) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.expectedErr) + } + } else if tt.expectedErr != "" { + t.Fatalf("expected error containing %q, but got no error", tt.expectedErr) + } + + if tt.expectedErr == "" { + var pendingProcessed int + var inProgressCreate int + for _, action := range acidClientSet.Actions() { + if action.GetVerb() == "get" && action.GetResource().Resource == "postgresqls" { + pendingProcessed++ + } + if action.GetVerb() == "create" && action.GetResource().Resource == "postgresqls" { + inProgressCreate++ + } + } + + if pendingProcessed != tt.expectedPendingProcessed { + t.Errorf("expected %d pending cms to be processed, but found %d", tt.expectedPendingProcessed, pendingProcessed) + } + if inProgressCreate != tt.expectedInProgressCreate { + t.Errorf("expected %d in-progress cms to create postgresql resources, but found %d", tt.expectedInProgressCreate, inProgressCreate) + } + } + }) + } +} + +func TestValidateRestoreInPlace(t *testing.T) { + validTimestamp := time.Now().Add(-1 * time.Hour).Format(time.RFC3339) + futureTimestamp := time.Now().Add(1 * time.Hour).Format(time.RFC3339) + + tests := []struct { + name string + pgOld *acidv1.Postgresql + pgNew *acidv1.Postgresql + expectedErr string + }{ + { + "missing clone section", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{}, + }, + "'clone' section is missing in the manifest", + }, + { + "cluster name mismatch", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "different-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + "clone cluster name \"different-cluster\" does not match the current cluster name \"acid-test-cluster\"", + }, + { + "invalid timestamp format", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: 
"default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: "invalid-timestamp", + }, + }, + }, + "could not parse clone timestamp", + }, + { + "future timestamp", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: futureTimestamp, + }, + }, + }, + "clone timestamp", + }, + { + "valid restore parameters", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + + err := c.validateRestoreInPlace(tt.pgOld, tt.pgNew) + + if err != nil { + if tt.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.expectedErr) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.expectedErr) + } + } else if tt.expectedErr != "" { + t.Fatalf("expected error containing %q, but got no error", tt.expectedErr) + } + }) + } +} + +func TestHandleRestoreInPlace(t *testing.T) { + validTimestamp := time.Now().Add(-1 * time.Hour).Format(time.RFC3339) + + tests := []struct { + name string + pgOld *acidv1.Postgresql + pgNew *acidv1.Postgresql + cmExists bool + cmCreateFails bool + cmUpdateFails bool + pgDeleteFails bool + expectCmCreateAttempt bool + expectCmUpdateAttempt bool + expectPgDeleteAttempt bool + }{ + { + "validation fails - missing clone section", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{}, + }, + false, + false, + false, + false, + false, + false, + false, + }, + { + "successful restore - cm created and pg deleted", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + false, + false, + false, + false, + true, + false, + true, + }, + { + "cm already exists - cm updated and pg deleted", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + true, + false, + false, + false, + false, + true, + true, + }, + { + "cm create fails - no pg delete", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + false, + true, + false, + false, + true, + false, + false, + }, + { + "cm update fails - no pg delete", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + true, + false, + true, + false, + false, + true, + false, + }, + { + "pg delete fails", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + false, + false, + false, + true, + true, + false, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + // Pre-create postgresql resource + _, err := acidClientSet.AcidV1().Postgresqls(tt.pgOld.Namespace).Create(context.TODO(), tt.pgOld, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create postgresql resource: %v", err) + } + + if tt.cmExists { + cmName := fmt.Sprintf("pitr-state-%s", tt.pgNew.Name) + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: tt.pgNew.Namespace, + }, + } + _, err := clientSet.CoreV1().ConfigMaps(tt.pgNew.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create configmap: %v", err) + } + } + + if tt.cmCreateFails { + clientSet.PrependReactor("create", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic create error") + }) + } + + if tt.cmUpdateFails { + clientSet.PrependReactor("update", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic update error") + }) + } + + if tt.pgDeleteFails { + acidClientSet.PrependReactor("delete", "postgresqls", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic delete error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + // Clear actions from setup phase before calling the handler + clientSet.ClearActions() + acidClientSet.ClearActions() + + c.handleRestoreInPlace(tt.pgOld, tt.pgNew) + + // Check ConfigMap actions (only actions from the handler itself) + var cmCreateAttempt, cmUpdateAttempt bool + for _, action := range clientSet.Actions() { + if action.GetVerb() == "create" && action.GetResource().Resource == "configmaps" { + cmCreateAttempt = true + } + if action.GetVerb() == "update" && action.GetResource().Resource == "configmaps" { + cmUpdateAttempt = true + } + } + + // Check Postgresql actions + var pgDeleteAttempt bool + for _, action := range acidClientSet.Actions() { + if 
action.GetVerb() == "delete" && action.GetResource().Resource == "postgresqls" { + pgDeleteAttempt = true + } + } + + if cmCreateAttempt != tt.expectCmCreateAttempt { + t.Errorf("expected cmCreateAttempt=%v, got %v", tt.expectCmCreateAttempt, cmCreateAttempt) + } + if cmUpdateAttempt != tt.expectCmUpdateAttempt { + t.Errorf("expected cmUpdateAttempt=%v, got %v", tt.expectCmUpdateAttempt, cmUpdateAttempt) + } + if pgDeleteAttempt != tt.expectPgDeleteAttempt { + t.Errorf("expected pgDeleteAttempt=%v, got %v", tt.expectPgDeleteAttempt, pgDeleteAttempt) + } + }) + } +} diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 9fadd6a5b..8af00f87e 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -18,6 +18,7 @@ type CRD struct { ReadyWaitTimeout time.Duration `name:"ready_wait_timeout" default:"30s"` ResyncPeriod time.Duration `name:"resync_period" default:"30m"` RepairPeriod time.Duration `name:"repair_period" default:"5m"` + PitrBackupRetention time.Duration `name:"pitr_backup_retention" default:"168h"` EnableCRDRegistration *bool `name:"enable_crd_registration" default:"true"` EnableCRDValidation *bool `name:"enable_crd_validation" default:"true"` CRDCategories []string `name:"crd_categories" default:"all"`
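The `pitr_backup_retention` field added above is a plain `time.Duration`, so the documented value is just a Go duration string. As a standalone, illustrative sketch (the timestamp is made up), this reproduces the retention check that `cleanupRestores` applies to `pitr-state-*` ConfigMaps:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// pitr_backup_retention is parsed as a Go duration string; "168h" (7 days) is the default.
	retention, err := time.ParseDuration("168h")
	if err != nil {
		panic(err)
	}

	// Hypothetical creation timestamp of a pitr-state ConfigMap, 8 days in the past.
	created := time.Now().Add(-8 * 24 * time.Hour)

	// Same decision as cleanupRestores: delete anything older than the retention,
	// and skip cleanup entirely when the retention is zero or negative.
	age := time.Since(created)
	fmt.Printf("age=%s retention=%s delete=%v\n",
		age.Round(time.Hour), retention, retention > 0 && age > retention)
}
```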