Commit 4d21e44

Initial inherited annotations fix implementation
1 parent d70cdf1 commit 4d21e44

11 files changed: +605 −272 lines

manifests/operator-service-account-rbac.yaml

Lines changed: 1 addition & 0 deletions
@@ -173,6 +173,7 @@ rules:
   - get
   - list
   - patch
+  - update
 # to CRUD cron jobs for logical backups
 - apiGroups:
   - batch
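Worth noting: this lone `update` verb backs the Patch→Update switches in the Go changes below, since Kubernetes RBAC authorizes the two verbs independently. A minimal sketch of the difference, run against client-go's fake clientset (the deployment resource here is illustrative; the hunk above does not reveal which rule it extends):

```go
package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Seed a fake clientset with one object; on a real cluster the
	// operator's service account would need the `update` verb for this.
	client := fake.NewSimpleClientset(&appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-pooler", Namespace: "default"},
	})

	dep, _ := client.AppsV1().Deployments("default").Get(context.TODO(), "demo-pooler", metav1.GetOptions{})
	dep.Annotations = map[string]string{"owned-by": "postgres-operator"}

	// Update sends the whole object back, unlike the merge-patch calls
	// (verb `patch`) used by the code paths this commit removes.
	updated, err := client.AppsV1().Deployments("default").Update(context.TODO(), dep, metav1.UpdateOptions{})
	fmt.Println(updated.Annotations, err)
}
```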

pkg/cluster/cluster.go

Lines changed: 41 additions & 40 deletions
@@ -30,6 +30,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	apipolicyv1 "k8s.io/api/policy/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -400,7 +401,7 @@ func (c *Cluster) Create() (err error) {
 
 	if len(c.Spec.Streams) > 0 {
 		// creating streams requires syncing the statefulset first
-		err = c.syncStatefulSet()
+		err = c.syncStatefulSet(true)
 		if err != nil {
 			return fmt.Errorf("could not sync statefulset: %v", err)
 		}
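The definition of the new boolean parameter is not part of this commit view, so its name and shape below are assumptions inferred from the call sites (`Create` passes `true`; the `Update` path further down passes its `syncStatefulSet` flag):

```go
package cluster

// Stand-ins so the sketch compiles; both exist in richer form in the operator.
type Cluster struct{}

func (c *Cluster) statefulSetChanged() bool { return false }

// Hypothetical shape of the changed signature (the real definition lives in
// a file outside this diff): the flag forces reconciliation even when the
// generated statefulset already matches the stored one.
func (c *Cluster) syncStatefulSet(forceSync bool) error {
	if !forceSync && !c.statefulSetChanged() {
		return nil // already in sync, nothing to do
	}
	// ...generate the desired statefulset and apply it...
	return nil
}
```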
@@ -493,7 +494,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 	if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
 		match = false
 		needsReplace = true
-		needsRollUpdate = true
 		reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
 	}
 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
@@ -513,9 +513,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 			reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
 			continue
 		}
-		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
+		if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
 			needsReplace = true
-			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
+			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: ", name)+reason)
 		}
 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
 			name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
@@ -764,6 +764,16 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string)
 
 }
 
+func (c *Cluster) extractIgnoredAnnotations(annoList map[string]string) map[string]string {
+	result := make(map[string]string)
+	for _, ignore := range c.OpConfig.IgnoredAnnotations {
+		if _, ok := annoList[ignore]; ok {
+			result[ignore] = annoList[ignore]
+		}
+	}
+	return result
+}
+
 func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
 	if old.Spec.Type != new.Spec.Type {
 		return false, fmt.Sprintf("new service's type %q does not match the current one %q",
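A self-contained sketch of how this helper combines with `maps.Copy` in connection_pooler.go below: annotations the operator is configured to ignore are lifted off the live object and merged back into the freshly generated set, so a sync never strips them. All names here are stand-ins; only the merge pattern mirrors the diff:

```go
package main

import (
	"fmt"
	"maps" // stdlib as of Go 1.21; the commit itself imports golang.org/x/exp/maps
)

// ignoredKeys stands in for c.OpConfig.IgnoredAnnotations.
var ignoredKeys = []string{"externally-managed/owner"}

// extractIgnored mirrors extractIgnoredAnnotations: keep only the
// annotations the operator must leave untouched.
func extractIgnored(live map[string]string) map[string]string {
	result := make(map[string]string)
	for _, key := range ignoredKeys {
		if v, ok := live[key]; ok {
			result[key] = v
		}
	}
	return result
}

func main() {
	live := map[string]string{
		"team":                     "acid",
		"externally-managed/owner": "some-other-controller",
	}
	desired := map[string]string{"team": "acid", "environment": "prod"}

	// Same order as the deployment path in the diff: ignored annotations
	// are copied in last, so they survive (and win) on key collisions.
	maps.Copy(desired, extractIgnored(live))
	fmt.Println(desired)
	// map[environment:prod externally-managed/owner:some-other-controller team:acid]
}
```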
@@ -818,6 +828,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
 	return true, ""
 }
 
+func (c *Cluster) ComparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
+	//TODO: improve comparison
+	if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
+		return false, "new PDB spec does not match the current one"
+	}
+	if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
+		return false, "new PDB's annotations does not match the current one:" + reason
+	}
+	return true, ""
+}
+
 func getPgVersion(cronJob *batchv1.CronJob) string {
 	envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
 	for _, env := range envs {
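The caller of this new comparison is not shown in this commit view. A runnable sketch of its spec half, using the real `k8s.io/api/policy/v1` types (the annotation half needs the operator's `compareAnnotations` and is omitted):

```go
package main

import (
	"fmt"
	"reflect"

	policyv1 "k8s.io/api/policy/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// comparePDBSpecs reproduces the reflect.DeepEqual spec check from
// ComparePodDisruptionBudget above.
func comparePDBSpecs(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
	if !reflect.DeepEqual(new.Spec, cur.Spec) {
		return false, "new PDB spec does not match the current one"
	}
	return true, ""
}

func main() {
	one, two := intstr.FromInt(1), intstr.FromInt(2)
	cur := &policyv1.PodDisruptionBudget{Spec: policyv1.PodDisruptionBudgetSpec{MinAvailable: &one}}
	des := &policyv1.PodDisruptionBudget{Spec: policyv1.PodDisruptionBudgetSpec{MinAvailable: &two}}

	match, reason := comparePDBSpecs(cur, des)
	fmt.Println(match, reason) // false new PDB spec does not match the current one
}
```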
@@ -922,12 +943,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	}
 
 	// Service
-	if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
-		!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
-		if err := c.syncServices(); err != nil {
-			c.logger.Errorf("could not sync services: %v", err)
-			updateFailed = true
-		}
+	if err := c.syncServices(); err != nil {
+		c.logger.Errorf("could not sync services: %v", err)
+		updateFailed = true
 	}
 
 	// Users
@@ -946,15 +964,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	// only when streams were not specified in oldSpec but in newSpec
 	needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
 
-	if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
+	annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
+
+	initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
+	if initUsers {
 		c.logger.Debugf("initialize users")
 		if err := c.initUsers(); err != nil {
 			c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
 			userInitFailed = true
 			updateFailed = true
 			return
 		}
-
+	}
+	if initUsers || annotationsChanged {
 		c.logger.Debugf("syncing secrets")
 		//TODO: mind the secrets of the deleted/new users
 		if err := c.syncSecrets(); err != nil {
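The net effect of the reshuffled conditions: user initialization keeps its old triggers, while the secrets sync now also fires on an annotation-only change, so inherited annotations reach the secret objects. A toy rendering of the gate (values are illustrative):

```go
package main

import "fmt"

func main() {
	sameUsers, sameRotatedUsers := true, true
	needPoolerUser, needStreamUser := false, false
	annotationsChanged := true // e.g. inherited annotations were edited

	initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
	if initUsers {
		fmt.Println("initUsers()")
	}
	if initUsers || annotationsChanged {
		fmt.Println("syncSecrets()") // runs here even though users are unchanged
	}
}
```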
@@ -968,7 +990,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	if c.OpConfig.StorageResizeMode != "off" {
 		c.syncVolumes()
 	} else {
-		c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
+		c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
 	}
 
 	// streams configuration
@@ -978,29 +1000,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 
 	// Statefulset
 	func() {
-		oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
-		if err != nil {
-			c.logger.Errorf("could not generate old statefulset spec: %v", err)
-			updateFailed = true
-			return
-		}
-
-		newSs, err := c.generateStatefulSet(&newSpec.Spec)
-		if err != nil {
-			c.logger.Errorf("could not generate new statefulset spec: %v", err)
+		if err := c.syncStatefulSet(syncStatefulSet); err != nil {
+			c.logger.Errorf("could not sync statefulsets: %v", err)
 			updateFailed = true
-			return
-		}
-
-		if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) {
-			c.logger.Debugf("syncing statefulsets")
-			syncStatefulSet = false
-			// TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet
-			if err := c.syncStatefulSet(); err != nil {
-				c.logger.Errorf("could not sync statefulsets: %v", err)
-				updateFailed = true
-			}
 		}
+		syncStatefulSet = false
 	}()
 
 	// add or remove standby_cluster section from Patroni config depending on changes in standby section
@@ -1011,12 +1015,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	}
 
 	// pod disruption budget
-	if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
-		c.logger.Debug("syncing pod disruption budgets")
-		if err := c.syncPodDisruptionBudget(true); err != nil {
-			c.logger.Errorf("could not sync pod disruption budget: %v", err)
-			updateFailed = true
-		}
+	if err := c.syncPodDisruptionBudget(true); err != nil {
+		c.logger.Errorf("could not sync pod disruption budget: %v", err)
+		updateFailed = true
 	}
 
 	// logical backup job

pkg/cluster/connection_pooler.go

Lines changed: 34 additions & 38 deletions
@@ -10,6 +10,7 @@ import (
 	"github.com/sirupsen/logrus"
 	acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do"
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"golang.org/x/exp/maps"
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -691,26 +692,6 @@ func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDe
 	return deployment, nil
 }
 
-// updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment
-func updateConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) {
-	patchData, err := metaAnnotationsPatch(annotations)
-	if err != nil {
-		return nil, fmt.Errorf("could not form patch for the connection pooler deployment metadata: %v", err)
-	}
-	result, err := KubeClient.Deployments(deployment.Namespace).Patch(
-		context.TODO(),
-		deployment.Name,
-		types.MergePatchType,
-		[]byte(patchData),
-		metav1.PatchOptions{},
-		"")
-	if err != nil {
-		return nil, fmt.Errorf("could not patch connection pooler annotations %q: %v", patchData, err)
-	}
-	return result, nil
-
-}
-
 // Test if two connection pooler configuration needs to be synced. For simplicity
 // compare not the actual K8S objects, but the configuration itself and request
 // sync if there is any difference.
@@ -1022,6 +1003,25 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 		syncReason = append(syncReason, specReason...)
 	}
 
+	newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
+	if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed {
+		specSync = true
+		syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current one: " + reason}...)
+		deployment.Spec.Template.Annotations = newPodAnnotations
+	}
+	newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
+	ignoredAnnotations := c.extractIgnoredAnnotations(deployment.Annotations)
+	maps.Copy(newAnnotations, ignoredAnnotations)
+	if changed, reason := c.compareAnnotations(deployment.Annotations, newAnnotations); changed {
+		deployment.Annotations = newAnnotations
+		c.logger.Infof("new connection pooler deployments's annotations does not match the current one:" + reason)
+		c.logger.Debug("updating connection pooler deployments's annotations")
+		deployment, err = c.KubeClient.Deployments(deployment.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
+		if err != nil {
+			return nil, fmt.Errorf("could not update connection pooler annotations %q: %v", deployment.Name, err)
+		}
+	}
+
 	defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment)
 	syncReason = append(syncReason, defaultsReason...)
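One subtlety worth flagging: `maps.Copy(dst, src)` overwrites `dst` with `src` on key collisions, and the two merge sites in this file use opposite orders. For the deployment above, the ignored annotations are copied in last (they win); for the pods further down, the template annotations are copied in last (they win). A runnable illustration:

```go
package main

import (
	"fmt"
	"maps" // the commit uses golang.org/x/exp/maps, which behaves identically
)

func main() {
	// Deployment path: desired set first, ignored annotations copied in last.
	desired := map[string]string{"a": "desired"}
	maps.Copy(desired, map[string]string{"a": "live-ignored"})
	fmt.Println(desired["a"]) // live-ignored

	// Pod path: start from the ignored annotations, template copied in last.
	merged := map[string]string{"a": "live-ignored"}
	maps.Copy(merged, map[string]string{"a": "desired"})
	fmt.Println(merged["a"]) // desired
}
```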

@@ -1042,15 +1042,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 		}
 	}
 
-	newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(c.ConnectionPooler[role].Deployment.Annotations))
-	if newAnnotations != nil {
-		deployment, err = updateConnectionPoolerAnnotations(c.KubeClient, c.ConnectionPooler[role].Deployment, newAnnotations)
-		if err != nil {
-			return nil, err
-		}
-		c.ConnectionPooler[role].Deployment = deployment
-	}
-
 	// check if pooler pods must be replaced due to secret update
 	listOptions := metav1.ListOptions{
 		LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(),
@@ -1076,22 +1067,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 			if err != nil {
 				return nil, fmt.Errorf("could not delete pooler pod: %v", err)
 			}
+		} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
+			newAnnotations := c.extractIgnoredAnnotations(pod.Annotations)
+			maps.Copy(newAnnotations, deployment.Spec.Template.Annotations)
+			pod.Annotations = newAnnotations
+			c.logger.Debugf("updating annotations for connection pooler's pod %q", pod.Name)
+			_, err := c.KubeClient.Pods(pod.Namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{})
+			if err != nil {
+				return nil, fmt.Errorf("could not update annotations for pod %q: %v", pod.Name, err)
+			}
 		}
 	}
 
 	if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
 		c.ConnectionPooler[role].Service = service
 		desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role])
-		if match, reason := c.compareServices(service, desiredSvc); !match {
-			syncReason = append(syncReason, reason)
-			c.logServiceChanges(role, service, desiredSvc, false, reason)
-			newService, err = c.updateService(role, service, desiredSvc)
-			if err != nil {
-				return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
-			}
-			c.ConnectionPooler[role].Service = newService
-			c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
+		newService, err = c.updateService(role, service, desiredSvc)
+		if err != nil {
+			return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
 		}
+		c.ConnectionPooler[role].Service = newService
+		c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
 		return NoSync, nil
 	}

pkg/cluster/k8sres_test.go

Lines changed: 1 addition & 1 deletion
@@ -3306,7 +3306,7 @@ func TestGenerateResourceRequirements(t *testing.T) {
 	cluster.Namespace = namespace
 	_, err := cluster.createStatefulSet()
 	if k8sutil.ResourceAlreadyExists(err) {
-		err = cluster.syncStatefulSet()
+		err = cluster.syncStatefulSet(true)
 	}
 	assert.NoError(t, err)

pkg/cluster/resources.go

Lines changed: 10 additions & 34 deletions
@@ -291,51 +291,27 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
 		err error
 	)
 
+	match, reason := c.compareServices(oldService, newService)
+	if match {
+		return oldService, nil
+	}
+
+	c.logServiceChanges(role, oldService, newService, false, reason)
 	c.setProcessName("updating %v service", role)
 
 	serviceName := util.NameFromMeta(oldService.ObjectMeta)
 
-	// update the service annotation in order to propagate ELB notation.
-	if len(newService.ObjectMeta.Annotations) > 0 {
-		if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil {
-			_, err = c.KubeClient.Services(serviceName.Namespace).Patch(
-				context.TODO(),
-				serviceName.Name,
-				types.MergePatchType,
-				[]byte(annotationsPatchData),
-				metav1.PatchOptions{},
-				"")
-
-			if err != nil {
-				return nil, fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err)
-			}
-		} else {
-			return nil, fmt.Errorf("could not form patch for the service metadata: %v", err)
-		}
-	}
-
 	// now, patch the service spec, but when disabling LoadBalancers do update instead
 	// patch does not work because of LoadBalancerSourceRanges field (even if set to nil)
 	oldServiceType := oldService.Spec.Type
 	newServiceType := newService.Spec.Type
 	if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
 		newService.ResourceVersion = oldService.ResourceVersion
 		newService.Spec.ClusterIP = oldService.Spec.ClusterIP
-		svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{})
-		if err != nil {
-			return nil, fmt.Errorf("could not update service %q: %v", serviceName, err)
-		}
-	} else {
-		patchData, err := specPatch(newService.Spec)
-		if err != nil {
-			return nil, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
-		}
-
-		svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(
-			context.TODO(), serviceName.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "")
-		if err != nil {
-			return nil, fmt.Errorf("could not patch service %q: %v", serviceName, err)
-		}
+	}
+	svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("could not update service %q: %v", serviceName, err)
 	}
 
 	return svc, nil