From 52a1c9cbc70f599e969fa3d216be8cf16b19f5c5 Mon Sep 17 00:00:00 2001
From: Felix Kunde
Date: Fri, 2 Aug 2024 14:58:38 +0200
Subject: [PATCH 01/24] sync all resources to cluster fields

---
 pkg/cluster/cluster.go   | 138 ++++++++++-----------------------------
 pkg/cluster/k8sres.go    |  16 ++---
 pkg/cluster/resources.go |  75 ++++++++++++++++-----
 pkg/cluster/streams.go   |  39 +++++------
 pkg/cluster/sync.go      |  69 ++++++++++++++++++--
 pkg/cluster/types.go     |   1 +
 6 files changed, 179 insertions(+), 159 deletions(-)

diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 86aaa4788..2c5c12186 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -3,7 +3,6 @@ package cluster
 // Postgres CustomResourceDefinition object i.e. Spilo
 
 import (
-	"context"
 	"database/sql"
 	"encoding/json"
 	"fmt"
@@ -15,6 +14,7 @@ import (
 	"github.com/sirupsen/logrus"
 
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"
 	"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
 	"github.com/zalando/postgres-operator/pkg/spec"
@@ -62,9 +62,12 @@ type Config struct {
 type kubeResources struct {
 	Services            map[PostgresRole]*v1.Service
 	Endpoints           map[PostgresRole]*v1.Endpoints
+	ConfigMaps          map[string]*v1.ConfigMap
 	Secrets             map[types.UID]*v1.Secret
 	Statefulset         *appsv1.StatefulSet
 	PodDisruptionBudget *policyv1.PodDisruptionBudget
+	LogicalBackupJob    *batchv1.CronJob
+	Streams             map[string]*zalandov1.FabricEventStream
 	//Pods are treated separately
 	//PVCs are treated separately
 }
@@ -134,7 +137,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 		kubeResources: kubeResources{
 			Secrets:   make(map[types.UID]*v1.Secret),
 			Services:  make(map[PostgresRole]*v1.Service),
-			Endpoints: make(map[PostgresRole]*v1.Endpoints)},
+			Endpoints: make(map[PostgresRole]*v1.Endpoints),
+			Streams:   make(map[string]*zalandov1.FabricEventStream)},
 		userSyncStrategy: users.DefaultUserSyncStrategy{
 			PasswordEncryption: passwordEncryption,
 			RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@@ -357,6 +361,20 @@ func (c *Cluster) Create() (err error) {
 	c.logger.Infof("pods are ready")
 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
 
+	// sync resources created by Patroni
+	if c.patroniKubernetesUseConfigMaps() {
+		if err = c.syncConfigMaps(); err != nil {
+			c.logger.Warnf("Patroni configmaps not yet synced: %v", err)
+		}
+	} else {
+		if err = c.syncEndpoint(Patroni); err != nil {
+			err = fmt.Errorf("%s endpoint not yet synced: %v", Patroni, err)
+		}
+	}
+	if err = c.syncService(Patroni); err != nil {
+		err = fmt.Errorf("%s service not yet synced: %v", Patroni, err)
+	}
+
 	// create database objects unless we are running without pods or disabled
 	// that feature explicitly
 	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
@@ -382,10 +400,6 @@ func (c *Cluster) Create() (err error) {
 		c.logger.Info("a k8s cron job for logical backup has been successfully created")
 	}
 
-	if err := c.listResources(); err != nil {
-		c.logger.Errorf("could not list resources: %v", err)
-	}
-
 	// Create connection pooler deployment and services if necessary. Since we
 	// need to perform some operations with the database itself (e.g. install
 	// lookup function), do it as the last step, when everything is available.
@@ -410,6 +424,10 @@ func (c *Cluster) Create() (err error) {
 		}
 	}
 
+	if err := c.listResources(); err != nil {
+		c.logger.Errorf("could not list resources: %v", err)
+	}
+
 	return nil
 }
 
@@ -1182,8 +1200,7 @@ func (c *Cluster) Delete() error {
 		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err)
 	}
 
-	for _, role := range []PostgresRole{Master, Replica} {
-
+	for _, role := range []PostgresRole{Master, Replica, Patroni} {
 		if !c.patroniKubernetesUseConfigMaps() {
 			if err := c.deleteEndpoint(role); err != nil {
 				anyErrors = true
@@ -1199,10 +1216,14 @@ func (c *Cluster) Delete() error {
 		}
 	}
 
-	if err := c.deletePatroniClusterObjects(); err != nil {
-		anyErrors = true
-		c.logger.Warningf("could not remove leftover patroni objects; %v", err)
-		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove leftover patroni objects; %v", err)
+	if c.patroniKubernetesUseConfigMaps() {
+		for _, suffix := range []string{"leader", "config", "sync", "failover"} {
+			if err := c.deletePatroniConfigMap(suffix); err != nil {
+				anyErrors = true
+				c.logger.Warningf("could not delete %s config map: %v", suffix, err)
+				c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s config map: %v", suffix, err)
+			}
+		}
 	}
 
 	// Delete connection pooler objects anyway, even if it's not mentioned in the
@@ -1734,96 +1755,3 @@ func (c *Cluster) Lock() {
 func (c *Cluster) Unlock() {
 	c.mu.Unlock()
 }
-
-type simpleActionWithResult func()
-
-type clusterObjectGet func(name string) (spec.NamespacedName, error)
-
-type clusterObjectDelete func(name string) error
-
-func (c *Cluster) deletePatroniClusterObjects() error {
-	// TODO: figure out how to remove leftover patroni objects in other cases
-	var actionsList []simpleActionWithResult
-
-	if !c.patroniUsesKubernetes() {
-		c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
-	}
-
-	actionsList = append(actionsList, c.deletePatroniClusterServices)
-	if c.patroniKubernetesUseConfigMaps() {
-		actionsList = append(actionsList, c.deletePatroniClusterConfigMaps)
-	} else {
-		actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
-	}
-
-	c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
-	for _, deleter := range actionsList {
-		deleter()
-	}
-	return nil
-}
-
-func deleteClusterObject(
-	get clusterObjectGet,
-	del clusterObjectDelete,
-	objType string,
-	clusterName string,
-	logger *logrus.Entry) {
-	for _, suffix := range patroniObjectSuffixes {
-		name := fmt.Sprintf("%s-%s", clusterName, suffix)
-
-		namespacedName, err := get(name)
-		if err == nil {
-			logger.Debugf("deleting %s %q",
-				objType, namespacedName)
-
-			if err = del(name); err != nil {
-				logger.Warningf("could not delete %s %q: %v",
-					objType, namespacedName, err)
-			}
-
-		} else if !k8sutil.ResourceNotFound(err) {
-			logger.Warningf("could not fetch %s %q: %v",
-				objType, namespacedName, err)
-		}
-	}
-}
-
-func (c *Cluster) deletePatroniClusterServices() {
-	get := func(name string) (spec.NamespacedName, error) {
-		svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
-		return util.NameFromMeta(svc.ObjectMeta), err
-	}
-
-	deleteServiceFn := func(name string) error {
-		return c.KubeClient.Services(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
-	}
-
-	deleteClusterObject(get, deleteServiceFn, "service", c.Name, c.logger)
-}
-
-func (c *Cluster) deletePatroniClusterEndpoints() {
-	get := func(name string) (spec.NamespacedName, error) {
-		ep, err := c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
-		return util.NameFromMeta(ep.ObjectMeta), err
-	}
-
-	deleteEndpointFn := func(name string) error {
-		return c.KubeClient.Endpoints(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
-	}
-
-	deleteClusterObject(get, deleteEndpointFn, "endpoint", c.Name, c.logger)
-}
-
-func (c *Cluster) deletePatroniClusterConfigMaps() {
-	get := func(name string) (spec.NamespacedName, error) {
-		cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
-		return util.NameFromMeta(cm.ObjectMeta), err
-	}
-
-	deleteConfigMapFn := func(name string) error {
-		return c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
-	}
-
-	deleteClusterObject(get, deleteConfigMapFn, "configmap", c.Name, c.logger)
-}
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index eb4402f03..83243481d 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -79,18 +79,12 @@ func (c *Cluster) statefulSetName() string {
 	return c.Name
 }
 
-func (c *Cluster) endpointName(role PostgresRole) string {
-	name := c.Name
-	if role == Replica {
-		name = fmt.Sprintf("%s-%s", name, "repl")
-	}
-
-	return name
-}
-
 func (c *Cluster) serviceName(role PostgresRole) string {
 	name := c.Name
-	if role == Replica {
+	switch role {
+	case Replica:
+		name = fmt.Sprintf("%s-%s", name, "repl")
+	case Patroni:
 		name = fmt.Sprintf("%s-%s", name, "repl")
 	}
 
 	return name
@@ -2061,7 +2055,7 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po
 func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
 	endpoints := &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:        c.endpointName(role),
+			Name:        c.serviceName(role),
 			Namespace:   c.Namespace,
 			Annotations: c.annotationsSet(nil),
 			Labels:      c.roleLabelsSet(true, role),
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 8c97dc6a2..2c8a22e4d 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -31,11 +31,23 @@ func (c *Cluster) listResources() error {
 		c.logger.Infof("found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID)
 	}
 
-	for _, obj := range c.Secrets {
-		c.logger.Infof("found secret: %q (uid: %q) namesapce: %s", util.NameFromMeta(obj.ObjectMeta), obj.UID, obj.ObjectMeta.Namespace)
+	for appId, stream := range c.Streams {
+		c.logger.Infof("found stream: %q with application id %q (uid: %q)", util.NameFromMeta(stream.ObjectMeta), appId, stream.UID)
 	}
 
-	if !c.patroniKubernetesUseConfigMaps() {
+	if c.LogicalBackupJob != nil {
+		c.logger.Infof("found logical backup job: %q (uid: %q)", util.NameFromMeta(c.LogicalBackupJob.ObjectMeta), c.LogicalBackupJob.UID)
+	}
+
+	for _, secret := range c.Secrets {
+		c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace)
+	}
+
+	if c.patroniKubernetesUseConfigMaps() {
+		for suffix, configmap := range c.ConfigMaps {
+			c.logger.Infof("found %s config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID)
+		}
+	} else {
 		for role, endpoint := range c.Endpoints {
 			c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
 		}
@@ -63,6 +75,15 @@ func (c *Cluster) listResources() error {
 		c.logger.Infof("found PVC: %q (uid: %q)",
util.NameFromMeta(obj.ObjectMeta), obj.UID) } + for role, poolerObjs := range c.ConnectionPooler { + if poolerObjs.Deployment != nil { + c.logger.Infof("found %s pooler deployment: %q (uid: %q) ", role, util.NameFromMeta(poolerObjs.Deployment.ObjectMeta), poolerObjs.Deployment.UID) + } + if poolerObjs.Service != nil { + c.logger.Infof("found %s pooler service: %q (uid: %q) ", role, util.NameFromMeta(poolerObjs.Service.ObjectMeta), poolerObjs.Service.UID) + } + } + return nil } @@ -332,11 +353,10 @@ func (c *Cluster) deleteService(role PostgresRole) error { } if err := c.KubeClient.Services(c.Services[role].Namespace).Delete(context.TODO(), c.Services[role].Name, c.deleteOptions); err != nil { - if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("%s service has already been deleted", role) - } else if err != nil { - return err + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not delete %s service: %v", role, err) } + c.logger.Debugf("%s service has already been deleted", role) } c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(c.Services[role].ObjectMeta)) @@ -478,11 +498,10 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error { } if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil { - if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("%s endpoint has already been deleted", role) - } else if err != nil { - return fmt.Errorf("could not delete endpoint: %v", err) + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not delete %s endpoint: %v", role, err) } + c.logger.Debugf("%s endpoint has already been deleted", role) } c.logger.Infof("%s endpoint %q has been deleted", role, util.NameFromMeta(c.Endpoints[role].ObjectMeta)) @@ -491,6 +510,27 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error { return nil } +func (c *Cluster) deletePatroniConfigMap(suffix string) error { + c.setProcessName("deleting config map") + c.logger.Debugln("deleting config map") + if c.ConfigMaps[suffix] == nil { + c.logger.Debugf("there is no %s config map in the cluster", suffix) + return nil + } + + if err := c.KubeClient.ConfigMaps(c.ConfigMaps[suffix].Namespace).Delete(context.TODO(), c.ConfigMaps[suffix].Name, c.deleteOptions); err != nil { + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not delete %s configmap %q: %v", suffix, c.ConfigMaps[suffix].Name, err) + } + c.logger.Debugf("%s config map has already been deleted", suffix) + } + + c.logger.Infof("%s config map %q has been deleted", suffix, util.NameFromMeta(c.ConfigMaps[suffix].ObjectMeta)) + delete(c.ConfigMaps, suffix) + + return nil +} + func (c *Cluster) deleteSecrets() error { c.setProcessName("deleting secrets") errors := make([]string, 0) @@ -540,10 +580,11 @@ func (c *Cluster) createLogicalBackupJob() (err error) { } c.logger.Debugf("Generated cronJobSpec: %v", logicalBackupJobSpec) - _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(context.TODO(), logicalBackupJobSpec, metav1.CreateOptions{}) + cronJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(context.TODO(), logicalBackupJobSpec, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("could not create k8s cron job: %v", err) } + c.LogicalBackupJob = cronJob return nil } @@ -557,7 +598,7 @@ func (c *Cluster) patchLogicalBackupJob(newJob *batchv1.CronJob) error { } // update the backup job spec - _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( + cronJob, err := 
c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( context.TODO(), c.getLogicalBackupJobName(), types.MergePatchType, @@ -567,20 +608,24 @@ func (c *Cluster) patchLogicalBackupJob(newJob *batchv1.CronJob) error { if err != nil { return fmt.Errorf("could not patch logical backup job: %v", err) } + c.LogicalBackupJob = cronJob return nil } func (c *Cluster) deleteLogicalBackupJob() error { - + if c.LogicalBackupJob == nil { + return nil + } c.logger.Info("removing the logical backup job") - err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions) + err := c.KubeClient.CronJobsGetter.CronJobs(c.LogicalBackupJob.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions) if k8sutil.ResourceNotFound(err) { c.logger.Debugf("logical backup cron job %q has already been deleted", c.getLogicalBackupJobName()) } else if err != nil { return err } + c.LogicalBackupJob = nil return nil } diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index c76523f4a..20e3c9047 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -29,18 +29,19 @@ func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, err return streamCRD, nil } -func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error { +func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) (patchedStream *zalandov1.FabricEventStream, err error) { c.setProcessName("updating event streams") + patch, err := json.Marshal(newEventStreams) if err != nil { - return fmt.Errorf("could not marshal new event stream CRD %q: %v", newEventStreams.Name, err) + return nil, fmt.Errorf("could not marshal new event stream CRD %q: %v", newEventStreams.Name, err) } - if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Patch( + if patchedStream, err = c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Patch( context.TODO(), newEventStreams.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil { - return err + return nil, err } - return nil + return patchedStream, nil } func (c *Cluster) deleteStream(stream *zalandov1.FabricEventStream) error { @@ -404,31 +405,23 @@ func (c *Cluster) createOrUpdateStreams() error { // there will be a separate event stream resource for each ID appIds := gatherApplicationIds(c.Spec.Streams) - // list all existing stream CRDs - listOptions := metav1.ListOptions{ - LabelSelector: c.labelsSet(true).String(), - } - streams, err := c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) - if err != nil { - return fmt.Errorf("could not list of FabricEventStreams: %v", err) - } - for _, appId := range appIds { streamExists := false // update stream when it exists and EventStreams array differs - for _, stream := range streams.Items { + for _, stream := range c.Streams { if appId == stream.Spec.ApplicationId { streamExists = true desiredStreams := c.generateFabricEventStream(appId) if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { c.logger.Debugf("updating event streams: %s", reason) desiredStreams.ObjectMeta = stream.ObjectMeta - err = c.updateStreams(desiredStreams) + updatedStream, err := c.updateStreams(desiredStreams) if err != nil { return fmt.Errorf("failed updating event stream %s: %v", stream.Name, err) } - c.logger.Infof("event stream %q has been successfully updated", stream.Name) + c.Streams[appId] = updatedStream + c.logger.Infof("event stream %q 
has been successfully updated", updatedStream.Name) } continue } @@ -436,22 +429,24 @@ func (c *Cluster) createOrUpdateStreams() error { if !streamExists { c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) - streamCRD, err := c.createStreams(appId) + createdStream, err := c.createStreams(appId) if err != nil { return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err) } - c.logger.Infof("event streams %q have been successfully created", streamCRD.Name) + c.logger.Infof("event streams %q have been successfully created", createdStream.Name) + c.Streams[appId] = createdStream } } // check if there is any deletion - for _, stream := range streams.Items { - if !util.SliceContains(appIds, stream.Spec.ApplicationId) { + for appId, stream := range c.Streams { + if !util.SliceContains(appIds, appId) { c.logger.Infof("event streams with applicationId %s do not exist in the manifest, delete it", stream.Spec.ApplicationId) - err := c.deleteStream(&stream) + err := c.deleteStream(stream) if err != nil { return fmt.Errorf("failed deleting event streams with applicationId %s: %v", stream.Spec.ApplicationId, err) } + c.Streams[appId] = nil c.logger.Infof("event streams %q have been successfully deleted", stream.Name) } } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b106fc722..24cbccedc 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -80,6 +80,12 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return err } + if c.patroniKubernetesUseConfigMaps() { + if err = c.syncConfigMaps(); err != nil { + c.logger.Errorf("could not sync configmaps: %v", err) + } + } + // sync volume may already transition volumes to gp3, if iops/throughput or type is specified if err = c.syncVolumes(); err != nil { return err @@ -173,8 +179,40 @@ func (c *Cluster) syncFinalizer() error { return nil } +func (c *Cluster) syncConfigMaps() error { + for _, suffix := range []string{"leader", "config", "sync", "failover"} { + if err := c.syncConfigMap(suffix); err != nil { + return fmt.Errorf("could not sync %s configmap: %v", suffix, err) + } + } + + return nil +} + +func (c *Cluster) syncConfigMap(suffix string) error { + var ( + cm *v1.ConfigMap + err error + ) + name := fmt.Sprintf("%s-%s", c.Name, suffix) + c.logger.Debugf("syncing %s configmap", name) + c.setProcessName("syncing %s config map", name) + + if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { + c.ConfigMaps[suffix] = cm + return nil + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get %s config map: %v", suffix, err) + } + // no existing config map, Patroni will handle it + c.ConfigMaps[suffix] = nil + + return nil +} + func (c *Cluster) syncServices() error { - for _, role := range []PostgresRole{Master, Replica} { + for _, role := range []PostgresRole{Master, Replica, Patroni} { c.logger.Debugf("syncing %s service", role) if !c.patroniKubernetesUseConfigMaps() { @@ -199,6 +237,10 @@ func (c *Cluster) syncService(role PostgresRole) error { if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { c.Services[role] = svc + // do not touch config service managed by Patroni + if role == Patroni { + return nil + } desiredSvc := c.generateService(role, &c.Spec) updatedSvc, err := c.updateService(role, svc, desiredSvc) if err != nil { @@ -211,6 +253,10 @@ func (c *Cluster) syncService(role PostgresRole) error { if 
!k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s service: %v", role, err) } + // if config service does not exist Patroni will create it + if role == Patroni { + return nil + } // no existing service, create new one c.Services[role] = nil c.logger.Infof("could not find the cluster's %s service", role) @@ -237,14 +283,19 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { ) c.setProcessName("syncing %s endpoint", role) - if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err == nil { + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { + c.Endpoints[role] = ep + // do not touch config endpoint managed by Patroni + if role == Patroni { + return nil + } desiredEp := c.generateEndpoint(role, ep.Subsets) if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { patchData, err := metaAnnotationsPatch(desiredEp.Annotations) if err != nil { return fmt.Errorf("could not form patch for %s endpoint: %v", role, err) } - ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), c.endpointName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), c.serviceName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) if err != nil { return fmt.Errorf("could not patch annotations of %s endpoint: %v", role, err) } @@ -255,6 +306,10 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s endpoint: %v", role, err) } + // if config endpoint does not exist Patroni will create it + if role == Patroni { + return nil + } // no existing endpoint, create new one c.Endpoints[role] = nil c.logger.Infof("could not find the cluster's %s endpoint", role) @@ -266,7 +321,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return fmt.Errorf("could not create missing %s endpoint: %v", role, err) } c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta)) - if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err != nil { + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %s endpoint: %v", role, err) } } @@ -959,7 +1014,7 @@ func (c *Cluster) updateSecret( if updateSecret { c.logger.Debugln(updateSecretMsg) - if _, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { + if secret, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("could not update secret %s: %v", secretName, err) } c.Secrets[secret.UID] = secret @@ -970,10 +1025,11 @@ func (c *Cluster) updateSecret( if err != nil { return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) } - _, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + secret, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) if err != nil { return fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err) } + c.Secrets[secret.UID] 
= secret } return nil @@ -1423,6 +1479,7 @@ func (c *Cluster) syncLogicalBackupJob() error { return fmt.Errorf("could not patch annotations of the logical backup job %q: %v", jobName, err) } } + c.LogicalBackupJob = desiredJob return nil } if !k8sutil.ResourceNotFound(err) { diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 1b4d0f389..8e9263d49 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -17,6 +17,7 @@ const ( // spilo roles Master PostgresRole = "master" Replica PostgresRole = "replica" + Patroni PostgresRole = "config" // roles returned by Patroni cluster endpoint Leader PostgresRole = "leader" From e4c372570edfba50cd92ad500cc6d1eec85bb2ed Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 2 Aug 2024 15:04:47 +0200 Subject: [PATCH 02/24] initialize ConfigMaps field --- pkg/cluster/cluster.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2c5c12186..2396fe515 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -135,10 +135,11 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres systemUsers: make(map[string]spec.PgUser), podSubscribers: make(map[spec.NamespacedName]chan PodEvent), kubeResources: kubeResources{ - Secrets: make(map[types.UID]*v1.Secret), - Services: make(map[PostgresRole]*v1.Service), - Endpoints: make(map[PostgresRole]*v1.Endpoints), - Streams: make(map[string]*zalandov1.FabricEventStream)}, + Secrets: make(map[types.UID]*v1.Secret), + Services: make(map[PostgresRole]*v1.Service), + Endpoints: make(map[PostgresRole]*v1.Endpoints), + ConfigMaps: make(map[string]*v1.ConfigMap), + Streams: make(map[string]*zalandov1.FabricEventStream)}, userSyncStrategy: users.DefaultUserSyncStrategy{ PasswordEncryption: passwordEncryption, RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix, From cda48d2b44d692d32c7ade34570c8a7914453d00 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 2 Aug 2024 15:17:56 +0200 Subject: [PATCH 03/24] unify config map log messages --- pkg/cluster/resources.go | 2 +- pkg/cluster/sync.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 2c8a22e4d..93c44c28b 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -520,7 +520,7 @@ func (c *Cluster) deletePatroniConfigMap(suffix string) error { if err := c.KubeClient.ConfigMaps(c.ConfigMaps[suffix].Namespace).Delete(context.TODO(), c.ConfigMaps[suffix].Name, c.deleteOptions); err != nil { if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not delete %s configmap %q: %v", suffix, c.ConfigMaps[suffix].Name, err) + return fmt.Errorf("could not delete %s config map %q: %v", suffix, c.ConfigMaps[suffix].Name, err) } c.logger.Debugf("%s config map has already been deleted", suffix) } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 24cbccedc..24e469d43 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -82,7 +82,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { if c.patroniKubernetesUseConfigMaps() { if err = c.syncConfigMaps(); err != nil { - c.logger.Errorf("could not sync configmaps: %v", err) + c.logger.Errorf("could not sync config maps: %v", err) } } @@ -182,7 +182,7 @@ func (c *Cluster) syncFinalizer() error { func (c *Cluster) syncConfigMaps() error { for _, suffix := range []string{"leader", "config", "sync", "failover"} { if err := c.syncConfigMap(suffix); err != nil { - return fmt.Errorf("could not 
sync %s configmap: %v", suffix, err) + return fmt.Errorf("could not sync %s config map: %v", suffix, err) } } @@ -195,7 +195,7 @@ func (c *Cluster) syncConfigMap(suffix string) error { err error ) name := fmt.Sprintf("%s-%s", c.Name, suffix) - c.logger.Debugf("syncing %s configmap", name) + c.logger.Debugf("syncing %s config map", name) c.setProcessName("syncing %s config map", name) if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { From bf02b64515d2f3e6ba97022597b24eedd4e3f902 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 2 Aug 2024 17:04:02 +0200 Subject: [PATCH 04/24] fix config service name --- pkg/cluster/k8sres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 83243481d..2ad763530 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -85,7 +85,7 @@ func (c *Cluster) serviceName(role PostgresRole) string { case Replica: name = fmt.Sprintf("%s-%s", name, "repl") case Patroni: - name = fmt.Sprintf("%s-%s", name, "repl") + name = fmt.Sprintf("%s-%s", name, "config") } return name From ae1fab1b457a8a56fad6b31a9d8894b01240a06b Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 2 Aug 2024 21:07:09 +0200 Subject: [PATCH 05/24] what's up with the endpoints --- e2e/tests/test_e2e.py | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index d29fd3d5c..85fca5ec1 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2256,6 +2256,7 @@ def test_zz_cluster_deletion(self): # check if everything has been deleted self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted") self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") + print('Operator log: {}'.format(k8s.get_operator_log())) self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") From c9032ac656795999cb4236e06219282f4f610477 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 12:56:14 +0200 Subject: [PATCH 06/24] swap services and endpoints deletion --- pkg/cluster/cluster.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2396fe515..f08fdc342 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1202,6 +1202,12 @@ func (c *Cluster) Delete() error { } for _, role := range []PostgresRole{Master, Replica, Patroni} { + if err := c.deleteService(role); err != nil { + anyErrors = true + c.logger.Warningf("could not delete %s service: %v", role, err) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err) + } + if !c.patroniKubernetesUseConfigMaps() { if err := c.deleteEndpoint(role); err != nil { anyErrors = true @@ -1209,12 +1215,6 @@ func (c *Cluster) Delete() error { c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s endpoint: %v", role, err) } } - - if err := c.deleteService(role); err != nil { - anyErrors = true - c.logger.Warningf("could not delete %s service: %v", role, err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: 
%v", role, err) - } } if c.patroniKubernetesUseConfigMaps() { From 1759091275d127bf7d2b9e6c38d7c61e72f8a9a2 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 13:45:24 +0200 Subject: [PATCH 07/24] debug the leftover endpoints --- e2e/tests/k8s_api.py | 5 ++++- e2e/tests/test_e2e.py | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 276ddfa25..2b5aa289c 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -188,7 +188,10 @@ def count_services_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) def count_endpoints_with_label(self, labels, namespace='default'): - return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) + eps = self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items + for ep in eps: + print("found endpoint: {}".format(ep.metadata.name)) + return len(eps) def count_secrets_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 85fca5ec1..066efce37 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2187,7 +2187,7 @@ def test_taint_based_eviction(self): self.assert_distributed_pods(master_nodes) @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_zz_cluster_deletion(self): + def test_aa_cluster_deletion(self): ''' Test deletion with configured protection ''' @@ -2255,15 +2255,15 @@ def test_zz_cluster_deletion(self): # check if everything has been deleted self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted") - self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") - print('Operator log: {}'.format(k8s.get_operator_log())) - self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") - + self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") + print('Operator log: {}'.format(k8s.get_operator_log())) + self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") + except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) raise From ec243cb5c86df5ecdbb45be94b1b1e3217eb37f0 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 13:58:29 +0200 Subject: [PATCH 08/24] comment lines --- e2e/tests/test_e2e.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 066efce37..adb7c3c07 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2258,8 +2258,8 @@ def test_aa_cluster_deletion(self): self.eventuallyEqual(lambda: 
k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") - self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") - self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") + # self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") + # self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") print('Operator log: {}'.format(k8s.get_operator_log())) self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") From f2bcaeec5927f40200d084e7afb73a31f5fd9ee7 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 14:08:23 +0200 Subject: [PATCH 09/24] undo changes - delete seems to work now --- e2e/tests/k8s_api.py | 5 +---- e2e/tests/test_e2e.py | 7 +++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 2b5aa289c..276ddfa25 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -188,10 +188,7 @@ def count_services_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) def count_endpoints_with_label(self, labels, namespace='default'): - eps = self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items - for ep in eps: - print("found endpoint: {}".format(ep.metadata.name)) - return len(eps) + return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) def count_secrets_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index adb7c3c07..978f1e07e 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2187,7 +2187,7 @@ def test_taint_based_eviction(self): self.assert_distributed_pods(master_nodes) @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_aa_cluster_deletion(self): + def test_zz_cluster_deletion(self): ''' Test deletion with configured protection ''' @@ -2258,10 +2258,9 @@ def test_aa_cluster_deletion(self): self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") - # self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") - # self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") + self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") 
self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") - print('Operator log: {}'.format(k8s.get_operator_log())) self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") except timeout_decorator.TimeoutError: From 0482af4d530af971356be2c558046bc3c5e3a807 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 15:43:32 +0200 Subject: [PATCH 10/24] the endpoint is near --- e2e/tests/k8s_api.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 276ddfa25..2b5aa289c 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -188,7 +188,10 @@ def count_services_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) def count_endpoints_with_label(self, labels, namespace='default'): - return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) + eps = self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items + for ep in eps: + print("found endpoint: {}".format(ep.metadata.name)) + return len(eps) def count_secrets_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) From 74bc89b5837c169b3bb70e74f6a607aecd35f65e Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 17:54:50 +0200 Subject: [PATCH 11/24] separated sync and delete logic for Patroni resources --- e2e/tests/k8s_api.py | 5 +-- pkg/cluster/cluster.go | 56 ++++++++++--------------- pkg/cluster/resources.go | 88 ++++++++++++++++++++++++++++++++-------- pkg/cluster/sync.go | 73 +++++++++++++++++++++------------ 4 files changed, 142 insertions(+), 80 deletions(-) diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 2b5aa289c..276ddfa25 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -188,10 +188,7 @@ def count_services_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) def count_endpoints_with_label(self, labels, namespace='default'): - eps = self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items - for ep in eps: - print("found endpoint: {}".format(ep.metadata.name)) - return len(eps) + return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) def count_secrets_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index f08fdc342..b131b59a5 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -30,7 +30,6 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" - apipolicyv1 "k8s.io/api/policy/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,7 +61,8 @@ type Config struct { type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints - ConfigMaps map[string]*v1.ConfigMap + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet PodDisruptionBudget *policyv1.PodDisruptionBudget @@ -135,11 +135,12 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, 
pgSpec acidv1.Postgres systemUsers: make(map[string]spec.PgUser), podSubscribers: make(map[spec.NamespacedName]chan PodEvent), kubeResources: kubeResources{ - Secrets: make(map[types.UID]*v1.Secret), - Services: make(map[PostgresRole]*v1.Service), - Endpoints: make(map[PostgresRole]*v1.Endpoints), - ConfigMaps: make(map[string]*v1.ConfigMap), - Streams: make(map[string]*zalandov1.FabricEventStream)}, + Secrets: make(map[types.UID]*v1.Secret), + Services: make(map[PostgresRole]*v1.Service), + Endpoints: make(map[PostgresRole]*v1.Endpoints), + PatroniEndpoints: make(map[string]*v1.Endpoints), + PatroniConfigMaps: make(map[string]*v1.ConfigMap), + Streams: make(map[string]*zalandov1.FabricEventStream)}, userSyncStrategy: users.DefaultUserSyncStrategy{ PasswordEncryption: passwordEncryption, RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix, @@ -363,17 +364,8 @@ func (c *Cluster) Create() (err error) { c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready") // sync resources created by Patroni - if c.patroniKubernetesUseConfigMaps() { - if err = c.syncConfigMaps(); err != nil { - c.logger.Warnf("Patroni configmaps not yet synced: %v", err) - } - } else { - if err = c.syncEndpoint(Patroni); err != nil { - err = fmt.Errorf("%s endpoint not yet synced: %v", Patroni, err) - } - } - if err = c.syncService(Patroni); err != nil { - err = fmt.Errorf("%s servic not yet synced: %v", Patroni, err) + if err = c.syncPatroniResources(); err != nil { + c.logger.Warnf("Patroni resources not yet synced: %v", err) } // create database objects unless we are running without pods or disabled @@ -866,7 +858,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool return true, "" } -func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) { +func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) { //TODO: improve comparison if match := reflect.DeepEqual(new.Spec, cur.Spec); !match { return false, "new PDB spec does not match the current one" @@ -1201,13 +1193,7 @@ func (c *Cluster) Delete() error { c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err) } - for _, role := range []PostgresRole{Master, Replica, Patroni} { - if err := c.deleteService(role); err != nil { - anyErrors = true - c.logger.Warningf("could not delete %s service: %v", role, err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err) - } - + for _, role := range []PostgresRole{Master, Replica} { if !c.patroniKubernetesUseConfigMaps() { if err := c.deleteEndpoint(role); err != nil { anyErrors = true @@ -1215,18 +1201,20 @@ func (c *Cluster) Delete() error { c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s endpoint: %v", role, err) } } - } - if c.patroniKubernetesUseConfigMaps() { - for _, suffix := range []string{"leader", "config", "sync", "failover"} { - if err := c.deletePatroniConfigMap(suffix); err != nil { - anyErrors = true - c.logger.Warningf("could not delete %s config map: %v", suffix, err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s config map: %v", suffix, err) - } + if err := c.deleteService(role); err != nil { + anyErrors = true + c.logger.Warningf("could not delete %s service: %v", role, err) + c.eventRecorder.Eventf(c.GetReference(), 
v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err) } } + if err := c.deletePatroniResources(); err != nil { + anyErrors = true + c.logger.Warningf("could not delete all Patroni resources: %v", err) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err) + } + // Delete connection pooler objects anyway, even if it's not mentioned in the // manifest, just to not keep orphaned components in case if something went // wrong diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 93c44c28b..30a97169e 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -43,20 +43,24 @@ func (c *Cluster) listResources() error { c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace) } + for role, service := range c.Services { + c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) + } + + for role, endpoint := range c.Endpoints { + c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID) + } + if c.patroniKubernetesUseConfigMaps() { - for suffix, configmap := range c.ConfigMaps { - c.logger.Infof("found %s config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID) + for suffix, configmap := range c.PatroniConfigMaps { + c.logger.Infof("found %s Patroni config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID) } } else { - for role, endpoint := range c.Endpoints { - c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID) + for suffix, endpoint := range c.PatroniEndpoints { + c.logger.Infof("found %s Patroni endpoint: %q (uid: %q)", suffix, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID) } } - for role, service := range c.Services { - c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) - } - pods, err := c.listPods() if err != nil { return fmt.Errorf("could not get the list of pods: %v", err) @@ -510,23 +514,73 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error { return nil } +func (c *Cluster) deletePatroniResources() error { + c.setProcessName("deleting Patroni resources") + errors := make([]string, 0) + + if err := c.deleteService(Patroni); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + for _, suffix := range patroniObjectSuffixes { + if c.patroniKubernetesUseConfigMaps() { + if err := c.deletePatroniConfigMap(suffix); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + } else { + if err := c.deletePatroniEndpoint(suffix); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + } + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + + return nil +} + func (c *Cluster) deletePatroniConfigMap(suffix string) error { - c.setProcessName("deleting config map") - c.logger.Debugln("deleting config map") - if c.ConfigMaps[suffix] == nil { - c.logger.Debugf("there is no %s config map in the cluster", suffix) + c.setProcessName("deleting Patroni config map") + c.logger.Debugln("deleting Patroni config map") + cm := c.PatroniConfigMaps[suffix] + if cm == nil { + c.logger.Debugf("there is no %s Patroni config map in the cluster", suffix) + return nil + } + + if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, 
c.deleteOptions); err != nil { + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not delete %s Patroni config map %q: %v", suffix, cm.Name, err) + } + c.logger.Debugf("%s Patroni config map has already been deleted", suffix) + } + + c.logger.Infof("%s Patroni config map %q has been deleted", suffix, util.NameFromMeta(cm.ObjectMeta)) + delete(c.PatroniConfigMaps, suffix) + + return nil +} + +func (c *Cluster) deletePatroniEndpoint(suffix string) error { + c.setProcessName("deleting Patroni endpoint") + c.logger.Debugln("deleting Patroni endpoint") + ep := c.PatroniEndpoints[suffix] + if ep == nil { + c.logger.Debugf("there is no %s Patroni endpoint in the cluster", suffix) return nil } - if err := c.KubeClient.ConfigMaps(c.ConfigMaps[suffix].Namespace).Delete(context.TODO(), c.ConfigMaps[suffix].Name, c.deleteOptions); err != nil { + if err := c.KubeClient.Endpoints(ep.Namespace).Delete(context.TODO(), ep.Name, c.deleteOptions); err != nil { if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not delete %s config map %q: %v", suffix, c.ConfigMaps[suffix].Name, err) + return fmt.Errorf("could not delete %s Patroni endpoint %q: %v", suffix, ep.Name, err) } - c.logger.Debugf("%s config map has already been deleted", suffix) + c.logger.Debugf("%s Patroni endpoint has already been deleted", suffix) } - c.logger.Infof("%s config map %q has been deleted", suffix, util.NameFromMeta(c.ConfigMaps[suffix].ObjectMeta)) - delete(c.ConfigMaps, suffix) + c.logger.Infof("%s Patroni endpoint %q has been deleted", suffix, util.NameFromMeta(ep.ObjectMeta)) + delete(c.PatroniEndpoints, suffix) return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 24e469d43..fb95eda1b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -80,10 +80,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return err } - if c.patroniKubernetesUseConfigMaps() { - if err = c.syncConfigMaps(); err != nil { - c.logger.Errorf("could not sync config maps: %v", err) - } + if err = c.syncPatroniResources(); err != nil { + c.logger.Errorf("could not sync Patroni resources: %v", err) } // sync volume may already transition volumes to gp3, if iops/throughput or type is specified @@ -179,40 +177,74 @@ func (c *Cluster) syncFinalizer() error { return nil } -func (c *Cluster) syncConfigMaps() error { - for _, suffix := range []string{"leader", "config", "sync", "failover"} { - if err := c.syncConfigMap(suffix); err != nil { - return fmt.Errorf("could not sync %s config map: %v", suffix, err) +func (c *Cluster) syncPatroniResources() error { + errors := make([]string, 0) + + if err := c.syncService(Patroni); err != nil { + errors = append(errors, fmt.Sprintf("could not sync %s service: %v", Patroni, err)) + } + + for _, suffix := range patroniObjectSuffixes { + if c.patroniKubernetesUseConfigMaps() { + if err := c.syncPatroniConfigMap(suffix); err != nil { + errors = append(errors, fmt.Sprintf("could not sync %s Patroni config map: %v", suffix, err)) + } + } else { + if err := c.syncPatroniEndpoint(suffix); err != nil { + errors = append(errors, fmt.Sprintf("could not sync %s Patroni endpoint: %v", suffix, err)) + } } } + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil } -func (c *Cluster) syncConfigMap(suffix string) error { +func (c *Cluster) syncPatroniConfigMap(suffix string) error { var ( cm *v1.ConfigMap err error ) name := fmt.Sprintf("%s-%s", c.Name, suffix) - c.logger.Debugf("syncing %s config map", name) - 
c.setProcessName("syncing %s config map", name) + c.logger.Debugf("syncing %s Patroni config map", name) + c.setProcessName("syncing %s Patroni config map", name) if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { - c.ConfigMaps[suffix] = cm + c.PatroniConfigMaps[suffix] = cm return nil } if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not get %s config map: %v", suffix, err) + return fmt.Errorf("could not get %s Patroni config map: %v", suffix, err) + } + + return nil +} + +func (c *Cluster) syncPatroniEndpoint(suffix string) error { + var ( + ep *v1.Endpoints + err error + ) + name := fmt.Sprintf("%s-%s", c.Name, suffix) + c.logger.Debugf("syncing %s Patroni endpoint", name) + c.setProcessName("syncing %s Patroni endpoint", name) + + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { + c.PatroniEndpoints[suffix] = ep + return nil + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get %s Patroni endpoint: %v", suffix, err) } - // no existing config map, Patroni will handle it - c.ConfigMaps[suffix] = nil return nil } func (c *Cluster) syncServices() error { - for _, role := range []PostgresRole{Master, Replica, Patroni} { + for _, role := range []PostgresRole{Master, Replica} { c.logger.Debugf("syncing %s service", role) if !c.patroniKubernetesUseConfigMaps() { @@ -284,11 +316,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { c.setProcessName("syncing %s endpoint", role) if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { - c.Endpoints[role] = ep - // do not touch config endpoint managed by Patroni - if role == Patroni { - return nil - } desiredEp := c.generateEndpoint(role, ep.Subsets) if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { patchData, err := metaAnnotationsPatch(desiredEp.Annotations) @@ -306,10 +333,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s endpoint: %v", role, err) } - // if config endpoint does not exist Patroni will create it - if role == Patroni { - return nil - } // no existing endpoint, create new one c.Endpoints[role] = nil c.logger.Infof("could not find the cluster's %s endpoint", role) From aed22e2e78cd229ec1b88ce752f07776f169fd29 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 22:35:46 +0200 Subject: [PATCH 12/24] undo e2e test changes --- e2e/tests/test_e2e.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 978f1e07e..d29fd3d5c 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2255,14 +2255,14 @@ def test_zz_cluster_deletion(self): # check if everything has been deleted self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted") + self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") + self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") 
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") - self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") - self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") - + except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) raise From 383409123a8ca9d10ef4fc6c1ac839c097c5e04f Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 7 Aug 2024 15:53:59 +0200 Subject: [PATCH 13/24] aligh delete streams and secrets logic with other resources --- pkg/cluster/connection_pooler.go | 2 +- pkg/cluster/resources.go | 7 +- pkg/cluster/streams.go | 146 ++++++++++++++++--------------- pkg/cluster/streams_test.go | 69 +++++++-------- pkg/cluster/sync.go | 6 +- 5 files changed, 113 insertions(+), 117 deletions(-) diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 48f4ea849..c868782f1 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -654,7 +654,7 @@ func (c *Cluster) deleteConnectionPoolerSecret() (err error) { if err != nil { c.logger.Debugf("could not get connection pooler secret %s: %v", secretName, err) } else { - if err = c.deleteSecret(secret.UID, *secret); err != nil { + if err = c.deleteSecret(secret.UID); err != nil { return fmt.Errorf("could not delete pooler secret: %v", err) } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 30a97169e..f2213545e 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -589,8 +589,8 @@ func (c *Cluster) deleteSecrets() error { c.setProcessName("deleting secrets") errors := make([]string, 0) - for uid, secret := range c.Secrets { - err := c.deleteSecret(uid, *secret) + for uid := range c.Secrets { + err := c.deleteSecret(uid) if err != nil { errors = append(errors, fmt.Sprintf("%v", err)) } @@ -603,8 +603,9 @@ func (c *Cluster) deleteSecrets() error { return nil } -func (c *Cluster) deleteSecret(uid types.UID, secret v1.Secret) error { +func (c *Cluster) deleteSecret(uid types.UID) error { c.setProcessName("deleting secret") + secret := c.Secrets[uid] secretName := util.NameFromMeta(secret.ObjectMeta) c.logger.Debugf("deleting secret %q", secretName) err := c.KubeClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, c.deleteOptions) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 28820bbb5..018ef234a 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -44,37 +44,31 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) (p return patchedStream, nil } -func (c *Cluster) deleteStream(stream *zalandov1.FabricEventStream) error { +func (c *Cluster) deleteStream(appId string) error { c.setProcessName("deleting event stream") - err := c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{}) + err := c.KubeClient.FabricEventStreams(c.Streams[appId].Namespace).Delete(context.TODO(), c.Streams[appId].Name, metav1.DeleteOptions{}) if err != nil { - return fmt.Errorf("could not delete event stream %q: %v", stream.Name, err) + return fmt.Errorf("could not delete event stream %q with applicationId %s: %v", c.Streams[appId].Name, appId, err) } + delete(c.Streams, appId) + return nil } func (c *Cluster) 
deleteStreams() error { - c.setProcessName("deleting event streams") - // check if stream CRD is installed before trying a delete _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{}) if k8sutil.ResourceNotFound(err) { return nil } - + c.setProcessName("deleting event streams") errors := make([]string, 0) - listOptions := metav1.ListOptions{ - LabelSelector: c.labelsSet(true).String(), - } - streams, err := c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) - if err != nil { - return fmt.Errorf("could not list of FabricEventStreams: %v", err) - } - for _, stream := range streams.Items { - err := c.deleteStream(&stream) + + for appId := range c.Streams { + err := c.deleteStream(appId) if err != nil { - errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", stream.Name, err)) + errors = append(errors, fmt.Sprintf("%v", err)) } } @@ -138,7 +132,7 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za } // check if there is any deletion - for slotName, _ := range currentPublications { + for slotName := range currentPublications { if _, exists := databaseSlotsList[slotName]; !exists { deletePublications = append(deletePublications, slotName) } @@ -336,7 +330,7 @@ func (c *Cluster) syncStreams() error { return fmt.Errorf("could not get list of databases: %v", err) } // get database name with empty list of slot, except template0 and template1 - for dbName, _ := range listDatabases { + for dbName := range listDatabases { if dbName != "template0" && dbName != "template1" { databaseSlots[dbName] = map[string]zalandov1.Slot{} } @@ -396,70 +390,62 @@ func (c *Cluster) syncStreams() error { } // finally sync stream CRDs - err = c.createOrUpdateStreams(slotsToSync) - if err != nil { - return err - } - - return nil -} - -func (c *Cluster) createOrUpdateStreams(createdSlots map[string]map[string]string) error { - // fetch different application IDs from streams section // there will be a separate event stream resource for each ID appIds := gatherApplicationIds(c.Spec.Streams) - - for idx, appId := range appIds { - streamExists := false - - // update stream when it exists and EventStreams array differs - for _, stream := range c.Streams { - if appId == stream.Spec.ApplicationId { - streamExists = true - desiredStreams := c.generateFabricEventStream(appId) - if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { - c.logger.Debugf("updating event streams: %s", reason) - desiredStreams.ObjectMeta = stream.ObjectMeta - updatedStream, err := c.updateStreams(desiredStreams) - if err != nil { - return fmt.Errorf("failed updating event stream %s: %v", stream.Name, err) - } - c.Streams[appId] = updatedStream - c.logger.Infof("event stream %q has been successfully updated", updatedStream.Name) + for _, appId := range appIds { + slotExists := false + for slotName := range slotsToSync { + if strings.HasSuffix(slotName, appId) { + slotExists = true + if err = c.syncStream(appId); err != nil { + c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err) } - continue + break } } - - if !streamExists { - // check if there is any slot with the applicationId - slotName := getSlotName(c.Spec.Streams[idx].Database, appId) - if _, exists := createdSlots[slotName]; !exists { - c.logger.Warningf("no slot %s with applicationId %s exists, skipping event stream creation", slotName, appId) - continue - } - 
c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) - createdStream, err := c.createStreams(appId) - if err != nil { - return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err) - } - c.logger.Infof("event streams %q have been successfully created", createdStream.Name) - c.Streams[appId] = createdStream + if !slotExists { + c.logger.Warningf("no replication slot for stream with applicationId %s exists, skipping event stream creation", appId) } } // check if there is any deletion - for appId, stream := range c.Streams { - if !util.SliceContains(appIds, appId) { - c.logger.Infof("event streams with applicationId %s do not exist in the manifest, delete it", stream.Spec.ApplicationId) - err := c.deleteStream(stream) - if err != nil { - return fmt.Errorf("failed deleting event streams with applicationId %s: %v", stream.Spec.ApplicationId, err) + if err = c.cleanupRemovedStreams(appIds); err != nil { + return fmt.Errorf("%v", err) + } + + return nil +} + +func (c *Cluster) syncStream(appId string) error { + streamExists := false + // update stream when it exists and EventStreams array differs + for _, stream := range c.Streams { + if appId == stream.Spec.ApplicationId { + streamExists = true + desiredStreams := c.generateFabricEventStream(appId) + if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { + c.logger.Debugf("updating event stream with applicationId %s: %s", appId, reason) + desiredStreams.ObjectMeta = stream.ObjectMeta + updatedStream, err := c.updateStreams(desiredStreams) + if err != nil { + return fmt.Errorf("failed updating event stream %s with applicationId %s: %v", stream.Name, appId, err) + } + c.Streams[appId] = updatedStream + c.logger.Infof("event stream %q with applicationId %s has been successfully updated", updatedStream.Name, appId) } - c.Streams[appId] = nil - c.logger.Infof("event streams %q have been successfully deleted", stream.Name) + continue + } + } + + if !streamExists { + c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) + createdStream, err := c.createStreams(appId) + if err != nil { + return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err) } + c.logger.Infof("event streams %q have been successfully created", createdStream.Name) + c.Streams[appId] = createdStream } return nil @@ -489,3 +475,23 @@ func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (matc return true, "" } + +func (c *Cluster) cleanupRemovedStreams(appIds []string) error { + errors := make([]string, 0) + for appId := range c.Streams { + if !util.SliceContains(appIds, appId) { + c.logger.Infof("event streams with applicationId %s do not exist in the manifest, delete it", appId) + err := c.deleteStream(appId) + if err != nil { + errors = append(errors, fmt.Sprintf("failed deleting event streams with applicationId %s: %v", appId, err)) + } + c.logger.Infof("event streams with applicationId %s have been successfully deleted", appId) + } + } + + if len(errors) > 0 { + return fmt.Errorf("could not delete all removed event streams: %v", strings.Join(errors, `', '`)) + } + + return nil +} diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 58d337f25..5de2c8fb8 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -41,10 +41,6 @@ var ( fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix) slotName string = 
fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) - fakeCreatedSlots map[string]map[string]string = map[string]map[string]string{ - slotName: {}, - } - pg = acidv1.Postgresql{ TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", @@ -226,7 +222,7 @@ func TestGenerateFabricEventStream(t *testing.T) { assert.NoError(t, err) // create the streams - err = cluster.createOrUpdateStreams(fakeCreatedSlots) + err = cluster.syncStream(appId) assert.NoError(t, err) // compare generated stream with expected stream @@ -252,7 +248,7 @@ func TestGenerateFabricEventStream(t *testing.T) { } // sync streams once again - err = cluster.createOrUpdateStreams(fakeCreatedSlots) + err = cluster.syncStream(appId) assert.NoError(t, err) streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) @@ -401,7 +397,7 @@ func TestUpdateFabricEventStream(t *testing.T) { assert.NoError(t, err) // now create the stream - err = cluster.createOrUpdateStreams(fakeCreatedSlots) + err = cluster.syncStream(appId) assert.NoError(t, err) // change specs of streams and patch CRD @@ -415,46 +411,25 @@ func TestUpdateFabricEventStream(t *testing.T) { } } - patchData, err := specPatch(pg.Spec) - assert.NoError(t, err) - - pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch( - context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec") - assert.NoError(t, err) - - cluster.Postgresql.Spec = pgPatched.Spec - err = cluster.createOrUpdateStreams(fakeCreatedSlots) - assert.NoError(t, err) - // compare stream returned from API with expected stream listOptions := metav1.ListOptions{ LabelSelector: cluster.labelsSet(true).String(), } - streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) - assert.NoError(t, err) - + streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) result := cluster.generateFabricEventStream(appId) if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) } // disable recovery - for _, stream := range pg.Spec.Streams { + for idx, stream := range pg.Spec.Streams { if stream.ApplicationId == appId { stream.EnableRecovery = util.False() + pg.Spec.Streams[idx] = stream } } - patchData, err = specPatch(pg.Spec) - assert.NoError(t, err) - - pgPatched, err = cluster.KubeClient.Postgresqls(namespace).Patch( - context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec") - assert.NoError(t, err) - - cluster.Postgresql.Spec = pgPatched.Spec - err = cluster.createOrUpdateStreams(fakeCreatedSlots) - assert.NoError(t, err) + streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) result = cluster.generateFabricEventStream(appId) if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) @@ -464,16 +439,34 @@ func TestUpdateFabricEventStream(t *testing.T) { cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter // remove streams from manifest - pgPatched.Spec.Streams = nil + pg.Spec.Streams = nil pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update( - context.TODO(), pgPatched, metav1.UpdateOptions{}) + context.TODO(), &pg, 
metav1.UpdateOptions{}) assert.NoError(t, err) - cluster.Postgresql.Spec = pgUpdated.Spec - cluster.createOrUpdateStreams(fakeCreatedSlots) + appIds := gatherApplicationIds(pgUpdated.Spec.Streams) + cluster.cleanupRemovedStreams(appIds) - streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) - if len(streamList.Items) > 0 || err != nil { + streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + if len(streams.Items) > 0 || err != nil { t.Errorf("stream resource has not been removed or unexpected error %v", err) } } + +func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) { + patchData, err := specPatch(pgSpec) + assert.NoError(t, err) + + pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch( + context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec") + assert.NoError(t, err) + + cluster.Postgresql.Spec = pgPatched.Spec + err = cluster.syncStream(appId) + assert.NoError(t, err) + + streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + assert.NoError(t, err) + + return streams +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index fb95eda1b..5ad85a5e2 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -290,7 +290,6 @@ func (c *Cluster) syncService(role PostgresRole) error { return nil } // no existing service, create new one - c.Services[role] = nil c.logger.Infof("could not find the cluster's %s service", role) if svc, err = c.createService(role); err == nil { @@ -334,7 +333,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return fmt.Errorf("could not get %s endpoint: %v", role, err) } // no existing endpoint, create new one - c.Endpoints[role] = nil c.logger.Infof("could not find the cluster's %s endpoint", role) if ep, err = c.createEndpoint(role); err == nil { @@ -376,7 +374,6 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not get pod disruption budget: %v", err) } // no existing pod disruption budget, create new one - c.PodDisruptionBudget = nil c.logger.Infof("could not find the cluster's pod disruption budget") if pdb, err = c.createPodDisruptionBudget(); err != nil { @@ -418,7 +415,6 @@ func (c *Cluster) syncStatefulSet() error { if err != nil { // statefulset does not exist, try to re-create it - c.Statefulset = nil c.logger.Infof("cluster's statefulset does not exist") sset, err = c.createStatefulSet() @@ -783,7 +779,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv // check if specified slots exist in config and if they differ for slotName, desiredSlot := range desiredPatroniConfig.Slots { // only add slots specified in manifest to c.replicationSlots - for manifestSlotName, _ := range c.Spec.Patroni.Slots { + for manifestSlotName := range c.Spec.Patroni.Slots { if manifestSlotName == slotName { c.replicationSlots[slotName] = desiredSlot } From db20458f2b6a87f6942cb5a7ec927fec95f7bce8 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 7 Aug 2024 22:14:53 +0200 Subject: [PATCH 14/24] rename gatherApplicationIds to getDistinct --- e2e/tests/test_e2e.py | 12 ++++++------ pkg/cluster/streams.go | 18 +++++++++--------- pkg/cluster/streams_test.go | 6 +++--- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 
d29fd3d5c..5f7d79239 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -400,8 +400,8 @@ def test_config_update(self): "max_connections": new_max_connections_value, "wal_level": "logical" } - }, - "patroni": { + }, + "patroni": { "slots": { "first_slot": { "type": "physical" @@ -412,7 +412,7 @@ def test_config_update(self): "retry_timeout": 9, "synchronous_mode": True, "failsafe_mode": True, - } + } } } @@ -515,7 +515,7 @@ def compare_config(): pg_add_new_slots_patch = { "spec": { "patroni": { - "slots": { + "slots": { "test_slot": { "type": "logical", "database": "foo", @@ -2020,13 +2020,13 @@ def test_stream_resources(self): # update the manifest with the streams section patch_streaming_config = { "spec": { - "patroni": { + "patroni": { "slots": { "manual_slot": { "type": "physical" } } - }, + }, "streams": [ { "applicationId": "test-app", diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 018ef234a..2d6fe1cd2 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -79,7 +79,7 @@ func (c *Cluster) deleteStreams() error { return nil } -func gatherApplicationIds(streams []acidv1.Stream) []string { +func getDistinctApplicationIds(streams []acidv1.Stream) []string { appIds := make([]string, 0) for _, stream := range streams { if !util.SliceContains(appIds, stream.ApplicationId) { @@ -336,7 +336,7 @@ func (c *Cluster) syncStreams() error { } } - // gather list of required slots and publications, group by database + // get list of required slots and publications, group by database for _, stream := range c.Spec.Streams { if _, exists := databaseSlots[stream.Database]; !exists { c.logger.Warningf("database %q does not exist in the cluster", stream.Database) @@ -390,13 +390,13 @@ func (c *Cluster) syncStreams() error { } // finally sync stream CRDs - // fetch different application IDs from streams section + // get distinct application IDs from streams section // there will be a separate event stream resource for each ID - appIds := gatherApplicationIds(c.Spec.Streams) + appIds := getDistinctApplicationIds(c.Spec.Streams) for _, appId := range appIds { slotExists := false for slotName := range slotsToSync { - if strings.HasSuffix(slotName, appId) { + if strings.HasSuffix(slotName, strings.Replace(appId, "-", "_", -1)) { slotExists = true if err = c.syncStream(appId); err != nil { c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err) @@ -405,7 +405,7 @@ func (c *Cluster) syncStreams() error { } } if !slotExists { - c.logger.Warningf("no replication slot for stream with applicationId %s exists, skipping event stream creation", appId) + c.logger.Warningf("no replication slot for streams with applicationId %s exists, skipping event stream creation", appId) } } @@ -425,14 +425,14 @@ func (c *Cluster) syncStream(appId string) error { streamExists = true desiredStreams := c.generateFabricEventStream(appId) if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { - c.logger.Debugf("updating event stream with applicationId %s: %s", appId, reason) + c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason) desiredStreams.ObjectMeta = stream.ObjectMeta updatedStream, err := c.updateStreams(desiredStreams) if err != nil { - return fmt.Errorf("failed updating event stream %s with applicationId %s: %v", stream.Name, appId, err) + return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) } c.Streams[appId] = updatedStream 
- c.logger.Infof("event stream %q with applicationId %s has been successfully updated", updatedStream.Name, appId) + c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) } continue } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 5de2c8fb8..5af53bbaa 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -185,10 +185,10 @@ var ( func TestGatherApplicationIds(t *testing.T) { testAppIds := []string{appId} - appIds := gatherApplicationIds(pg.Spec.Streams) + appIds := getDistinctApplicationIds(pg.Spec.Streams) if !util.IsEqualIgnoreOrder(testAppIds, appIds) { - t.Errorf("gathered applicationIds do not match, expected %#v, got %#v", testAppIds, appIds) + t.Errorf("list of applicationIds does not match, expected %#v, got %#v", testAppIds, appIds) } } @@ -444,7 +444,7 @@ func TestUpdateFabricEventStream(t *testing.T) { context.TODO(), &pg, metav1.UpdateOptions{}) assert.NoError(t, err) - appIds := gatherApplicationIds(pgUpdated.Spec.Streams) + appIds := getDistinctApplicationIds(pgUpdated.Spec.Streams) cluster.cleanupRemovedStreams(appIds) streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) From ea3a0f8daac353372488e44532fcd72298125f6e Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 8 Aug 2024 21:42:20 +0200 Subject: [PATCH 15/24] improve slot check before syncing streams CRD --- pkg/cluster/streams.go | 31 +++++++++----- pkg/cluster/streams_test.go | 85 +++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 11 deletions(-) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 2d6fe1cd2..5e1b969fa 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -394,18 +394,12 @@ func (c *Cluster) syncStreams() error { // there will be a separate event stream resource for each ID appIds := getDistinctApplicationIds(c.Spec.Streams) for _, appId := range appIds { - slotExists := false - for slotName := range slotsToSync { - if strings.HasSuffix(slotName, strings.Replace(appId, "-", "_", -1)) { - slotExists = true - if err = c.syncStream(appId); err != nil { - c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err) - } - break + if hasSlotsInSync(appId, databaseSlots, slotsToSync) { + if err = c.syncStream(appId); err != nil { + c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err) } - } - if !slotExists { - c.logger.Warningf("no replication slot for streams with applicationId %s exists, skipping event stream creation", appId) + } else { + c.logger.Warningf("database replication slots for streams with applicationId %s not in sync, skipping event stream sync", appId) } } @@ -417,6 +411,21 @@ func (c *Cluster) syncStreams() error { return nil } +func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool { + allSlotsInSync := true + for dbName, slots := range databaseSlots { + for slotName := range slots { + if slotName == getSlotName(dbName, appId) { + if _, exists := slotsToSync[slotName]; !exists { + allSlotsInSync = false + } + } + } + } + + return allSlotsInSync +} + func (c *Cluster) syncStream(appId string) error { streamExists := false // update stream when it exists and EventStreams array differs diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 5af53bbaa..318bd8597 100644 --- a/pkg/cluster/streams_test.go +++ 
b/pkg/cluster/streams_test.go @@ -192,6 +192,91 @@ func TestGatherApplicationIds(t *testing.T) { } } +func TestHasSlotsInSync(t *testing.T) { + + tests := []struct { + subTest string + expectedSlots map[string]map[string]zalandov1.Slot + actualSlots map[string]map[string]string + slotsInSync bool + }{ + { + subTest: "slots are in sync", + expectedSlots: map[string]map[string]zalandov1.Slot{ + dbName: { + slotName: zalandov1.Slot{ + Slot: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test1": acidv1.StreamTable{ + EventType: "stream-type-a", + }, + }, + }, + }, + }, + actualSlots: map[string]map[string]string{ + slotName: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + }, + slotsInSync: true, + }, { + subTest: "slots are not in sync", + expectedSlots: map[string]map[string]zalandov1.Slot{ + dbName: { + slotName: zalandov1.Slot{ + Slot: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test1": acidv1.StreamTable{ + EventType: "stream-type-a", + }, + }, + }, + }, + "dbnotexists": { + slotName: zalandov1.Slot{ + Slot: map[string]string{ + "databases": "dbnotexists", + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test2": acidv1.StreamTable{ + EventType: "stream-type-b", + }, + }, + }, + }, + }, + actualSlots: map[string]map[string]string{ + slotName: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + }, + slotsInSync: false, + }, + } + + for _, tt := range tests { + result := hasSlotsInSync(appId, tt.expectedSlots, tt.actualSlots) + if !result { + t.Errorf("slots are not in sync, expected %#v, got %#v", tt.expectedSlots, tt.actualSlots) + } + } +} + func TestGenerateFabricEventStream(t *testing.T) { client, _ := newFakeK8sStreamClient() From 7d50d566e26b047713a7bf7c1ac979fafc9188d5 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 9 Aug 2024 12:37:45 +0200 Subject: [PATCH 16/24] patch annotation for Patroni config service --- pkg/cluster/k8sres.go | 3 +++ pkg/cluster/resources.go | 2 +- pkg/cluster/sync.go | 6 ------ 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 2ad763530..2b5f4fd59 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1962,6 +1962,9 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *ac return c.OpConfig.EnableMasterLoadBalancer + case Patroni: + return false + default: panic(fmt.Sprintf("Unknown role %v", role)) } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index f2213545e..5273961ac 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -316,7 +316,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe serviceName := util.NameFromMeta(oldService.ObjectMeta) match, reason := c.compareServices(oldService, newService) - if !match { + if !match && role != Patroni { c.logServiceChanges(role, oldService, newService, false, reason) c.setProcessName("updating %v service", role) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 5ad85a5e2..acf55af35 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -214,7 +214,6 @@ func (c 
*Cluster) syncPatroniConfigMap(suffix string) error { if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { c.PatroniConfigMaps[suffix] = cm - return nil } if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s Patroni config map: %v", suffix, err) @@ -234,7 +233,6 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { c.PatroniEndpoints[suffix] = ep - return nil } if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s Patroni endpoint: %v", suffix, err) @@ -269,10 +267,6 @@ func (c *Cluster) syncService(role PostgresRole) error { if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { c.Services[role] = svc - // do not touch config service managed by Patroni - if role == Patroni { - return nil - } desiredSvc := c.generateService(role, &c.Spec) updatedSvc, err := c.updateService(role, svc, desiredSvc) if err != nil { From 409924a78965140e73af49c65cc1694115340f96 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 9 Aug 2024 18:48:52 +0200 Subject: [PATCH 17/24] add ownerReferences and annotations diff to Patroni objects --- pkg/cluster/sync.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 314e7b706..28490442e 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -214,6 +214,29 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { c.PatroniConfigMaps[suffix] = cm + desiredOwnerRefs := c.ownerReferences() + if !reflect.DeepEqual(cm.ObjectMeta.OwnerReferences, desiredOwnerRefs) { + c.logger.Infof("new %s config map's owner references do not match the current ones", suffix) + cm.ObjectMeta.OwnerReferences = desiredOwnerRefs + c.setProcessName("updating %v config map", suffix) + cm, err = c.KubeClient.ConfigMaps(c.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update %s config map: %v", suffix, err) + } + c.PatroniConfigMaps[suffix] = cm + } + desiredAnnotations := c.annotationsSet(cm.Annotations) + if changed, _ := c.compareAnnotations(cm.Annotations, desiredAnnotations); changed { + patchData, err := metaAnnotationsPatch(desiredAnnotations) + if err != nil { + return fmt.Errorf("could not form patch for %s config map: %v", suffix, err) + } + cm, err = c.KubeClient.ConfigMaps(c.Namespace).Patch(context.TODO(), name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of %s config map: %v", suffix, err) + } + c.PatroniConfigMaps[suffix] = cm + } } if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s Patroni config map: %v", suffix, err) @@ -233,6 +256,29 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { c.PatroniEndpoints[suffix] = ep + desiredOwnerRefs := c.ownerReferences() + if !reflect.DeepEqual(ep.ObjectMeta.OwnerReferences, desiredOwnerRefs) { + c.logger.Infof("new %s endpoints's owner references do not match the current ones", suffix) + ep.ObjectMeta.OwnerReferences = desiredOwnerRefs + 
c.setProcessName("updating %v endpoint", suffix) + ep, err = c.KubeClient.Endpoints(c.Namespace).Update(context.TODO(), ep, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update %s endpoint: %v", suffix, err) + } + c.PatroniEndpoints[suffix] = ep + } + desiredAnnotations := c.annotationsSet(ep.Annotations) + if changed, _ := c.compareAnnotations(ep.Annotations, desiredAnnotations); changed { + patchData, err := metaAnnotationsPatch(desiredAnnotations) + if err != nil { + return fmt.Errorf("could not form patch for %s endpoint: %v", suffix, err) + } + ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of %s endpoint: %v", suffix, err) + } + c.PatroniEndpoints[suffix] = ep + } } if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s Patroni endpoint: %v", suffix, err) From 461ddc17e2e420135cf126d8845840c128f2eee3 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 9 Aug 2024 22:03:14 +0200 Subject: [PATCH 18/24] add extra sync code for config service so it does not get too ugly --- pkg/cluster/k8sres.go | 3 -- pkg/cluster/resources.go | 2 +- pkg/cluster/sync.go | 91 ++++++++++++++++++++++++++++------------ 3 files changed, 65 insertions(+), 31 deletions(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 9a0de2623..89fb4b558 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1972,9 +1972,6 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *ac return c.OpConfig.EnableMasterLoadBalancer - case Patroni: - return false - default: panic(fmt.Sprintf("Unknown role %v", role)) } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 15cecbd4a..f67498b61 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -316,7 +316,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe serviceName := util.NameFromMeta(oldService.ObjectMeta) match, reason := c.compareServices(oldService, newService) - if !match && role != Patroni { + if !match { c.logServiceChanges(role, oldService, newService, false, reason) c.setProcessName("updating %v service", role) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 28490442e..8a8308d23 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -180,7 +180,7 @@ func (c *Cluster) syncFinalizer() error { func (c *Cluster) syncPatroniResources() error { errors := make([]string, 0) - if err := c.syncService(Patroni); err != nil { + if err := c.syncPatroniService(); err != nil { errors = append(errors, fmt.Sprintf("could not sync %s service: %v", Patroni, err)) } @@ -208,20 +208,20 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { cm *v1.ConfigMap err error ) - name := fmt.Sprintf("%s-%s", c.Name, suffix) - c.logger.Debugf("syncing %s Patroni config map", name) - c.setProcessName("syncing %s Patroni config map", name) + configMapName := fmt.Sprintf("%s-%s", c.Name, suffix) + c.logger.Debugf("syncing %s config map", configMapName) + c.setProcessName("syncing %s config map", configMapName) - if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { + if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), configMapName, metav1.GetOptions{}); err == nil { c.PatroniConfigMaps[suffix] = cm desiredOwnerRefs := c.ownerReferences() if 
!reflect.DeepEqual(cm.ObjectMeta.OwnerReferences, desiredOwnerRefs) { - c.logger.Infof("new %s config map's owner references do not match the current ones", suffix) + c.logger.Infof("new %s config map's owner references do not match the current ones", configMapName) cm.ObjectMeta.OwnerReferences = desiredOwnerRefs - c.setProcessName("updating %v config map", suffix) + c.setProcessName("updating %s config map", configMapName) cm, err = c.KubeClient.ConfigMaps(c.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{}) if err != nil { - return fmt.Errorf("could not update %s config map: %v", suffix, err) + return fmt.Errorf("could not update %s config map: %v", configMapName, err) } c.PatroniConfigMaps[suffix] = cm } @@ -229,17 +229,17 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { if changed, _ := c.compareAnnotations(cm.Annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { - return fmt.Errorf("could not form patch for %s config map: %v", suffix, err) + return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err) } - cm, err = c.KubeClient.ConfigMaps(c.Namespace).Patch(context.TODO(), name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + cm, err = c.KubeClient.ConfigMaps(c.Namespace).Patch(context.TODO(), configMapName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) if err != nil { - return fmt.Errorf("could not patch annotations of %s config map: %v", suffix, err) + return fmt.Errorf("could not patch annotations of %s config map: %v", configMapName, err) } c.PatroniConfigMaps[suffix] = cm } } if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not get %s Patroni config map: %v", suffix, err) + return fmt.Errorf("could not get %s config map: %v", configMapName, err) } return nil @@ -250,20 +250,20 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { ep *v1.Endpoints err error ) - name := fmt.Sprintf("%s-%s", c.Name, suffix) - c.logger.Debugf("syncing %s Patroni endpoint", name) - c.setProcessName("syncing %s Patroni endpoint", name) + endpointName := fmt.Sprintf("%s-%s", c.Name, suffix) + c.logger.Debugf("syncing %s endpoint", endpointName) + c.setProcessName("syncing %s endpoint", endpointName) - if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), endpointName, metav1.GetOptions{}); err == nil { c.PatroniEndpoints[suffix] = ep desiredOwnerRefs := c.ownerReferences() if !reflect.DeepEqual(ep.ObjectMeta.OwnerReferences, desiredOwnerRefs) { - c.logger.Infof("new %s endpoints's owner references do not match the current ones", suffix) + c.logger.Infof("new %s endpoints's owner references do not match the current ones", endpointName) ep.ObjectMeta.OwnerReferences = desiredOwnerRefs - c.setProcessName("updating %v endpoint", suffix) + c.setProcessName("updating %s endpoint", endpointName) ep, err = c.KubeClient.Endpoints(c.Namespace).Update(context.TODO(), ep, metav1.UpdateOptions{}) if err != nil { - return fmt.Errorf("could not update %s endpoint: %v", suffix, err) + return fmt.Errorf("could not update %s endpoint: %v", endpointName, err) } c.PatroniEndpoints[suffix] = ep } @@ -271,17 +271,58 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { if changed, _ := c.compareAnnotations(ep.Annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err 
!= nil { - return fmt.Errorf("could not form patch for %s endpoint: %v", suffix, err) + return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err) } - ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), endpointName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) if err != nil { - return fmt.Errorf("could not patch annotations of %s endpoint: %v", suffix, err) + return fmt.Errorf("could not patch annotations of %s endpoint: %v", endpointName, err) } c.PatroniEndpoints[suffix] = ep } } if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not get %s Patroni endpoint: %v", suffix, err) + return fmt.Errorf("could not get %s endpoint: %v", endpointName, err) + } + + return nil +} + +func (c *Cluster) syncPatroniService() error { + var ( + svc *v1.Service + err error + ) + serviceName := fmt.Sprintf("%s-%s", c.Name, Patroni) + c.setProcessName("syncing %s service", serviceName) + + if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil { + c.Services[Patroni] = svc + desiredOwnerRefs := c.ownerReferences() + if !reflect.DeepEqual(svc.ObjectMeta.OwnerReferences, desiredOwnerRefs) { + c.logger.Infof("new %s service's owner references do not match the current ones", serviceName) + svc.ObjectMeta.OwnerReferences = desiredOwnerRefs + c.setProcessName("updating %v service", serviceName) + svc, err = c.KubeClient.Services(c.Namespace).Update(context.TODO(), svc, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update %s endpoint: %v", serviceName, err) + } + c.Services[Patroni] = svc + } + desiredAnnotations := c.annotationsSet(svc.Annotations) + if changed, _ := c.compareAnnotations(svc.Annotations, desiredAnnotations); changed { + patchData, err := metaAnnotationsPatch(desiredAnnotations) + if err != nil { + return fmt.Errorf("could not form patch for %s service: %v", serviceName, err) + } + svc, err = c.KubeClient.Services(c.Namespace).Patch(context.TODO(), serviceName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of %s service: %v", serviceName, err) + } + c.Services[Patroni] = svc + } + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get %s service: %v", serviceName, err) } return nil @@ -324,10 +365,6 @@ func (c *Cluster) syncService(role PostgresRole) error { if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s service: %v", role, err) } - // if config service does not exist Patroni will create it - if role == Patroni { - return nil - } // no existing service, create new one c.logger.Infof("could not find the cluster's %s service", role) From dd2deef38a32e2f52ca09f26fb980305f8d43a3a Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 12 Aug 2024 12:34:21 +0200 Subject: [PATCH 19/24] update docs --- docs/administrator.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/administrator.md b/docs/administrator.md index e91c67640..d2b8e7039 100644 --- a/docs/administrator.md +++ b/docs/administrator.md @@ -252,17 +252,16 @@ will differ and trigger a rolling update of the pods. 
## Owner References and Finalizers The Postgres Operator can set [owner references](https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/) to most of a cluster's child resources to improve -monitoring with GitOps tools and enable cascading deletes. There are three +monitoring with GitOps tools and enable cascading deletes. There are two exceptions: * Persistent Volume Claims, because they are handled by the [PV Reclaim Policy]https://kubernetes.io/docs/tasks/administer-cluster/change-pv-reclaim-policy/ of the Stateful Set -* The config endpoint + headless service resource because it is managed by Patroni * Cross-namespace secrets, because owner references are not allowed across namespaces by design The operator would clean these resources up with its regular delete loop unless they got synced correctly. If for some reason the initial cluster sync fails, e.g. after a cluster creation or operator restart, a deletion of the -cluster manifest would leave orphaned resources behind which the user has to +cluster manifest might leave orphaned resources behind which the user has to clean up manually. Another option is to enable finalizers which first ensures the deletion of all From 1d76627ec315cfdaa229de111e2634a3ccab2289 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 12 Aug 2024 18:00:31 +0200 Subject: [PATCH 20/24] some bugfixes when comparing annotations and return err on found --- pkg/cluster/sync.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 8a8308d23..792db435f 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -15,6 +15,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -225,8 +226,10 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { } c.PatroniConfigMaps[suffix] = cm } + annotations := make(map[string]string) + maps.Copy(annotations, cm.Annotations) desiredAnnotations := c.annotationsSet(cm.Annotations) - if changed, _ := c.compareAnnotations(cm.Annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err) @@ -238,7 +241,8 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { c.PatroniConfigMaps[suffix] = cm } } - if !k8sutil.ResourceNotFound(err) { + // if config map does not exist yet, Patroni should create it + if err != nil && !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s config map: %v", configMapName, err) } @@ -267,8 +271,10 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { } c.PatroniEndpoints[suffix] = ep } + annotations := make(map[string]string) + maps.Copy(annotations, ep.Annotations) desiredAnnotations := c.annotationsSet(ep.Annotations) - if changed, _ := c.compareAnnotations(ep.Annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err) @@ -280,7 +286,8 @@ func (c *Cluster) syncPatroniEndpoint(suffix 
string) error { c.PatroniEndpoints[suffix] = ep } } - if !k8sutil.ResourceNotFound(err) { + // if endpoint does not exist yet, Patroni should create it + if err != nil && !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s endpoint: %v", endpointName, err) } @@ -308,8 +315,10 @@ func (c *Cluster) syncPatroniService() error { } c.Services[Patroni] = svc } + annotations := make(map[string]string) + maps.Copy(annotations, svc.Annotations) desiredAnnotations := c.annotationsSet(svc.Annotations) - if changed, _ := c.compareAnnotations(svc.Annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s service: %v", serviceName, err) @@ -321,7 +330,8 @@ func (c *Cluster) syncPatroniService() error { c.Services[Patroni] = svc } } - if !k8sutil.ResourceNotFound(err) { + // if config service does not exist yet, Patroni should create it + if err != nil && !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s service: %v", serviceName, err) } From b63263cd3092ecac6ab18f60ceac1cad639ba1e2 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 12 Aug 2024 18:00:58 +0200 Subject: [PATCH 21/24] sync Patroni resources on update event and extended unit tests --- pkg/cluster/cluster.go | 6 ++ pkg/cluster/util_test.go | 126 ++++++++++++++++++++++++++++++++++----- 2 files changed, 116 insertions(+), 16 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ffc0ef987..f0f432753 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -988,6 +988,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true } + // Patroni service and endpoints / config maps + if err := c.syncPatroniResources(); err != nil { + c.logger.Errorf("could not sync services: %v", err) + updateFailed = true + } + // Users func() { // check if users need to be synced during update diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 58380b49a..d303e8bcc 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -16,12 +16,14 @@ import ( "github.com/zalando/postgres-operator/mocks" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" + "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/patroni" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" k8sFake "k8s.io/client-go/kubernetes/fake" ) @@ -49,6 +51,7 @@ func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset PersistentVolumeClaimsGetter: clientSet.CoreV1(), PersistentVolumesGetter: clientSet.CoreV1(), EndpointsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), PodsGetter: clientSet.CoreV1(), DeploymentsGetter: clientSet.AppsV1(), CronJobsGetter: clientSet.BatchV1(), @@ -66,11 +69,7 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ clusterOptions := clusterLabelsOptions(cluster) // helper functions containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error { - if expected == nil { - if len(actual) 
!= 0 { - return fmt.Errorf("%s %v expected not to have any annotations, got: %#v", objType, objName, actual) - } - } else if !(reflect.DeepEqual(expected, actual)) { + if !util.MapContains(actual, expected) { return fmt.Errorf("%s %v expected annotations: %#v, got: %#v", objType, objName, expected, actual) } return nil @@ -183,7 +182,7 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ return err } for _, cronJob := range cronJobList.Items { - if err := containsAnnotations(updateAnnotations(annotations), cronJob.Annotations, cronJob.ObjectMeta.Name, "Logical backup cron job"); err != nil { + if err := containsAnnotations(annotations, cronJob.Annotations, cronJob.ObjectMeta.Name, "Logical backup cron job"); err != nil { return err } if err := containsAnnotations(updateAnnotations(annotations), cronJob.Spec.JobTemplate.Spec.Template.Annotations, cronJob.Name, "Logical backup cron job pod template"); err != nil { @@ -219,8 +218,21 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ return nil } + checkConfigMaps := func(annotations map[string]string) error { + cmList, err := cluster.KubeClient.ConfigMaps(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, cm := range cmList.Items { + if err := containsAnnotations(annotations, cm.Annotations, cm.ObjectMeta.Name, "ConfigMap"); err != nil { + return err + } + } + return nil + } + checkFuncs := []func(map[string]string) error{ - checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkCronJob, checkPvc, checkSecrets, checkEndpoints, + checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkCronJob, checkPvc, checkSecrets, checkEndpoints, checkConfigMaps, } for _, f := range checkFuncs { if err := f(resultAnnotations); err != nil { @@ -281,6 +293,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, OpConfig: config.Config{ PatroniAPICheckInterval: time.Duration(1), PatroniAPICheckTimeout: time.Duration(5), + KubernetesUseConfigMaps: true, ConnectionPooler: config.ConnectionPooler{ ConnectionPoolerDefaultCPURequest: "100m", ConnectionPoolerDefaultCPULimit: "100m", @@ -343,11 +356,57 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, } } + // resources which Patroni creates + if err = createPatroniResources(cluster); err != nil { + return nil, err + } + return cluster, nil } +func createPatroniResources(cluster *Cluster) error { + patroniService := cluster.generateService(Replica, &pg.Spec) + patroniService.ObjectMeta.Name = cluster.serviceName(Patroni) + _, err := cluster.KubeClient.Services(namespace).Create(context.TODO(), patroniService, metav1.CreateOptions{}) + if err != nil { + return err + } + + for _, suffix := range patroniObjectSuffixes { + metadata := metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", clusterName, suffix), + Namespace: namespace, + Annotations: map[string]string{ + "initialize": "123456789", + }, + Labels: cluster.labelsSet(false), + } + + if cluster.OpConfig.KubernetesUseConfigMaps { + configMap := v1.ConfigMap{ + ObjectMeta: metadata, + } + _, err := cluster.KubeClient.ConfigMaps(namespace).Create(context.TODO(), &configMap, metav1.CreateOptions{}) + if err != nil { + return err + } + } else { + endpoints := v1.Endpoints{ + ObjectMeta: metadata, + } + _, err := cluster.KubeClient.Endpoints(namespace).Create(context.TODO(), &endpoints, metav1.CreateOptions{}) + if err != nil { + return err + } + } + } + + return nil +} + func annotateResources(cluster 
*Cluster) error { clusterOptions := clusterLabelsOptions(cluster) + patchData, err := metaAnnotationsPatch(externalAnnotations) stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) if err != nil { @@ -355,7 +414,7 @@ func annotateResources(cluster *Cluster) error { } for _, sts := range stsList.Items { sts.Annotations = externalAnnotations - if _, err = cluster.KubeClient.StatefulSets(namespace).Update(context.TODO(), &sts, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.StatefulSets(namespace).Patch(context.TODO(), sts.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -366,7 +425,7 @@ func annotateResources(cluster *Cluster) error { } for _, pod := range podList.Items { pod.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Pods(namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -377,7 +436,7 @@ func annotateResources(cluster *Cluster) error { } for _, svc := range svcList.Items { svc.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Services(namespace).Update(context.TODO(), &svc, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Services(namespace).Patch(context.TODO(), svc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -388,7 +447,19 @@ func annotateResources(cluster *Cluster) error { } for _, pdb := range pdbList.Items { pdb.Annotations = externalAnnotations - _, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Update(context.TODO(), &pdb, metav1.UpdateOptions{}) + _, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Patch(context.TODO(), pdb.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return err + } + } + + cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, cronJob := range cronJobList.Items { + cronJob.Annotations = externalAnnotations + _, err = cluster.KubeClient.CronJobs(namespace).Patch(context.TODO(), cronJob.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) if err != nil { return err } @@ -400,7 +471,7 @@ func annotateResources(cluster *Cluster) error { } for _, pvc := range pvcList.Items { pvc.Annotations = externalAnnotations - if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -411,7 +482,7 @@ func annotateResources(cluster *Cluster) error { return err } deploy.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Deployments(namespace).Patch(context.TODO(), deploy.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -422,7 +493,7 @@ func annotateResources(cluster *Cluster) error { } for _, secret := range secrets.Items { secret.Annotations = externalAnnotations - if _, err = 
cluster.KubeClient.Secrets(namespace).Update(context.TODO(), &secret, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Secrets(namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -433,10 +504,22 @@ func annotateResources(cluster *Cluster) error { } for _, ep := range endpoints.Items { ep.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Endpoints(namespace).Update(context.TODO(), &ep, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Endpoints(namespace).Patch(context.TODO(), ep.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } + + configMaps, err := cluster.KubeClient.ConfigMaps(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, cm := range configMaps.Items { + cm.Annotations = externalAnnotations + if _, err = cluster.KubeClient.ConfigMaps(namespace).Patch(context.TODO(), cm.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { + return err + } + } + return nil } @@ -503,7 +586,18 @@ func TestInheritedAnnotations(t *testing.T) { err = checkResourcesInheritedAnnotations(cluster, result) assert.NoError(t, err) - // 3. Existing annotations (should not be removed) + // 3. Change from ConfigMaps to Endpoints + err = cluster.deletePatroniResources() + assert.NoError(t, err) + cluster.OpConfig.KubernetesUseConfigMaps = false + err = createPatroniResources(cluster) + assert.NoError(t, err) + err = cluster.Sync(newSpec.DeepCopy()) + assert.NoError(t, err) + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) + + // 4. Existing annotations (should not be removed) err = annotateResources(cluster) assert.NoError(t, err) maps.Copy(result, externalAnnotations) From 275acd3794e6d1c5f1e499b27e157b6c27af09f8 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 12 Aug 2024 18:07:32 +0200 Subject: [PATCH 22/24] add one more error return --- pkg/cluster/util_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index d303e8bcc..76311a353 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -407,6 +407,9 @@ func createPatroniResources(cluster *Cluster) error { func annotateResources(cluster *Cluster) error { clusterOptions := clusterLabelsOptions(cluster) patchData, err := metaAnnotationsPatch(externalAnnotations) + if err != nil { + return err + } stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) if err != nil { From bd9c0de3aab2d7fcb516c255da25699191c4dc3b Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 12 Aug 2024 18:34:51 +0200 Subject: [PATCH 23/24] add config service/endpoint owner references check to e2e test --- e2e/tests/test_e2e.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index c8f03b670..bee14acdd 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2481,11 +2481,15 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n self.assertTrue(self.has_postgresql_owner_reference(svc.metadata.owner_references, inverse), "primary service owner reference check failed") replica_svc = k8s.api.core_v1.read_namespaced_service(cluster_name + "-repl", cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(replica_svc.metadata.owner_references, inverse),
"replica service owner reference check failed") + config_svc = k8s.api.core_v1.read_namespaced_service(cluster_name + "-config", cluster_namespace) + self.assertTrue(self.has_postgresql_owner_reference(config_svc.metadata.owner_references, inverse), "config service owner reference check failed") ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name, cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(ep.metadata.owner_references, inverse), "primary endpoint owner reference check failed") replica_ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name + "-repl", cluster_namespace) - self.assertTrue(self.has_postgresql_owner_reference(replica_ep.metadata.owner_references, inverse), "replica owner reference check failed") + self.assertTrue(self.has_postgresql_owner_reference(replica_ep.metadata.owner_references, inverse), "replica endpoint owner reference check failed") + config_ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name + "-config", cluster_namespace) + self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") From c28abdec4fbbc3985e6b9b6cfcf89a6b60b781c3 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 12 Aug 2024 23:12:46 +0200 Subject: [PATCH 24/24] reflect another code review --- e2e/tests/test_e2e.py | 11 +++++------ pkg/cluster/sync.go | 15 ++++++--------- pkg/cluster/util_test.go | 2 +- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index bee14acdd..bd7dfef57 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -1667,19 +1667,18 @@ def test_owner_references(self): k8s.api.custom_objects_api.delete_namespaced_custom_object( "acid.zalan.do", "v1", self.test_namespace, "postgresqls", cluster_name) - # statefulset, pod disruption budget and secrets should be deleted via owner reference + # child resources with owner references should be deleted via owner references self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted") self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") + self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Services not deleted") + self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 0, "Secrets were not deleted") - time.sleep(5) # wait for the operator to also delete the leftovers + time.sleep(5) # wait for the operator to also delete the PVCs - # pvcs and Patroni config service/endpoint should not be affected by owner reference - # but deleted by the operator almost immediately + # pvcs do not have an owner reference but will deleted by the operator almost immediately self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 0, "PVCs not deleted") - self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Patroni config service not deleted") - self.eventuallyEqual(lambda: 
k8s.count_endpoints_with_label(cluster_label), 0, "Patroni config endpoint not deleted") # disable owner references in config disable_owner_refs = { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 792db435f..59aee34e6 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -240,9 +240,8 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { } c.PatroniConfigMaps[suffix] = cm } - } - // if config map does not exist yet, Patroni should create it - if err != nil && !k8sutil.ResourceNotFound(err) { + } else if !k8sutil.ResourceNotFound(err) { + // if config map does not exist yet, Patroni should create it return fmt.Errorf("could not get %s config map: %v", configMapName, err) } @@ -285,9 +284,8 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { } c.PatroniEndpoints[suffix] = ep } - } - // if endpoint does not exist yet, Patroni should create it - if err != nil && !k8sutil.ResourceNotFound(err) { + } else if !k8sutil.ResourceNotFound(err) { + // if endpoint does not exist yet, Patroni should create it return fmt.Errorf("could not get %s endpoint: %v", endpointName, err) } @@ -329,9 +327,8 @@ func (c *Cluster) syncPatroniService() error { } c.Services[Patroni] = svc } - } - // if config service does not exist yet, Patroni should create it - if err != nil && !k8sutil.ResourceNotFound(err) { + } else if !k8sutil.ResourceNotFound(err) { + // if config service does not exist yet, Patroni should create it return fmt.Errorf("could not get %s service: %v", serviceName, err) } diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 76311a353..0176ea005 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -70,7 +70,7 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ // helper functions containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error { if !util.MapContains(actual, expected) { - return fmt.Errorf("%s %v expected annotations: %#v, got: %#v", objType, objName, expected, actual) + return fmt.Errorf("%s %v expected annotations %#v to be contained in %#v", objType, objName, expected, actual) } return nil }
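
The sync code added in patches 17 to 20 converges on a single pattern for Patroni-managed objects (the leader/config/sync/failover config maps or endpoints and the -config service): the operator never creates or deletes them, it only adopts them by fetching the object, aligning owner references with an Update call, aligning inherited annotations with a merge patch, and tolerating a not-found error because Patroni creates these objects itself. Below is a minimal standalone sketch of that pattern written against plain client-go; it is not the operator's own code: the function name syncPatroniOwnedConfigMap is invented for illustration, and the operator's compareAnnotations, metaAnnotationsPatch and ownerReferences helpers are simplified here to reflect.DeepEqual and an inline JSON merge patch.

package patroniexample

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// syncPatroniOwnedConfigMap adopts a config map that Patroni created (e.g.
// <cluster>-leader or <cluster>-config): it never creates or deletes the
// object, it only aligns owner references and annotations with the desired
// state. The function name and signature are illustrative only.
func syncPatroniOwnedConfigMap(ctx context.Context, client kubernetes.Interface,
	namespace, name string, ownerRefs []metav1.OwnerReference, desiredAnnotations map[string]string) error {

	cm, err := client.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		return nil // Patroni has not created the object yet; nothing to adopt
	}
	if err != nil {
		return fmt.Errorf("could not get config map %s/%s: %v", namespace, name, err)
	}

	// adopt owner references with a full update when they differ
	if !reflect.DeepEqual(cm.OwnerReferences, ownerRefs) {
		cm.OwnerReferences = ownerRefs
		if cm, err = client.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
			return fmt.Errorf("could not update owner references of config map %s: %v", name, err)
		}
	}

	// patch only the annotations when they differ, leaving Patroni's data untouched
	// (the operator compares with an ignore list; DeepEqual is a simplification)
	if !reflect.DeepEqual(cm.Annotations, desiredAnnotations) {
		patchData, err := json.Marshal(map[string]interface{}{
			"metadata": map[string]interface{}{"annotations": desiredAnnotations},
		})
		if err != nil {
			return fmt.Errorf("could not form annotations patch for config map %s: %v", name, err)
		}
		if _, err = client.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.MergePatchType, patchData, metav1.PatchOptions{}); err != nil {
			return fmt.Errorf("could not patch annotations of config map %s: %v", name, err)
		}
	}
	return nil
}

The endpoint and service variants in the patches above follow the same get, adopt owner references, patch annotations sequence; only the client used and the error wording differ.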