From 4d21e44183327627389c8dd22896cdf859ff76a0 Mon Sep 17 00:00:00 2001
From: Polina Bungina
Date: Thu, 23 May 2024 10:41:12 +0200
Subject: [PATCH 1/9] Initial inherited annotations fix implementation

---
 manifests/operator-service-account-rbac.yaml |   1 +
 pkg/cluster/cluster.go                       |  81 ++--
 pkg/cluster/connection_pooler.go             |  72 ++-
 pkg/cluster/k8sres_test.go                   |   2 +-
 pkg/cluster/resources.go                     |  44 +-
 pkg/cluster/sync.go                          |  69 ++-
 pkg/cluster/sync_test.go                     |   6 +-
 pkg/cluster/util_test.go                     | 479 ++++++++++++++++---
 pkg/cluster/volumes.go                       | 107 ++---
 pkg/cluster/volumes_test.go                  |   3 +-
 pkg/util/k8sutil/k8sutil.go                  |  13 -
 11 files changed, 605 insertions(+), 272 deletions(-)

diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml
index c10dc5fd7..1013ed9ba 100644
--- a/manifests/operator-service-account-rbac.yaml
+++ b/manifests/operator-service-account-rbac.yaml
@@ -173,6 +173,7 @@ rules:
   - get
   - list
   - patch
+  - update
 # to CRUD cron jobs for logical backups
 - apiGroups:
   - batch
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 0475c718c..ad27dd320 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -30,6 +30,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	apipolicyv1 "k8s.io/api/policy/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -400,7 +401,7 @@ func (c *Cluster) Create() (err error) {
 
 	if len(c.Spec.Streams) > 0 {
 		// creating streams requires syncing the statefulset first
-		err = c.syncStatefulSet()
+		err = c.syncStatefulSet(true)
 		if err != nil {
 			return fmt.Errorf("could not sync statefulset: %v", err)
 		}
@@ -493,7 +494,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 	if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
 		match = false
 		needsReplace = true
-		needsRollUpdate = true
 		reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
 	}
 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
@@ -513,9 +513,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 			reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
 			continue
 		}
-		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
+		if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
 			needsReplace = true
-			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
+			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: ", name)+reason)
 		}
 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
 			name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
@@ -764,6 +764,16 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string)
 
 }
 
+func (c *Cluster) extractIgnoredAnnotations(annoList map[string]string) map[string]string {
+	result := make(map[string]string)
+	for _, ignore := range c.OpConfig.IgnoredAnnotations {
+		if _, ok := annoList[ignore]; ok {
+			result[ignore] = annoList[ignore]
+		}
+	}
+	return result
+}
+
 func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
 	if old.Spec.Type != new.Spec.Type {
 		return false, fmt.Sprintf("new service's type %q does not match the current one %q",
@@ -818,6 +828,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
 	return true, ""
 }
 
+func (c *Cluster) ComparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
+	//TODO: improve comparison
+	if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
+		return false, "new PDB spec does not match the current one"
+	}
+	if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
+		return false, "new PDB's annotations does not match the current one:" + reason
+	}
+	return true, ""
+}
+
 func getPgVersion(cronJob *batchv1.CronJob) string {
 	envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
 	for _, env := range envs {
@@ -922,12 +943,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	}
 
 	// Service
-	if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
-		!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
-		if err := c.syncServices(); err != nil {
-			c.logger.Errorf("could not sync services: %v", err)
-			updateFailed = true
-		}
+	if err := c.syncServices(); err != nil {
+		c.logger.Errorf("could not sync services: %v", err)
+		updateFailed = true
 	}
 
 	// Users
@@ -946,7 +964,10 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	// only when streams were not specified in oldSpec but in newSpec
 	needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
 
-	if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
+	annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
+
+	initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
+	if initUsers {
 		c.logger.Debugf("initialize users")
 		if err := c.initUsers(); err != nil {
 			c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
 			updateFailed = true
 			return
 		}
-
+	}
+	if initUsers || annotationsChanged {
 		c.logger.Debugf("syncing secrets")
 		//TODO: mind the secrets of the deleted/new users
 		if err := c.syncSecrets(); err != nil {
@@ -968,7 +990,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	if c.OpConfig.StorageResizeMode != "off" {
 		c.syncVolumes()
 	} else {
-		c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
+		c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). 
Skipping volume size sync.") } // streams configuration @@ -978,29 +1000,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // Statefulset func() { - oldSs, err := c.generateStatefulSet(&oldSpec.Spec) - if err != nil { - c.logger.Errorf("could not generate old statefulset spec: %v", err) - updateFailed = true - return - } - - newSs, err := c.generateStatefulSet(&newSpec.Spec) - if err != nil { - c.logger.Errorf("could not generate new statefulset spec: %v", err) + if err := c.syncStatefulSet(syncStatefulSet); err != nil { + c.logger.Errorf("could not sync statefulsets: %v", err) updateFailed = true - return - } - - if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) { - c.logger.Debugf("syncing statefulsets") - syncStatefulSet = false - // TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet - if err := c.syncStatefulSet(); err != nil { - c.logger.Errorf("could not sync statefulsets: %v", err) - updateFailed = true - } } + syncStatefulSet = false }() // add or remove standby_cluster section from Patroni config depending on changes in standby section @@ -1011,12 +1015,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } // pod disruption budget - if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances { - c.logger.Debug("syncing pod disruption budgets") - if err := c.syncPodDisruptionBudget(true); err != nil { - c.logger.Errorf("could not sync pod disruption budget: %v", err) - updateFailed = true - } + if err := c.syncPodDisruptionBudget(true); err != nil { + c.logger.Errorf("could not sync pod disruption budget: %v", err) + updateFailed = true } // logical backup job diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 7a97497d7..a79e45616 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "golang.org/x/exp/maps" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -691,26 +692,6 @@ func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDe return deployment, nil } -// updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment -func updateConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) { - patchData, err := metaAnnotationsPatch(annotations) - if err != nil { - return nil, fmt.Errorf("could not form patch for the connection pooler deployment metadata: %v", err) - } - result, err := KubeClient.Deployments(deployment.Namespace).Patch( - context.TODO(), - deployment.Name, - types.MergePatchType, - []byte(patchData), - metav1.PatchOptions{}, - "") - if err != nil { - return nil, fmt.Errorf("could not patch connection pooler annotations %q: %v", patchData, err) - } - return result, nil - -} - // Test if two connection pooler configuration needs to be synced. For simplicity // compare not the actual K8S objects, but the configuration itself and request // sync if there is any difference. @@ -1022,6 +1003,25 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql syncReason = append(syncReason, specReason...) 
} + newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec)) + if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed { + specSync = true + syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current one: " + reason}...) + deployment.Spec.Template.Annotations = newPodAnnotations + } + newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations + ignoredAnnotations := c.extractIgnoredAnnotations(deployment.Annotations) + maps.Copy(newAnnotations, ignoredAnnotations) + if changed, reason := c.compareAnnotations(deployment.Annotations, newAnnotations); changed { + deployment.Annotations = newAnnotations + c.logger.Infof("new connection pooler deployments's annotations does not match the current one:" + reason) + c.logger.Debug("updating connection pooler deployments's annotations") + deployment, err = c.KubeClient.Deployments(deployment.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("could not update connection pooler annotations %q: %v", deployment.Name, err) + } + } + defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment) syncReason = append(syncReason, defaultsReason...) @@ -1042,15 +1042,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } } - newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(c.ConnectionPooler[role].Deployment.Annotations)) - if newAnnotations != nil { - deployment, err = updateConnectionPoolerAnnotations(c.KubeClient, c.ConnectionPooler[role].Deployment, newAnnotations) - if err != nil { - return nil, err - } - c.ConnectionPooler[role].Deployment = deployment - } - // check if pooler pods must be replaced due to secret update listOptions := metav1.ListOptions{ LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(), @@ -1076,22 +1067,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, fmt.Errorf("could not delete pooler pod: %v", err) } + } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { + newAnnotations := c.extractIgnoredAnnotations(pod.Annotations) + maps.Copy(newAnnotations, deployment.Spec.Template.Annotations) + pod.Annotations = newAnnotations + c.logger.Debugf("updating annotations for connection pooler's pod %q", pod.Name) + _, err := c.KubeClient.Pods(pod.Namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("could not update annotations for pod %q: %v", pod.Name, err) + } } } if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { c.ConnectionPooler[role].Service = service desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - if match, reason := c.compareServices(service, desiredSvc); !match { - syncReason = append(syncReason, reason) - c.logServiceChanges(role, service, desiredSvc, false, reason) - newService, err = c.updateService(role, service, desiredSvc) - if err != nil { - return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) - } - c.ConnectionPooler[role].Service = newService - c.logger.Infof("%s service %q is in the desired state now", role, 
util.NameFromMeta(desiredSvc.ObjectMeta)) + newService, err = c.updateService(role, service, desiredSvc) + if err != nil { + return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } + c.ConnectionPooler[role].Service = newService + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return NoSync, nil } diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 6e5a669ad..60fce89a2 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -3306,7 +3306,7 @@ func TestGenerateResourceRequirements(t *testing.T) { cluster.Namespace = namespace _, err := cluster.createStatefulSet() if k8sutil.ResourceAlreadyExists(err) { - err = cluster.syncStatefulSet() + err = cluster.syncStatefulSet(true) } assert.NoError(t, err) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 1d4758c02..d254317cc 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -291,29 +291,16 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe err error ) + match, reason := c.compareServices(oldService, newService) + if match { + return oldService, nil + } + + c.logServiceChanges(role, oldService, newService, false, reason) c.setProcessName("updating %v service", role) serviceName := util.NameFromMeta(oldService.ObjectMeta) - // update the service annotation in order to propagate ELB notation. - if len(newService.ObjectMeta.Annotations) > 0 { - if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil { - _, err = c.KubeClient.Services(serviceName.Namespace).Patch( - context.TODO(), - serviceName.Name, - types.MergePatchType, - []byte(annotationsPatchData), - metav1.PatchOptions{}, - "") - - if err != nil { - return nil, fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) - } - } else { - return nil, fmt.Errorf("could not form patch for the service metadata: %v", err) - } - } - // now, patch the service spec, but when disabling LoadBalancers do update instead // patch does not work because of LoadBalancerSourceRanges field (even if set to nil) oldServiceType := oldService.Spec.Type @@ -321,21 +308,10 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe if newServiceType == "ClusterIP" && newServiceType != oldServiceType { newService.ResourceVersion = oldService.ResourceVersion newService.Spec.ClusterIP = oldService.Spec.ClusterIP - svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{}) - if err != nil { - return nil, fmt.Errorf("could not update service %q: %v", serviceName, err) - } - } else { - patchData, err := specPatch(newService.Spec) - if err != nil { - return nil, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) - } - - svc, err = c.KubeClient.Services(serviceName.Namespace).Patch( - context.TODO(), serviceName.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "") - if err != nil { - return nil, fmt.Errorf("could not patch service %q: %v", serviceName, err) - } + } + svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("could not update service %q: %v", serviceName, err) } return svc, nil diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 4c89c98fe..bd1a9e596 100644 --- a/pkg/cluster/sync.go +++ 
b/pkg/cluster/sync.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "maps" "reflect" "regexp" "strconv" @@ -92,7 +93,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } c.logger.Debug("syncing statefulsets") - if err = c.syncStatefulSet(); err != nil { + if err = c.syncStatefulSet(true); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) return err @@ -200,15 +201,14 @@ func (c *Cluster) syncService(role PostgresRole) error { if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { c.Services[role] = svc desiredSvc := c.generateService(role, &c.Spec) - if match, reason := c.compareServices(svc, desiredSvc); !match { - c.logServiceChanges(role, svc, desiredSvc, false, reason) - updatedSvc, err := c.updateService(role, svc, desiredSvc) - if err != nil { - return fmt.Errorf("could not update %s service to match desired state: %v", role, err) - } - c.Services[role] = updatedSvc - c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) + ignoredAnnotations := c.extractIgnoredAnnotations(svc.Annotations) + maps.Copy(desiredSvc.Annotations, ignoredAnnotations) + updatedSvc, err := c.updateService(role, svc, desiredSvc) + if err != nil { + return fmt.Errorf("could not update %s service to match desired state: %v", role, err) } + c.Services[role] = updatedSvc + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return nil } if !k8sutil.ResourceNotFound(err) { @@ -275,7 +275,10 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { c.PodDisruptionBudget = pdb newPDB := c.generatePodDisruptionBudget() - if match, reason := k8sutil.SamePDB(pdb, newPDB); !match { + ignoredAnnotations := c.extractIgnoredAnnotations(pdb.Annotations) + maps.Copy(newPDB.Annotations, ignoredAnnotations) + match, reason := c.ComparePodDisruptionBudget(pdb, newPDB) + if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) if err = c.updatePodDisruptionBudget(newPDB); err != nil { return err @@ -309,7 +312,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return nil } -func (c *Cluster) syncStatefulSet() error { +func (c *Cluster) syncStatefulSet(force bool) error { var ( restartWait uint32 configPatched bool @@ -326,9 +329,13 @@ func (c *Cluster) syncStatefulSet() error { // NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early. 
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{}) + if err != nil && !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("error during reading of statefulset: %v", err) + } + if err != nil { - if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("error during reading of statefulset: %v", err) + if !force { + return nil } // statefulset does not exist, try to re-create it c.Statefulset = nil @@ -354,6 +361,16 @@ func (c *Cluster) syncStatefulSet() error { c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) } else { + desiredSts, err := c.generateStatefulSet(&c.Spec) + if err != nil { + return fmt.Errorf("could not generate statefulset: %v", err) + } + ignoredAnnotations := c.extractIgnoredAnnotations(sset.Annotations) + maps.Copy(desiredSts.Annotations, ignoredAnnotations) + if reflect.DeepEqual(sset, desiredSts) && !force { + return nil + } + c.logger.Debugf("syncing statefulsets") // check if there are still pods with a rolling update flag for _, pod := range pods { if c.getRollingUpdateFlagFromPod(&pod) { @@ -374,12 +391,20 @@ func (c *Cluster) syncStatefulSet() error { // statefulset is already there, make sure we use its definition in order to compare with the spec. c.Statefulset = sset - desiredSts, err := c.generateStatefulSet(&c.Spec) - if err != nil { - return fmt.Errorf("could not generate statefulset: %v", err) - } - cmp := c.compareStatefulSetWith(desiredSts) + if !cmp.rollingUpdate { + for _, pod := range pods { + if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed { + ignoredAnnotations := c.extractIgnoredAnnotations(pod.Annotations) + maps.Copy(ignoredAnnotations, desiredSts.Spec.Template.Annotations) + pod.Annotations = ignoredAnnotations + c.logger.Debugf("updating annotations for pod %q", pod.Name) + if _, err := c.KubeClient.Pods(pod.Namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("could not update annotations for pod %q: %v", pod.Name, err) + } + } + } + } if !cmp.match { if cmp.rollingUpdate { podsToRecreate = make([]v1.Pod, 0) @@ -934,6 +959,14 @@ func (c *Cluster) updateSecret( userMap[userKey] = pwdUser } + ignoredAnnotations := c.extractIgnoredAnnotations(secret.Annotations) + maps.Copy(generatedSecret.Annotations, ignoredAnnotations) + if changed, reason := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed { + c.logger.Infof("%q secret's annotations does not match the current one: %s", secretName, reason) + secret.Annotations = generatedSecret.Annotations + updateSecret = true + } + if updateSecret { c.logger.Debugln(updateSecretMsg) if _, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 46d1be5b7..b1d132bdb 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -128,7 +128,7 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { } // now sync statefulset - the diff will trigger a replacement of the statefulset - cluster.syncStatefulSet() + cluster.syncStatefulSet(true) // compare again after the SYNC - must be identical to the desired state cmp = cluster.compareStatefulSetWith(desiredSts) @@ -572,7 +572,7 @@ func TestSyncStandbyClusterConfiguration(t *testing.T) { cluster.Spec.StandbyCluster = &acidv1.StandbyDescription{ S3WalPath: "s3://custom/path/to/bucket/", } - 
cluster.syncStatefulSet() + cluster.syncStatefulSet(true) updatedSts := cluster.Statefulset // check that pods do not have a STANDBY_* environment variable @@ -605,7 +605,7 @@ func TestSyncStandbyClusterConfiguration(t *testing.T) { */ // remove standby section cluster.Spec.StandbyCluster = &acidv1.StandbyDescription{} - cluster.syncStatefulSet() + cluster.syncStatefulSet(true) updatedSts2 := cluster.Statefulset // check that pods do not have a STANDBY_* environment variable diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 5d8b92f2c..d958b71e0 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -1,16 +1,26 @@ package cluster import ( + "bytes" "context" + "fmt" + "io" + "net/http" + "reflect" "testing" + "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/zalando/postgres-operator/mocks" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" - "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + "github.com/zalando/postgres-operator/pkg/util/patroni" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" k8sFake "k8s.io/client-go/kubernetes/fake" ) @@ -19,29 +29,56 @@ func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset acidClientSet := fakeacidv1.NewSimpleClientset() return k8sutil.KubernetesClient{ - PodDisruptionBudgetsGetter: clientSet.PolicyV1(), - ServicesGetter: clientSet.CoreV1(), - StatefulSetsGetter: clientSet.AppsV1(), - PostgresqlsGetter: acidClientSet.AcidV1(), + PodDisruptionBudgetsGetter: clientSet.PolicyV1(), + SecretsGetter: clientSet.CoreV1(), + ServicesGetter: clientSet.CoreV1(), + StatefulSetsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + PersistentVolumeClaimsGetter: clientSet.CoreV1(), + PersistentVolumesGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + DeploymentsGetter: clientSet.AppsV1(), }, clientSet } -func TestInheritedAnnotations(t *testing.T) { - testName := "test inheriting annotations from manifest" - client, _ := newFakeK8sAnnotationsClient() - clusterName := "acid-test-cluster" - namespace := "default" - annotationValue := "acid" - role := Master +func createPods(cluster *Cluster) []v1.Pod { + podsList := make([]v1.Pod, 0) + for i, role := range []PostgresRole{Master, Replica} { + podsList = append(podsList, v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%d", clusterName, i), + Namespace: namespace, + Labels: map[string]string{ + "application": "spilo", + "cluster-name": clusterName, + "spilo-role": string(role), + }, + }, + }) + podsList = append(podsList, v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pooler-%s", clusterName, role), + Namespace: namespace, + Labels: cluster.connectionPoolerLabels(role, true).MatchLabels, + }, + }) + } + + return podsList +} +func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, error) { pg := acidv1.Postgresql{ ObjectMeta: metav1.ObjectMeta{ Name: clusterName, Annotations: map[string]string{ - "owned-by": annotationValue, + "owned-by": "acid", + "foo": "bar", }, }, Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), EnableReplicaConnectionPooler: boolToPointer(true), Volume: acidv1.Volume{ 
Size: "1Gi", @@ -49,9 +86,11 @@ func TestInheritedAnnotations(t *testing.T) { }, } - var cluster = New( + cluster := New( Config{ OpConfig: config.Config{ + PatroniAPICheckInterval: time.Duration(1), + PatroniAPICheckTimeout: time.Duration(5), ConnectionPooler: config.ConnectionPooler{ ConnectionPoolerDefaultCPURequest: "100m", ConnectionPoolerDefaultCPULimit: "100m", @@ -59,85 +98,397 @@ func TestInheritedAnnotations(t *testing.T) { ConnectionPoolerDefaultMemoryLimit: "100Mi", NumberOfInstances: k8sutil.Int32ToPointer(1), }, + PDBNameFormat: "postgres-{cluster}-pdb", PodManagementPolicy: "ordered_ready", Resources: config.Resources{ - ClusterLabels: map[string]string{"application": "spilo"}, - ClusterNameLabel: "cluster-name", - DefaultCPURequest: "300m", - DefaultCPULimit: "300m", - DefaultMemoryRequest: "300Mi", - DefaultMemoryLimit: "300Mi", - InheritedAnnotations: []string{"owned-by"}, - PodRoleLabel: "spilo-role", + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + InheritedAnnotations: []string{"owned-by"}, + IgnoredAnnotations: []string{"ignore"}, + PodRoleLabel: "spilo-role", + ResourceCheckInterval: time.Duration(testResourceCheckInterval), + ResourceCheckTimeout: time.Duration(testResourceCheckTimeout), }, }, }, client, pg, logger, eventRecorder) - cluster.Name = clusterName cluster.Namespace = namespace + _, err := cluster.createStatefulSet() + if err != nil { + return nil, err + } + _, err = cluster.createService(Master) + if err != nil { + return nil, err + } + _, err = cluster.createPodDisruptionBudget() + if err != nil { + return nil, err + } + _, err = cluster.createConnectionPooler(mockInstallLookupFunction) + if err != nil { + return nil, err + } + pvcList := CreatePVCs(namespace, clusterName, cluster.labelsSet(false), 2, "1Gi") + for _, pvc := range pvcList.Items { + cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &pvc, metav1.CreateOptions{}) + } + podsList := createPods(cluster) + for _, pod := range podsList { + cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + } - // test annotationsSet function - inheritedAnnotations := cluster.annotationsSet(nil) + return cluster, nil +} - listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(false).String(), +func annotateResources(cluster *Cluster) error { + externalAnnotations := map[string]string{"ignore": "me", "do-not-ignore": "me"} + listOptions := metav1.ListOptions{LabelSelector: cluster.labelsSet(false).String()} + + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), listOptions) + if err != nil { + return err + } + for _, sts := range stsList.Items { + sts.Annotations = externalAnnotations + if _, err = cluster.KubeClient.StatefulSets(namespace).Update(context.TODO(), &sts, metav1.UpdateOptions{}); err != nil { + return err + } } - // check statefulset annotations - _, err := cluster.createStatefulSet() + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), listOptions) + if err != nil { + return err + } + + for _, pod := range podList.Items { + pod.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { + return err + } + } + + svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), listOptions) + if err != nil { + 
return err + } + for _, svc := range svcList.Items { + svc.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Services(namespace).Update(context.TODO(), &svc, metav1.UpdateOptions{}); err != nil { + return err + } + } + + pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), listOptions) + if err != nil { + return err + } + for _, pdb := range pdbList.Items { + pdb.Annotations = externalAnnotations + _, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Update(context.TODO(), &pdb, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), listOptions) + if err != nil { + return err + } + for _, pvc := range pvcList.Items { + pvc.Annotations = externalAnnotations + if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { + return err + } + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + if err != nil { + return err + } + deploy.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil { + return err + } + poolerPodList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: labels.Set(cluster.connectionPoolerLabels(role, true).MatchLabels).String(), + }) + if err != nil { + return err + } + for _, pod := range poolerPodList.Items { + pod.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { + return err + } + } + } + + secrets, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), listOptions) + if err != nil { + return err + } + for _, secret := range secrets.Items { + secret.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Secrets(namespace).Update(context.TODO(), &secret, metav1.UpdateOptions{}); err != nil { + return err + } + } + return nil +} + +func TestInheritedAnnotations(t *testing.T) { + // mocks + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client, _ := newFakeK8sAnnotationsClient() + mockClient := mocks.NewMockHTTPClient(ctrl) + + cluster, err := newInheritedAnnotationsCluster(client) assert.NoError(t, err) - stsList, err := client.StatefulSets(namespace).List(context.TODO(), listOptions) + configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}` + response := http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(configJson))), + } + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes() + cluster.patroni = patroni.New(patroniLogger, mockClient) + + err = cluster.Sync(&cluster.Postgresql) assert.NoError(t, err) - for _, sts := range stsList.Items { - if !(util.MapContains(sts.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: StatefulSet %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations) + + filterLabels := cluster.labelsSet(false) + listOptions := metav1.ListOptions{ + LabelSelector: filterLabels.String(), + } + + // helper functions + containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType 
string) error { + if expected == nil { + if len(actual) != 0 { + return fmt.Errorf("%s%v expected not to have any annotations, got: %#v", objType, objName, actual) + } + } else if !(reflect.DeepEqual(expected, actual)) { + return fmt.Errorf("%s%v expected annotations: %#v, got: %#v", objType, objName, expected, actual) + } + return nil + } + + checkSts := func(annotations map[string]string, noTemplate bool) error { + stsList, err := client.StatefulSets(namespace).List(context.TODO(), listOptions) + if err != nil { + return err } - // pod template - if !(util.MapContains(sts.Spec.Template.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: pod template %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations) + + for _, sts := range stsList.Items { + if err := containsAnnotations(annotations, sts.ObjectMeta.Annotations, sts.ObjectMeta.Name, "StatefulSet"); err != nil { + return err + } + if noTemplate { + continue + } + // pod template + if err := containsAnnotations(annotations, sts.Spec.Template.ObjectMeta.Annotations, sts.ObjectMeta.Name, "StatefulSet pod template"); err != nil { + return err + } + // pvc template + if err := containsAnnotations(annotations, sts.Spec.VolumeClaimTemplates[0].Annotations, sts.ObjectMeta.Name, "StatefulSet pvc template"); err != nil { + return err + } } - // pvc template - if !(util.MapContains(sts.Spec.VolumeClaimTemplates[0].Annotations, inheritedAnnotations)) { - t.Errorf("%s: PVC template %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations) + return nil + } + + checkPods := func(annotations map[string]string, labelSelector metav1.ListOptions) error { + podList, err := client.Pods(namespace).List(context.TODO(), labelSelector) + if err != nil { + return err + } + for _, pod := range podList.Items { + if err := containsAnnotations(annotations, pod.ObjectMeta.Annotations, pod.ObjectMeta.Name, "Pod"); err != nil { + return err + } } + return nil } - // check service annotations - cluster.createService(Master) - svcList, err := client.Services(namespace).List(context.TODO(), listOptions) - assert.NoError(t, err) - for _, svc := range svcList.Items { - if !(util.MapContains(svc.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: Service %v not inherited annotations %#v, got %#v", testName, svc.ObjectMeta.Name, inheritedAnnotations, svc.ObjectMeta.Annotations) + checkSvc := func(annotations map[string]string, labelSelector metav1.ListOptions) error { + svcList, err := client.Services(namespace).List(context.TODO(), labelSelector) + if err != nil { + return err } + for _, svc := range svcList.Items { + if err := containsAnnotations(annotations, svc.ObjectMeta.Annotations, svc.ObjectMeta.Name, "Service"); err != nil { + return err + } + } + return nil } - // check pod disruption budget annotations - cluster.createPodDisruptionBudget() - pdbList, err := client.PodDisruptionBudgets(namespace).List(context.TODO(), listOptions) - assert.NoError(t, err) - for _, pdb := range pdbList.Items { - if !(util.MapContains(pdb.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: Pod Disruption Budget %v not inherited annotations %#v, got %#v", testName, pdb.ObjectMeta.Name, inheritedAnnotations, pdb.ObjectMeta.Annotations) + checkPdb := func(annotations map[string]string) error { + pdbList, err := client.PodDisruptionBudgets(namespace).List(context.TODO(), listOptions) + if err != nil { + return err + 
} + for _, pdb := range pdbList.Items { + if err := containsAnnotations(annotations, pdb.ObjectMeta.Annotations, pdb.ObjectMeta.Name, "Pod Disruption Budget"); err != nil { + return err + } + } + return nil + } + + checkPvc := func(annotations map[string]string) error { + pvcs, err := cluster.listPersistentVolumeClaims() + if err != nil { + return err } + for _, pvc := range pvcs { + if err := containsAnnotations(annotations, pvc.ObjectMeta.Annotations, pvc.ObjectMeta.Name, "Volume claim"); err != nil { + return err + } + } + return nil } - // check pooler deployment annotations - cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{} - cluster.ConnectionPooler[role] = &ConnectionPoolerObjects{ - Name: cluster.connectionPoolerName(role), - ClusterName: cluster.Name, - Namespace: cluster.Namespace, - Role: role, + checkPooler := func(annotations map[string]string, noTemplate bool) error { + for _, role := range []PostgresRole{Master, Replica} { + poolerListOptions := metav1.ListOptions{ + LabelSelector: labels.Set(cluster.connectionPoolerLabels(role, true).MatchLabels).String(), + } + + deploy, err := client.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + if err != nil { + return err + } + if err := containsAnnotations(annotations, deploy.Annotations, deploy.Name, "Deployment"); err != nil { + return err + } + if err := checkSvc(annotations, poolerListOptions); err != nil { + return err + } + if err := checkPods(annotations, poolerListOptions); err != nil { + return err + } + if noTemplate { + continue + } + if err := containsAnnotations(annotations, deploy.Spec.Template.Annotations, deploy.Name, "Pooler pod template"); err != nil { + return err + } + } + return nil } - deploy, err := cluster.generateConnectionPoolerDeployment(cluster.ConnectionPooler[role]) - assert.NoError(t, err) - if !(util.MapContains(deploy.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: Deployment %v not inherited annotations %#v, got %#v", testName, deploy.ObjectMeta.Name, inheritedAnnotations, deploy.ObjectMeta.Annotations) + checkSecrets := func(annotations map[string]string) error { + secretList, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), listOptions) + if err != nil { + return err + } + for _, secret := range secretList.Items { + if err := containsAnnotations(annotations, secret.Annotations, secret.Name, "Secret"); err != nil { + return err + } + } + return nil } + checkResources := func(resultAnnotations map[string]string, noTemplate bool) error { + if err = checkSts(resultAnnotations, noTemplate); err != nil { + return err + } + if err = checkPods(resultAnnotations, listOptions); err != nil { + return err + } + if err = checkSvc(resultAnnotations, listOptions); err != nil { + return err + } + if err = checkPdb(resultAnnotations); err != nil { + return err + } + if err = checkPooler(resultAnnotations, noTemplate); err != nil { + return err + } + if err = checkPvc(resultAnnotations); err != nil { + return err + } + if err = checkSecrets(resultAnnotations); err != nil { + return err + } + return nil + } + + // Finally, tests! + result := map[string]string{"owned-by": "acid"} + assert.True(t, reflect.DeepEqual(result, cluster.annotationsSet(nil))) + + // 1. Check initial state + err = checkResources(result, false) + assert.NoError(t, err) + + // 2. 
Check annotation value change + + // 2.1 Sync event + newSpec := cluster.Postgresql.DeepCopy() + newSpec.ObjectMeta.Annotations["owned-by"] = "fooSync" + result["owned-by"] = "fooSync" + // + new PVC + cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 1, "1Gi").Items[0], metav1.CreateOptions{}) + err = cluster.Sync(newSpec) + assert.NoError(t, err) + + err = checkResources(result, false) + assert.NoError(t, err) + + // 2.2 Update event + newSpec = cluster.Postgresql.DeepCopy() + newSpec.ObjectMeta.Annotations["owned-by"] = "fooUpdate" + result["owned-by"] = "fooUpdate" + // + new PVC + cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 1, "1Gi").Items[0], metav1.CreateOptions{}) + + err = cluster.Update(cluster.Postgresql.DeepCopy(), newSpec) + assert.NoError(t, err) + + err = checkResources(result, false) + assert.NoError(t, err) + + // 3. Ignored annotations + err = annotateResources(cluster) + assert.NoError(t, err) + err = cluster.Sync(newSpec.DeepCopy()) + assert.NoError(t, err) + err = checkResources(map[string]string{"ignore": "me", "owned-by": "fooUpdate"}, true) + assert.NoError(t, err) + + // 4. Check removal of an inherited annotation + + // 4.1 remove parameter from operator config + cluster.OpConfig.IgnoredAnnotations = nil + cluster.OpConfig.InheritedAnnotations = nil + err = cluster.Sync(newSpec.DeepCopy()) + assert.NoError(t, err) + + err = checkResources(nil, false) + assert.NoError(t, err) + + cluster.OpConfig.InheritedAnnotations = []string{"owned-by"} + err = cluster.Sync(newSpec.DeepCopy()) + assert.NoError(t, err) + + // 4.2 delete value for the inherited annotation + delete(newSpec.Annotations, "owned-by") + err = cluster.Update(cluster.Postgresql.DeepCopy(), newSpec.DeepCopy()) + assert.NoError(t, err) + + err = checkResources(nil, false) + assert.NoError(t, err) } func Test_trimCronjobName(t *testing.T) { diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 1a4c7c73f..9190fe9bb 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "maps" "strconv" "strings" @@ -11,7 +12,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/aws/aws-sdk-go/aws" - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/constants" @@ -44,16 +44,20 @@ func (c *Cluster) syncVolumes() error { } // resize pvc to adjust filesystem size until better K8s support - if err = c.syncVolumeClaims(); err != nil { + if err = c.syncVolumeClaims(false); err != nil { err = fmt.Errorf("could not sync persistent volume claims: %v", err) return err } } else if c.OpConfig.StorageResizeMode == "pvc" { - if err = c.syncVolumeClaims(); err != nil { + if err = c.syncVolumeClaims(false); err != nil { err = fmt.Errorf("could not sync persistent volume claims: %v", err) return err } } else if c.OpConfig.StorageResizeMode == "ebs" { + if err = c.syncVolumeClaims(true); err != nil { + err = fmt.Errorf("could not sync persistent volume claims: %v", err) + return err + } // potentially enlarge volumes before changing the statefulset. By doing that // in this order we make sure the operator is not stuck waiting for a pod that // cannot start because it ran out of disk space. 
@@ -65,7 +69,10 @@ func (c *Cluster) syncVolumes() error { return err } } else { - c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.") + c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.") + if err := c.syncVolumeClaims(true); err != nil { + c.logger.Errorf("could not sync persistent volume claims: %v", err) + } } return nil @@ -184,21 +191,49 @@ func (c *Cluster) populateVolumeMetaData() error { } // syncVolumeClaims reads all persistent volume claims and checks that their size matches the one declared in the statefulset. -func (c *Cluster) syncVolumeClaims() error { +func (c *Cluster) syncVolumeClaims(noResize bool) error { c.setProcessName("syncing volume claims") - needsResizing, err := c.volumeClaimsNeedResizing(c.Spec.Volume) + newSize, err := resource.ParseQuantity(c.Spec.Volume.Size) if err != nil { - return fmt.Errorf("could not compare size of the volume claims: %v", err) + return fmt.Errorf("could not parse volume size from the manifest: %v", err) } + manifestSize := quantityToGigabyte(newSize) - if !needsResizing { - c.logger.Infof("volume claims do not require changes") - return nil + pvcs, err := c.listPersistentVolumeClaims() + if err != nil { + return fmt.Errorf("could not receive persistent volume claims: %v", err) } + for _, pvc := range pvcs { + needsUpdate := false + currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage]) + if !noResize && currentSize != manifestSize { + if currentSize < manifestSize { + pvc.Spec.Resources.Requests[v1.ResourceStorage] = newSize + needsUpdate = true + c.logger.Debugf("persistent volume claim for volume %q needs to be resized", pvc.Name) + } else { + c.logger.Warningf("cannot shrink persistent volume") + } + } + + newAnnotations := c.annotationsSet(nil) + if changed, reason := c.compareAnnotations(pvc.Annotations, newAnnotations); changed { + maps.Copy(newAnnotations, c.extractIgnoredAnnotations(pvc.Annotations)) + pvc.Annotations = newAnnotations + needsUpdate = true + c.logger.Debugf("persistent volume claim's annotations for volume %q needs to be updated: %s", pvc.Name, reason) + } - if err := c.resizeVolumeClaims(c.Spec.Volume); err != nil { - return fmt.Errorf("could not sync volume claims: %v", err) + if needsUpdate { + c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name) + if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("could not update persistent volume claim: %q", err) + } + c.logger.Debugf("successfully updated persistent volume claim %q", pvc.Name) + } else { + c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name) + } } c.logger.Infof("volume claims have been synced successfully") @@ -261,35 +296,6 @@ func (c *Cluster) deletePersistentVolumeClaims() error { return nil } -func (c *Cluster) resizeVolumeClaims(newVolume acidv1.Volume) error { - c.logger.Debugln("resizing PVCs") - pvcs, err := c.listPersistentVolumeClaims() - if err != nil { - return err - } - newQuantity, err := resource.ParseQuantity(newVolume.Size) - if err != nil { - return fmt.Errorf("could not parse volume size: %v", err) - } - newSize := quantityToGigabyte(newQuantity) - for _, pvc := range pvcs { - volumeSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage]) - if volumeSize >= newSize { - if volumeSize > newSize { - c.logger.Warningf("cannot shrink 
persistent volume") - } - continue - } - pvc.Spec.Resources.Requests[v1.ResourceStorage] = newQuantity - c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name) - if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("could not update persistent volume claim: %q", err) - } - c.logger.Debugf("successfully updated persistent volume claim %q", pvc.Name) - } - return nil -} - func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { result := make([]*v1.PersistentVolume, 0) @@ -406,25 +412,6 @@ func (c *Cluster) resizeVolumes() error { return nil } -func (c *Cluster) volumeClaimsNeedResizing(newVolume acidv1.Volume) (bool, error) { - newSize, err := resource.ParseQuantity(newVolume.Size) - manifestSize := quantityToGigabyte(newSize) - if err != nil { - return false, fmt.Errorf("could not parse volume size from the manifest: %v", err) - } - pvcs, err := c.listPersistentVolumeClaims() - if err != nil { - return false, fmt.Errorf("could not receive persistent volume claims: %v", err) - } - for _, pvc := range pvcs { - currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage]) - if currentSize != manifestSize { - return true, nil - } - } - return false, nil -} - func (c *Cluster) volumesNeedResizing() (bool, error) { newQuantity, _ := resource.ParseQuantity(c.Spec.Volume.Size) newSize := quantityToGigabyte(newQuantity) diff --git a/pkg/cluster/volumes_test.go b/pkg/cluster/volumes_test.go index 4ef94fcfb..329224893 100644 --- a/pkg/cluster/volumes_test.go +++ b/pkg/cluster/volumes_test.go @@ -74,6 +74,7 @@ func TestResizeVolumeClaim(t *testing.T) { cluster.Name = clusterName cluster.Namespace = namespace filterLabels := cluster.labelsSet(false) + cluster.Spec.Volume.Size = newVolumeSize // define and create PVCs for 1Gi volumes pvcList := CreatePVCs(namespace, clusterName, filterLabels, 2, "1Gi") @@ -85,7 +86,7 @@ func TestResizeVolumeClaim(t *testing.T) { } // test resizing - cluster.resizeVolumeClaims(acidv1.Volume{Size: newVolumeSize}) + cluster.syncVolumes() pvcs, err := cluster.listPersistentVolumeClaims() assert.NoError(t, err) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 66c30dede..3db1122a9 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -3,7 +3,6 @@ package k8sutil import ( "context" "fmt" - "reflect" b64 "encoding/base64" "encoding/json" @@ -17,7 +16,6 @@ import ( "github.com/zalando/postgres-operator/pkg/spec" apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - apipolicyv1 "k8s.io/api/policy/v1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -242,17 +240,6 @@ func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg return updatedPg, nil } -// SamePDB compares the PodDisruptionBudgets -func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason string) { - //TODO: improve comparison - match = reflect.DeepEqual(new.Spec, cur.Spec) - if !match { - reason = "new PDB spec does not match the current one" - } - - return -} - func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) { oldFormatSecret := &v1.Secret{} oldFormatSecret.Name = "testcluster" From 12a41130fee6a45276b5ce6b344b69aef62cea6e Mon 
Sep 17 00:00:00 2001 From: Polina Bungina Date: Sat, 8 Jun 2024 23:52:22 +0200 Subject: [PATCH 2/9] Only patch annotations + tests refactoring --- manifests/operator-service-account-rbac.yaml | 2 +- pkg/cluster/cluster.go | 14 - pkg/cluster/cluster_test.go | 199 ---------- pkg/cluster/connection_pooler.go | 54 ++- pkg/cluster/resources.go | 50 ++- pkg/cluster/sync.go | 40 +- pkg/cluster/util_test.go | 398 +++++++++---------- pkg/cluster/volumes.go | 22 +- 8 files changed, 277 insertions(+), 502 deletions(-) diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index 1013ed9ba..97629ee95 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -102,6 +102,7 @@ rules: - delete - get - update + - patch # to check nodes for node readiness label - apiGroups: - "" @@ -173,7 +174,6 @@ rules: - get - list - patch - - update # to CRUD cron jobs for logical backups - apiGroups: - batch diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ad27dd320..65f2921f3 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -764,16 +764,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) } -func (c *Cluster) extractIgnoredAnnotations(annoList map[string]string) map[string]string { - result := make(map[string]string) - for _, ignore := range c.OpConfig.IgnoredAnnotations { - if _, ok := annoList[ignore]; ok { - result[ignore] = annoList[ignore] - } - } - return result -} - func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { if old.Spec.Type != new.Spec.Type { return false, fmt.Sprintf("new service's type %q does not match the current one %q", @@ -790,10 +780,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { } } - if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed { - return !changed, "new service's annotations does not match the current one:" + reason - } - return true, "" } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 9c6587746..e7d38928b 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -1443,205 +1443,6 @@ func TestCompareServices(t *testing.T) { match: false, reason: `new service's LoadBalancerSourceRange does not match the current one`, }, - { - about: "services differ on DNS annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "new_clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: "external-dns.alpha.kubernetes.io/hostname" changed from "clstr.acid.zalan.do" to "new_clstr.acid.zalan.do".`, - }, - { - about: "services differ on AWS ELB annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: 
"clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: "1800", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" changed from "3600" to "1800".`, - }, - { - about: "service changes existing annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "baz", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: "foo" changed from "bar" to "baz".`, - }, - { - about: "service changes multiple existing annotations", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - "bar": "foo", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "baz", - "bar": "fooz", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - // Test just the prefix to avoid flakiness and map sorting - reason: `new service's annotations does not match the current one:`, - }, - { - about: "service adds a new custom annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: Added "foo" with value "bar".`, - }, - { - about: "service removes a custom annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: Removed "foo".`, - }, - { - about: "service removes a custom annotation and adds a new one", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - 
"foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "bar": "foo", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: Removed "foo". Added "bar" with value "foo".`, - }, - { - about: "service removes a custom annotation, adds a new one and change another", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - "zalan": "do", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "bar": "foo", - "zalan": "do.com", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - // Test just the prefix to avoid flakiness and map sorting - reason: `new service's annotations does not match the current one: Removed "foo".`, - }, - { - about: "service add annotations", - current: newService( - map[string]string{}, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - // Test just the prefix to avoid flakiness and map sorting - reason: `new service's annotations does not match the current one: Added `, - }, - { - about: "ignored annotations", - current: newService( - map[string]string{}, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - "k8s.v1.cni.cncf.io/network-status": "up", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: true, - }, } for _, tt := range tests { diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index a79e45616..48f4ea849 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -10,7 +10,6 @@ import ( "github.com/sirupsen/logrus" acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - "golang.org/x/exp/maps" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -692,6 +691,26 @@ func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDe return deployment, nil } +// patchConnectionPoolerAnnotations updates the annotations of connection pooler deployment +func patchConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) { + patchData, err := metaAnnotationsPatch(annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for the connection pooler deployment metadata: %v", err) + } + result, err := KubeClient.Deployments(deployment.Namespace).Patch( + context.TODO(), + deployment.Name, + types.MergePatchType, + 
[]byte(patchData), + metav1.PatchOptions{}, + "") + if err != nil { + return nil, fmt.Errorf("could not patch connection pooler annotations %q: %v", patchData, err) + } + return result, nil + +} + // Test if two connection pooler configuration needs to be synced. For simplicity // compare not the actual K8S objects, but the configuration itself and request // sync if there is any difference. @@ -1009,18 +1028,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current one: " + reason}...) deployment.Spec.Template.Annotations = newPodAnnotations } - newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations - ignoredAnnotations := c.extractIgnoredAnnotations(deployment.Annotations) - maps.Copy(newAnnotations, ignoredAnnotations) - if changed, reason := c.compareAnnotations(deployment.Annotations, newAnnotations); changed { - deployment.Annotations = newAnnotations - c.logger.Infof("new connection pooler deployments's annotations does not match the current one:" + reason) - c.logger.Debug("updating connection pooler deployments's annotations") - deployment, err = c.KubeClient.Deployments(deployment.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{}) - if err != nil { - return nil, fmt.Errorf("could not update connection pooler annotations %q: %v", deployment.Name, err) - } - } defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment) syncReason = append(syncReason, defaultsReason...) @@ -1040,6 +1047,15 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } c.ConnectionPooler[role].Deployment = deployment } + + newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations + if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed { + deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations) + if err != nil { + return nil, err + } + c.ConnectionPooler[role].Deployment = deployment + } } // check if pooler pods must be replaced due to secret update @@ -1068,13 +1084,13 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql return nil, fmt.Errorf("could not delete pooler pod: %v", err) } } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { - newAnnotations := c.extractIgnoredAnnotations(pod.Annotations) - maps.Copy(newAnnotations, deployment.Spec.Template.Annotations) - pod.Annotations = newAnnotations - c.logger.Debugf("updating annotations for connection pooler's pod %q", pod.Name) - _, err := c.KubeClient.Pods(pod.Namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}) + patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) + } + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) if err != nil { - return nil, fmt.Errorf("could not update annotations for pod %q: %v", pod.Name, err) + return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) } } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index d254317cc..8c97dc6a2 100644 --- 
a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -286,32 +286,38 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { } func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newService *v1.Service) (*v1.Service, error) { - var ( - svc *v1.Service - err error - ) + var err error + svc := oldService + serviceName := util.NameFromMeta(oldService.ObjectMeta) match, reason := c.compareServices(oldService, newService) - if match { - return oldService, nil + if !match { + c.logServiceChanges(role, oldService, newService, false, reason) + c.setProcessName("updating %v service", role) + + // now, patch the service spec, but when disabling LoadBalancers do update instead + // patch does not work because of LoadBalancerSourceRanges field (even if set to nil) + oldServiceType := oldService.Spec.Type + newServiceType := newService.Spec.Type + if newServiceType == "ClusterIP" && newServiceType != oldServiceType { + newService.ResourceVersion = oldService.ResourceVersion + newService.Spec.ClusterIP = oldService.Spec.ClusterIP + } + svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("could not update service %q: %v", serviceName, err) + } } - c.logServiceChanges(role, oldService, newService, false, reason) - c.setProcessName("updating %v service", role) - - serviceName := util.NameFromMeta(oldService.ObjectMeta) - - // now, patch the service spec, but when disabling LoadBalancers do update instead - // patch does not work because of LoadBalancerSourceRanges field (even if set to nil) - oldServiceType := oldService.Spec.Type - newServiceType := newService.Spec.Type - if newServiceType == "ClusterIP" && newServiceType != oldServiceType { - newService.ResourceVersion = oldService.ResourceVersion - newService.Spec.ClusterIP = oldService.Spec.ClusterIP - } - svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{}) - if err != nil { - return nil, fmt.Errorf("could not update service %q: %v", serviceName, err) + if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed { + patchData, err := metaAnnotationsPatch(newService.Annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err) + } + svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(context.TODO(), newService.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return nil, fmt.Errorf("could not patch annotations for service %q: %v", oldService.Name, err) + } } return svc, nil diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index bd1a9e596..7de8fd4a8 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "maps" "reflect" "regexp" "strconv" @@ -21,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) var requirePrimaryRestartWhenDecreased = []string{ @@ -201,8 +201,6 @@ func (c *Cluster) syncService(role PostgresRole) error { if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { c.Services[role] = svc desiredSvc := c.generateService(role, &c.Spec) - ignoredAnnotations := c.extractIgnoredAnnotations(svc.Annotations) - 
maps.Copy(desiredSvc.Annotations, ignoredAnnotations) updatedSvc, err := c.updateService(role, svc, desiredSvc) if err != nil { return fmt.Errorf("could not update %s service to match desired state: %v", role, err) @@ -275,8 +273,6 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { c.PodDisruptionBudget = pdb newPDB := c.generatePodDisruptionBudget() - ignoredAnnotations := c.extractIgnoredAnnotations(pdb.Annotations) - maps.Copy(newPDB.Annotations, ignoredAnnotations) match, reason := c.ComparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) @@ -365,8 +361,6 @@ func (c *Cluster) syncStatefulSet(force bool) error { if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } - ignoredAnnotations := c.extractIgnoredAnnotations(sset.Annotations) - maps.Copy(desiredSts.Annotations, ignoredAnnotations) if reflect.DeepEqual(sset, desiredSts) && !force { return nil } @@ -395,12 +389,13 @@ func (c *Cluster) syncStatefulSet(force bool) error { if !cmp.rollingUpdate { for _, pod := range pods { if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed { - ignoredAnnotations := c.extractIgnoredAnnotations(pod.Annotations) - maps.Copy(ignoredAnnotations, desiredSts.Spec.Template.Annotations) - pod.Annotations = ignoredAnnotations - c.logger.Debugf("updating annotations for pod %q", pod.Name) - if _, err := c.KubeClient.Pods(pod.Namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("could not update annotations for pod %q: %v", pod.Name, err) + patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err) + } + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err) } } } @@ -959,14 +954,6 @@ func (c *Cluster) updateSecret( userMap[userKey] = pwdUser } - ignoredAnnotations := c.extractIgnoredAnnotations(secret.Annotations) - maps.Copy(generatedSecret.Annotations, ignoredAnnotations) - if changed, reason := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed { - c.logger.Infof("%q secret's annotations does not match the current one: %s", secretName, reason) - secret.Annotations = generatedSecret.Annotations - updateSecret = true - } - if updateSecret { c.logger.Debugln(updateSecretMsg) if _, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { @@ -975,6 +962,17 @@ func (c *Cluster) updateSecret( c.Secrets[secret.UID] = secret } + if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed { + patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) + } + _, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err) + } + } + return nil } diff --git a/pkg/cluster/util_test.go 
b/pkg/cluster/util_test.go index d958b71e0..0af6c748c 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "maps" "net/http" "reflect" "testing" @@ -24,6 +25,8 @@ import ( k8sFake "k8s.io/client-go/kubernetes/fake" ) +var externalAnnotations = map[string]string{"existing": "annotation"} + func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset) { clientSet := k8sFake.NewSimpleClientset() acidClientSet := fakeacidv1.NewSimpleClientset() @@ -42,6 +45,152 @@ func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset }, clientSet } +func clusterLabelsOptions(cluster *Cluster) metav1.ListOptions { + clusterLabel := labels.Set(map[string]string{cluster.OpConfig.ClusterNameLabel: cluster.Name}) + return metav1.ListOptions{ + LabelSelector: clusterLabel.String(), + } +} + +func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[string]string) error { + clusterOptions := clusterLabelsOptions(cluster) + // helper functions + containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error { + if expected == nil { + if len(actual) != 0 { + return fmt.Errorf("%s %v expected not to have any annotations, got: %#v", objType, objName, actual) + } + } else if !(reflect.DeepEqual(expected, actual)) { + return fmt.Errorf("%s %v expected annotations: %#v, got: %#v", objType, objName, expected, actual) + } + return nil + } + + updateAnnotations := func(annotations map[string]string) map[string]string { + result := make(map[string]string, 0) + for anno := range annotations { + if _, ok := externalAnnotations[anno]; !ok { + result[anno] = annotations[anno] + } + } + return result + } + + checkSts := func(annotations map[string]string) error { + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + stsAnnotations := updateAnnotations(annotations) + + for _, sts := range stsList.Items { + if err := containsAnnotations(stsAnnotations, sts.Annotations, sts.ObjectMeta.Name, "StatefulSet"); err != nil { + return err + } + // pod template + if err := containsAnnotations(stsAnnotations, sts.Spec.Template.Annotations, sts.ObjectMeta.Name, "StatefulSet pod template"); err != nil { + return err + } + // pvc template + if err := containsAnnotations(stsAnnotations, sts.Spec.VolumeClaimTemplates[0].Annotations, sts.ObjectMeta.Name, "StatefulSet pvc template"); err != nil { + return err + } + } + return nil + } + + checkPods := func(annotations map[string]string) error { + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, pod := range podList.Items { + if err := containsAnnotations(annotations, pod.Annotations, pod.ObjectMeta.Name, "Pod"); err != nil { + return err + } + } + return nil + } + + checkSvc := func(annotations map[string]string) error { + svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, svc := range svcList.Items { + if err := containsAnnotations(annotations, svc.Annotations, svc.ObjectMeta.Name, "Service"); err != nil { + return err + } + } + return nil + } + + checkPdb := func(annotations map[string]string) error { + pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, pdb := range 
pdbList.Items { + if err := containsAnnotations(updateAnnotations(annotations), pdb.Annotations, pdb.ObjectMeta.Name, "Pod Disruption Budget"); err != nil { + return err + } + } + return nil + } + + checkPvc := func(annotations map[string]string) error { + pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, pvc := range pvcList.Items { + if err := containsAnnotations(annotations, pvc.Annotations, pvc.ObjectMeta.Name, "Volume claim"); err != nil { + return err + } + } + return nil + } + + checkPooler := func(annotations map[string]string) error { + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + if err != nil { + return err + } + if err := containsAnnotations(annotations, deploy.Annotations, deploy.Name, "Deployment"); err != nil { + return err + } + if err := containsAnnotations(updateAnnotations(annotations), deploy.Spec.Template.Annotations, deploy.Name, "Pooler pod template"); err != nil { + return err + } + } + return nil + } + + checkSecrets := func(annotations map[string]string) error { + secretList, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, secret := range secretList.Items { + if err := containsAnnotations(annotations, secret.Annotations, secret.Name, "Secret"); err != nil { + return err + } + } + return nil + } + + checkFuncs := []func(map[string]string) error{ + checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkPvc, checkSecrets, + } + for _, f := range checkFuncs { + if err := f(resultAnnotations); err != nil { + return err + } + } + return nil +} + func createPods(cluster *Cluster) []v1.Pod { podsList := make([]v1.Pod, 0) for i, role := range []PostgresRole{Master, Replica} { @@ -74,7 +223,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, Name: clusterName, Annotations: map[string]string{ "owned-by": "acid", - "foo": "bar", + "foo": "bar", // should not be inherited }, }, Spec: acidv1.PostgresSpec{ @@ -108,7 +257,6 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, DefaultMemoryRequest: "300Mi", DefaultMemoryLimit: "300Mi", InheritedAnnotations: []string{"owned-by"}, - IgnoredAnnotations: []string{"ignore"}, PodRoleLabel: "spilo-role", ResourceCheckInterval: time.Duration(testResourceCheckInterval), ResourceCheckTimeout: time.Duration(testResourceCheckTimeout), @@ -135,21 +283,26 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, } pvcList := CreatePVCs(namespace, clusterName, cluster.labelsSet(false), 2, "1Gi") for _, pvc := range pvcList.Items { - cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &pvc, metav1.CreateOptions{}) + _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &pvc, metav1.CreateOptions{}) + if err != nil { + return nil, err + } } podsList := createPods(cluster) for _, pod := range podsList { - cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + _, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + if err != nil { + return nil, err + } } return cluster, nil } func annotateResources(cluster *Cluster) error { - externalAnnotations := map[string]string{"ignore": "me", "do-not-ignore": "me"} - 
listOptions := metav1.ListOptions{LabelSelector: cluster.labelsSet(false).String()} + clusterOptions := clusterLabelsOptions(cluster) - stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), listOptions) + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) if err != nil { return err } @@ -160,11 +313,10 @@ func annotateResources(cluster *Cluster) error { } } - podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), listOptions) + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) if err != nil { return err } - for _, pod := range podList.Items { pod.Annotations = externalAnnotations if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { @@ -172,7 +324,7 @@ func annotateResources(cluster *Cluster) error { } } - svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), listOptions) + svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), clusterOptions) if err != nil { return err } @@ -183,7 +335,7 @@ func annotateResources(cluster *Cluster) error { } } - pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), listOptions) + pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), clusterOptions) if err != nil { return err } @@ -195,7 +347,7 @@ func annotateResources(cluster *Cluster) error { } } - pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), listOptions) + pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), clusterOptions) if err != nil { return err } @@ -215,21 +367,9 @@ func annotateResources(cluster *Cluster) error { if _, err = cluster.KubeClient.Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil { return err } - poolerPodList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), metav1.ListOptions{ - LabelSelector: labels.Set(cluster.connectionPoolerLabels(role, true).MatchLabels).String(), - }) - if err != nil { - return err - } - for _, pod := range poolerPodList.Items { - pod.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { - return err - } - } } - secrets, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), listOptions) + secrets, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), clusterOptions) if err != nil { return err } @@ -264,230 +404,54 @@ func TestInheritedAnnotations(t *testing.T) { assert.NoError(t, err) filterLabels := cluster.labelsSet(false) - listOptions := metav1.ListOptions{ - LabelSelector: filterLabels.String(), - } - - // helper functions - containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error { - if expected == nil { - if len(actual) != 0 { - return fmt.Errorf("%s%v expected not to have any annotations, got: %#v", objType, objName, actual) - } - } else if !(reflect.DeepEqual(expected, actual)) { - return fmt.Errorf("%s%v expected annotations: %#v, got: %#v", objType, objName, expected, actual) - } - return nil - } - - checkSts := func(annotations map[string]string, noTemplate bool) error { - stsList, err := client.StatefulSets(namespace).List(context.TODO(), listOptions) - if err != nil { - return err - } - - for _, sts := range stsList.Items { - if err := 
containsAnnotations(annotations, sts.ObjectMeta.Annotations, sts.ObjectMeta.Name, "StatefulSet"); err != nil { - return err - } - if noTemplate { - continue - } - // pod template - if err := containsAnnotations(annotations, sts.Spec.Template.ObjectMeta.Annotations, sts.ObjectMeta.Name, "StatefulSet pod template"); err != nil { - return err - } - // pvc template - if err := containsAnnotations(annotations, sts.Spec.VolumeClaimTemplates[0].Annotations, sts.ObjectMeta.Name, "StatefulSet pvc template"); err != nil { - return err - } - } - return nil - } - - checkPods := func(annotations map[string]string, labelSelector metav1.ListOptions) error { - podList, err := client.Pods(namespace).List(context.TODO(), labelSelector) - if err != nil { - return err - } - for _, pod := range podList.Items { - if err := containsAnnotations(annotations, pod.ObjectMeta.Annotations, pod.ObjectMeta.Name, "Pod"); err != nil { - return err - } - } - return nil - } - - checkSvc := func(annotations map[string]string, labelSelector metav1.ListOptions) error { - svcList, err := client.Services(namespace).List(context.TODO(), labelSelector) - if err != nil { - return err - } - for _, svc := range svcList.Items { - if err := containsAnnotations(annotations, svc.ObjectMeta.Annotations, svc.ObjectMeta.Name, "Service"); err != nil { - return err - } - } - return nil - } - - checkPdb := func(annotations map[string]string) error { - pdbList, err := client.PodDisruptionBudgets(namespace).List(context.TODO(), listOptions) - if err != nil { - return err - } - for _, pdb := range pdbList.Items { - if err := containsAnnotations(annotations, pdb.ObjectMeta.Annotations, pdb.ObjectMeta.Name, "Pod Disruption Budget"); err != nil { - return err - } - } - return nil - } - - checkPvc := func(annotations map[string]string) error { - pvcs, err := cluster.listPersistentVolumeClaims() - if err != nil { - return err - } - for _, pvc := range pvcs { - if err := containsAnnotations(annotations, pvc.ObjectMeta.Annotations, pvc.ObjectMeta.Name, "Volume claim"); err != nil { - return err - } - } - return nil - } - - checkPooler := func(annotations map[string]string, noTemplate bool) error { - for _, role := range []PostgresRole{Master, Replica} { - poolerListOptions := metav1.ListOptions{ - LabelSelector: labels.Set(cluster.connectionPoolerLabels(role, true).MatchLabels).String(), - } - - deploy, err := client.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) - if err != nil { - return err - } - if err := containsAnnotations(annotations, deploy.Annotations, deploy.Name, "Deployment"); err != nil { - return err - } - if err := checkSvc(annotations, poolerListOptions); err != nil { - return err - } - if err := checkPods(annotations, poolerListOptions); err != nil { - return err - } - if noTemplate { - continue - } - if err := containsAnnotations(annotations, deploy.Spec.Template.Annotations, deploy.Name, "Pooler pod template"); err != nil { - return err - } - } - return nil - } - - checkSecrets := func(annotations map[string]string) error { - secretList, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), listOptions) - if err != nil { - return err - } - for _, secret := range secretList.Items { - if err := containsAnnotations(annotations, secret.Annotations, secret.Name, "Secret"); err != nil { - return err - } - } - return nil - } - - checkResources := func(resultAnnotations map[string]string, noTemplate bool) error { - if err = checkSts(resultAnnotations, noTemplate); err != nil { - 
return err - } - if err = checkPods(resultAnnotations, listOptions); err != nil { - return err - } - if err = checkSvc(resultAnnotations, listOptions); err != nil { - return err - } - if err = checkPdb(resultAnnotations); err != nil { - return err - } - if err = checkPooler(resultAnnotations, noTemplate); err != nil { - return err - } - if err = checkPvc(resultAnnotations); err != nil { - return err - } - if err = checkSecrets(resultAnnotations); err != nil { - return err - } - return nil - } // Finally, tests! result := map[string]string{"owned-by": "acid"} assert.True(t, reflect.DeepEqual(result, cluster.annotationsSet(nil))) // 1. Check initial state - err = checkResources(result, false) + err = checkResourcesInheritedAnnotations(cluster, result) assert.NoError(t, err) // 2. Check annotation value change // 2.1 Sync event newSpec := cluster.Postgresql.DeepCopy() - newSpec.ObjectMeta.Annotations["owned-by"] = "fooSync" + newSpec.Annotations["owned-by"] = "fooSync" result["owned-by"] = "fooSync" - // + new PVC - cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 1, "1Gi").Items[0], metav1.CreateOptions{}) + err = cluster.Sync(newSpec) assert.NoError(t, err) + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) - err = checkResources(result, false) + // + existing PVC without annotations + cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 3, "1Gi").Items[2], metav1.CreateOptions{}) + err = cluster.Sync(newSpec) + assert.NoError(t, err) + err = checkResourcesInheritedAnnotations(cluster, result) assert.NoError(t, err) // 2.2 Update event newSpec = cluster.Postgresql.DeepCopy() - newSpec.ObjectMeta.Annotations["owned-by"] = "fooUpdate" + newSpec.Annotations["owned-by"] = "fooUpdate" result["owned-by"] = "fooUpdate" // + new PVC - cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 1, "1Gi").Items[0], metav1.CreateOptions{}) + cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 4, "1Gi").Items[3], metav1.CreateOptions{}) err = cluster.Update(cluster.Postgresql.DeepCopy(), newSpec) assert.NoError(t, err) - err = checkResources(result, false) + err = checkResourcesInheritedAnnotations(cluster, result) assert.NoError(t, err) - // 3. Ignored annotations + // 3. Existing annotations (should not be removed) err = annotateResources(cluster) assert.NoError(t, err) + maps.Copy(result, externalAnnotations) err = cluster.Sync(newSpec.DeepCopy()) assert.NoError(t, err) - err = checkResources(map[string]string{"ignore": "me", "owned-by": "fooUpdate"}, true) - assert.NoError(t, err) - - // 4. 
Check removal of an inherited annotation - - // 4.1 remove parameter from operator config - cluster.OpConfig.IgnoredAnnotations = nil - cluster.OpConfig.InheritedAnnotations = nil - err = cluster.Sync(newSpec.DeepCopy()) - assert.NoError(t, err) - - err = checkResources(nil, false) - assert.NoError(t, err) - - cluster.OpConfig.InheritedAnnotations = []string{"owned-by"} - err = cluster.Sync(newSpec.DeepCopy()) - assert.NoError(t, err) - - // 4.2 delete value for the inherited annotation - delete(newSpec.Annotations, "owned-by") - err = cluster.Update(cluster.Postgresql.DeepCopy(), newSpec.DeepCopy()) - assert.NoError(t, err) - - err = checkResources(nil, false) + err = checkResourcesInheritedAnnotations(cluster, result) assert.NoError(t, err) } diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 9190fe9bb..b48863790 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -3,13 +3,13 @@ package cluster import ( "context" "fmt" - "maps" "strconv" "strings" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "github.com/aws/aws-sdk-go/aws" "github.com/zalando/postgres-operator/pkg/spec" @@ -217,14 +217,6 @@ func (c *Cluster) syncVolumeClaims(noResize bool) error { } } - newAnnotations := c.annotationsSet(nil) - if changed, reason := c.compareAnnotations(pvc.Annotations, newAnnotations); changed { - maps.Copy(newAnnotations, c.extractIgnoredAnnotations(pvc.Annotations)) - pvc.Annotations = newAnnotations - needsUpdate = true - c.logger.Debugf("persistent volume claim's annotations for volume %q needs to be updated: %s", pvc.Name, reason) - } - if needsUpdate { c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name) if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { @@ -234,6 +226,18 @@ func (c *Cluster) syncVolumeClaims(noResize bool) error { } else { c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name) } + + newAnnotations := c.annotationsSet(nil) + if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed { + patchData, err := metaAnnotationsPatch(newAnnotations) + if err != nil { + return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err) + } + _, err = c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of the persistent volume claim for volume %q: %v", pvc.Name, err) + } + } } c.logger.Infof("volume claims have been synced successfully") From ac3990c543fea074eec4f7d6b31891e56fe7f800 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Mon, 10 Jun 2024 11:56:59 +0200 Subject: [PATCH 3/9] Annotate logical backup jobs/pods, endpoints --- pkg/cluster/cluster.go | 6 ++++++ pkg/cluster/k8sres.go | 7 ++++--- pkg/cluster/sync.go | 22 +++++++++++++++++++++- pkg/cluster/util_test.go | 29 ++++++++++++++++++++++++++++- 4 files changed, 59 insertions(+), 5 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 65f2921f3..787b43a8b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -797,6 +797,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool newImage, curImage) } + newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations + 
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations + if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed { + return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason) + } + newPgVersion := getPgVersion(new) curPgVersion := getPgVersion(cur) if newPgVersion != curPgVersion { diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 64d85ab37..ed049659b 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -2048,9 +2048,10 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints { endpoints := &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: c.endpointName(role), - Namespace: c.Namespace, - Labels: c.roleLabelsSet(true, role), + Name: c.endpointName(role), + Namespace: c.Namespace, + Annotations: c.annotationsSet(nil), + Labels: c.roleLabelsSet(true, role), }, } if len(subsets) > 0 { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7de8fd4a8..9ec8a0c18 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -239,7 +239,17 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { c.setProcessName("syncing %s endpoint", role) if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err == nil { - // TODO: No syncing of endpoints here, is this covered completely by updateService? + desiredEp := c.generateEndpoint(role, ep.Subsets) + if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { + patchData, err := metaAnnotationsPatch(desiredEp.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for %s endpoint: %v", role, err) + } + ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), c.endpointName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of %s endpoint: %v", role, err) + } + } c.Endpoints[role] = ep return nil } @@ -1410,6 +1420,16 @@ func (c *Cluster) syncLogicalBackupJob() error { } c.logger.Info("the logical backup job is synced") } + if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed { + patchData, err := metaAnnotationsPatch(desiredJob.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err) + } + _, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of the logical backup job %q: %v", jobName, err) + } + } return nil } if !k8sutil.ResourceNotFound(err) { diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 0af6c748c..3bd23f4b4 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -180,8 +180,21 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ return nil } + checkEndpoints := func(annotations map[string]string) error { + endpointsList, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, ep := range endpointsList.Items { + if err := containsAnnotations(annotations, ep.Annotations, ep.Name, "Endpoints"); err != nil { + return err + } + } + return nil + } + checkFuncs := []func(map[string]string) error{ - checkSts, checkPods, 
checkSvc, checkPdb, checkPooler, checkPvc, checkSecrets, + checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkPvc, checkSecrets, checkEndpoints, } for _, f := range checkFuncs { if err := f(resultAnnotations); err != nil { @@ -232,6 +245,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, Volume: acidv1.Volume{ Size: "1Gi", }, + NumberOfInstances: 2, }, } @@ -260,6 +274,8 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, PodRoleLabel: "spilo-role", ResourceCheckInterval: time.Duration(testResourceCheckInterval), ResourceCheckTimeout: time.Duration(testResourceCheckTimeout), + MinInstances: -1, + MaxInstances: -1, }, }, }, client, pg, logger, eventRecorder) @@ -379,6 +395,17 @@ func annotateResources(cluster *Cluster) error { return err } } + + endpoints, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, ep := range endpoints.Items { + ep.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Endpoints(namespace).Update(context.TODO(), &ep, metav1.UpdateOptions{}); err != nil { + return err + } + } return nil } From b0add689c9c292905dae4701cfa268f412dd2499 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Mon, 10 Jun 2024 20:24:34 +0200 Subject: [PATCH 4/9] Fix pvc retention policy comparison --- pkg/cluster/cluster.go | 6 ++++++ pkg/cluster/sync.go | 1 - 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2f396713c..1b53048b1 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -434,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa reasons = append(reasons, "new statefulset's pod management policy do not match") } + if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil { + c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ + WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, + WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, + } + } if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) { match = false needsReplace = true diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 9ec8a0c18..565b5f975 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -92,7 +92,6 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } - c.logger.Debug("syncing statefulsets") if err = c.syncStatefulSet(true); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) From 7d43a974250471a4db2a61a8e79b2ef38446915f Mon Sep 17 00:00:00 2001 From: Polina Bungina <27892524+hughcapet@users.noreply.github.com> Date: Wed, 12 Jun 2024 21:13:50 +0200 Subject: [PATCH 5/9] Apply suggestions from code review Co-authored-by: Felix Kunde --- pkg/cluster/cluster.go | 4 ++-- pkg/cluster/sync.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1b53048b1..84d570bd8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -521,7 +521,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa } if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed { needsReplace = true -
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: ", name)+reason) + reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: %s", name, reason)) } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name @@ -826,7 +826,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool return true, "" } -func (c *Cluster) ComparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) { +func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) { //TODO: improve comparison if match := reflect.DeepEqual(new.Spec, cur.Spec); !match { return false, "new PDB spec does not match the current one" diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 565b5f975..b519f5530 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -282,7 +282,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { c.PodDisruptionBudget = pdb newPDB := c.generatePodDisruptionBudget() - match, reason := c.ComparePodDisruptionBudget(pdb, newPDB) + match, reason := c.comparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) if err = c.updatePodDisruptionBudget(newPDB); err != nil { From a5781a37e94c2c108d5b2a64a6d5b178dd5bad8b Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Thu, 13 Jun 2024 07:42:44 +0200 Subject: [PATCH 6/9] Refactor ignored_annotations e2e test --- e2e/tests/test_e2e.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 10eeca7bf..f2c4259d1 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -909,25 +909,35 @@ def test_ignored_annotations(self): ''' k8s = self.k8s - annotation_patch = { - "metadata": { - "annotations": { - "k8s-status": "healthy" - }, - } - } try: + patch_config_ignored_annotations = { + "data": { + "ignored_annotations": "k8s-status", + } + } + k8s.update_config(patch_config_ignored_annotations) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default') + svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default') + + annotation_patch = { + "metadata": { + "annotations": { + "k8s-status": "healthy" + }, + } + } + old_sts_creation_timestamp = sts.metadata.creation_timestamp k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch) - svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default') old_svc_creation_timestamp = svc.metadata.creation_timestamp k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch) patch_config_ignored_annotations = { "data": { - "ignored_annotations": "k8s-status", + "ignored_annotations": "k8s-status, foo", } } k8s.update_config(patch_config_ignored_annotations) From 2cad5d6e17c81c9eb931b0e5caf1bd962480c437 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 18 Jun 2024 11:26:36 +0200 Subject: [PATCH 7/9] Attempt to remove syncStatefulSet() arg 
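
The force argument only guarded two early returns: re-creating a missing statefulset was skipped unless the sync was forced, and the sync bailed out when the generated statefulset was deep-equal to the live one. With annotations now compared and patched explicitly, syncStatefulSet() always performs the full comparison, so the flag is no longer needed. A rough sketch of the resulting call site in Update() (the other callers in Sync() and the tests drop the argument the same way):

    // statefulset sync is now unconditional; no force flag to thread through
    if err := c.syncStatefulSet(); err != nil {
        c.logger.Errorf("could not sync statefulsets: %v", err)
        updateFailed = true
    }
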
--- pkg/cluster/cluster.go | 12 ++---------- pkg/cluster/k8sres_test.go | 2 +- pkg/cluster/sync.go | 10 ++-------- pkg/cluster/sync_test.go | 6 +++--- 4 files changed, 8 insertions(+), 22 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 84d570bd8..23004ef9b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -401,7 +401,7 @@ func (c *Cluster) Create() (err error) { if len(c.Spec.Streams) > 0 { // creating streams requires syncing the statefulset first - err = c.syncStatefulSet(true) + err = c.syncStatefulSet() if err != nil { return fmt.Errorf("could not sync statefulset: %v", err) } @@ -902,7 +902,6 @@ func (c *Cluster) hasFinalizer() bool { func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed := false userInitFailed := false - syncStatefulSet := false c.mu.Lock() defer c.mu.Unlock() @@ -933,7 +932,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) { c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed", oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) - syncStatefulSet = true } else { c.logger.Infof("postgresql major version unchanged or smaller, no changes needed") // sticking with old version, this will also advance GetDesiredVersion next time. @@ -991,18 +989,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.") } - // streams configuration - if len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 { - syncStatefulSet = true - } - // Statefulset func() { - if err := c.syncStatefulSet(syncStatefulSet); err != nil { + if err := c.syncStatefulSet(); err != nil { c.logger.Errorf("could not sync statefulsets: %v", err) updateFailed = true } - syncStatefulSet = false }() // add or remove standby_cluster section from Patroni config depending on changes in standby section diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index bc4c6a908..2eeefb218 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -3348,7 +3348,7 @@ func TestGenerateResourceRequirements(t *testing.T) { cluster.Namespace = namespace _, err := cluster.createStatefulSet() if k8sutil.ResourceAlreadyExists(err) { - err = cluster.syncStatefulSet(true) + err = cluster.syncStatefulSet() } assert.NoError(t, err) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b519f5530..b106fc722 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -92,7 +92,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } - if err = c.syncStatefulSet(true); err != nil { + if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) return err @@ -317,7 +317,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return nil } -func (c *Cluster) syncStatefulSet(force bool) error { +func (c *Cluster) syncStatefulSet() error { var ( restartWait uint32 configPatched bool @@ -339,9 +339,6 @@ func (c *Cluster) syncStatefulSet(force bool) error { } if err != nil { - if !force { - return nil - } // statefulset does not exist, try to re-create it c.Statefulset = nil c.logger.Infof("cluster's statefulset does not exist") @@ -370,9 +367,6 @@ func (c *Cluster) syncStatefulSet(force bool) error { if err != 
nil { return fmt.Errorf("could not generate statefulset: %v", err) } - if reflect.DeepEqual(sset, desiredSts) && !force { - return nil - } c.logger.Debugf("syncing statefulsets") // check if there are still pods with a rolling update flag for _, pod := range pods { diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index b1d132bdb..46d1be5b7 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -128,7 +128,7 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { } // now sync statefulset - the diff will trigger a replacement of the statefulset - cluster.syncStatefulSet(true) + cluster.syncStatefulSet() // compare again after the SYNC - must be identical to the desired state cmp = cluster.compareStatefulSetWith(desiredSts) @@ -572,7 +572,7 @@ func TestSyncStandbyClusterConfiguration(t *testing.T) { cluster.Spec.StandbyCluster = &acidv1.StandbyDescription{ S3WalPath: "s3://custom/path/to/bucket/", } - cluster.syncStatefulSet(true) + cluster.syncStatefulSet() updatedSts := cluster.Statefulset // check that pods do not have a STANDBY_* environment variable @@ -605,7 +605,7 @@ func TestSyncStandbyClusterConfiguration(t *testing.T) { */ // remove standby section cluster.Spec.StandbyCluster = &acidv1.StandbyDescription{} - cluster.syncStatefulSet(true) + cluster.syncStatefulSet() updatedSts2 := cluster.Statefulset // check that pods do not have a STANDBY_* environment variable From 2b7841e4102bc4a9917ad1c717c7d2f24395e567 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 18 Jun 2024 12:32:25 +0200 Subject: [PATCH 8/9] Address other PR comments --- e2e/tests/test_e2e.py | 7 +------ pkg/cluster/volumes.go | 40 ++++++++++++++++++---------------------- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index f2c4259d1..43dd467b5 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -935,12 +935,7 @@ def test_ignored_annotations(self): old_svc_creation_timestamp = svc.metadata.creation_timestamp k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch) - patch_config_ignored_annotations = { - "data": { - "ignored_annotations": "k8s-status, foo", - } - } - k8s.update_config(patch_config_ignored_annotations) + k8s.delete_operator_pod() self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default') diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index b48863790..e96bd624b 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -42,22 +42,14 @@ func (c *Cluster) syncVolumes() error { c.logger.Errorf("errors occured during EBS volume adjustments: %v", err) } } + } - // resize pvc to adjust filesystem size until better K8s support - if err = c.syncVolumeClaims(false); err != nil { - err = fmt.Errorf("could not sync persistent volume claims: %v", err) - return err - } - } else if c.OpConfig.StorageResizeMode == "pvc" { - if err = c.syncVolumeClaims(false); err != nil { - err = fmt.Errorf("could not sync persistent volume claims: %v", err) - return err - } - } else if c.OpConfig.StorageResizeMode == "ebs" { - if err = c.syncVolumeClaims(true); err != nil { - err = fmt.Errorf("could not sync persistent volume claims: %v", err) - return err - } + if err = c.syncVolumeClaims(); err != nil { + err = fmt.Errorf("could not sync persistent volume claims: %v", err) + return err + } + + if c.OpConfig.StorageResizeMode 
== "ebs" { // potentially enlarge volumes before changing the statefulset. By doing that // in this order we make sure the operator is not stuck waiting for a pod that // cannot start because it ran out of disk space. @@ -68,11 +60,6 @@ func (c *Cluster) syncVolumes() error { err = fmt.Errorf("could not sync persistent volumes: %v", err) return err } - } else { - c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.") - if err := c.syncVolumeClaims(true); err != nil { - c.logger.Errorf("could not sync persistent volume claims: %v", err) - } } return nil @@ -191,9 +178,18 @@ func (c *Cluster) populateVolumeMetaData() error { } // syncVolumeClaims reads all persistent volume claims and checks that their size matches the one declared in the statefulset. -func (c *Cluster) syncVolumeClaims(noResize bool) error { +func (c *Cluster) syncVolumeClaims() error { c.setProcessName("syncing volume claims") + ignoreResize := false + + if c.OpConfig.StorageResizeMode == "off" || c.OpConfig.StorageResizeMode == "ebs" { + ignoreResize = true + if c.OpConfig.StorageResizeMode == "off" { + c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.") + } + } + newSize, err := resource.ParseQuantity(c.Spec.Volume.Size) if err != nil { return fmt.Errorf("could not parse volume size from the manifest: %v", err) @@ -207,7 +203,7 @@ func (c *Cluster) syncVolumeClaims(noResize bool) error { for _, pvc := range pvcs { needsUpdate := false currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage]) - if !noResize && currentSize != manifestSize { + if !ignoreResize && currentSize != manifestSize { if currentSize < manifestSize { pvc.Spec.Resources.Requests[v1.ResourceStorage] = newSize needsUpdate = true From 874ee59372382624a3e14d980224e269e6e2b223 Mon Sep 17 00:00:00 2001 From: Polina Bungina <27892524+hughcapet@users.noreply.github.com> Date: Wed, 26 Jun 2024 08:31:03 +0200 Subject: [PATCH 9/9] Update pkg/cluster/volumes.go Co-authored-by: Felix Kunde --- pkg/cluster/volumes.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index e96bd624b..7d8bd1753 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -185,9 +185,8 @@ func (c *Cluster) syncVolumeClaims() error { if c.OpConfig.StorageResizeMode == "off" || c.OpConfig.StorageResizeMode == "ebs" { ignoreResize = true - if c.OpConfig.StorageResizeMode == "off" { - c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.") - } + c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of PVCs.", c.OpConfig.StorageResizeMode) + } newSize, err := resource.ParseQuantity(c.Spec.Volume.Size)