33 changes: 19 additions & 14 deletions e2e/tests/test_e2e.py
@@ -909,22 +909,8 @@ def test_ignored_annotations(self):
'''
k8s = self.k8s

annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}

try:
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
old_svc_creation_timestamp = svc.metadata.creation_timestamp
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)

patch_config_ignored_annotations = {
"data": {
"ignored_annotations": "k8s-status",
@@ -933,6 +919,25 @@ def test_ignored_annotations(self):
k8s.update_config(patch_config_ignored_annotations)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')

annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}

old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
old_svc_creation_timestamp = svc.metadata.creation_timestamp
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)

k8s.delete_operator_pod()
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
new_sts_creation_timestamp = sts.metadata.creation_timestamp
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
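
The reordered test configures ignored_annotations first and only then patches the statefulset and service, which makes the assertion precise: after an operator restart and resync, both objects keep their original creation timestamps, i.e. an ignored annotation alone must not trigger recreation. A runnable sketch of the filtering this relies on; annotationsDiffer is a hypothetical stand-in for the operator's compareAnnotations helper, not its actual API:

package main

import (
	"fmt"
	"reflect"
)

// annotationsDiffer reports whether two annotation maps differ once the
// configured ignored keys are filtered out of both sides.
func annotationsDiffer(ignored []string, cur, desired map[string]string) bool {
	skip := make(map[string]struct{}, len(ignored))
	for _, k := range ignored {
		skip[k] = struct{}{}
	}
	filter := func(in map[string]string) map[string]string {
		out := map[string]string{}
		for k, v := range in {
			if _, ok := skip[k]; !ok {
				out[k] = v
			}
		}
		return out
	}
	return !reflect.DeepEqual(filter(cur), filter(desired))
}

func main() {
	cur := map[string]string{"k8s-status": "healthy", "team": "acid"}
	desired := map[string]string{"team": "acid"}
	// The patched k8s-status annotation alone must not count as drift.
	fmt.Println(annotationsDiffer([]string{"k8s-status"}, cur, desired)) // false
}
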
1 change: 1 addition & 0 deletions manifests/operator-service-account-rbac.yaml
@@ -102,6 +102,7 @@ rules:
- delete
- get
- update
- patch
# to check nodes for node readiness label
- apiGroups:
- ""
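
The new patch verb pairs with the cluster.go changes below: instead of rewriting or recreating whole objects when only metadata drifts, the operator can merge-patch annotations in place. A minimal client-go sketch of such a call, assuming an already-configured clientset; the helper name is illustrative:

package example

import (
	"context"
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// patchServiceAnnotations merge-patches only metadata.annotations of a
// service, leaving the rest of the object untouched.
func patchServiceAnnotations(ctx context.Context, c kubernetes.Interface, ns, name string, ann map[string]string) error {
	patch, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{"annotations": ann},
	})
	if err != nil {
		return err
	}
	_, err = c.CoreV1().Services(ns).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}
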
91 changes: 41 additions & 50 deletions pkg/cluster/cluster.go
@@ -30,6 +30,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -433,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, "new statefulset's pod management policy do not match")
}

if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil {
c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
}
}
if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) {
match = false
needsReplace = true
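
Context for the nil check above: the API server defaults an unset persistentVolumeClaimRetentionPolicy to Retain/Retain, so a locally generated statefulset with a nil policy would otherwise DeepEqual-differ forever from the object read back from the cluster and keep forcing replacements. The same defaulting as a standalone, runnable sketch (hypothetical helpers mirroring the inline code above):

package main

import (
	"fmt"
	"reflect"

	appsv1 "k8s.io/api/apps/v1"
)

// withRetentionDefaults fills in the server-side default (Retain/Retain)
// so a nil local policy compares equal to the defaulted object returned
// by the API server.
func withRetentionDefaults(p *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy) *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy {
	if p == nil {
		return &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
			WhenScaled:  appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
		}
	}
	return p
}

// retentionPolicyChanged compares with defaults applied to both sides.
func retentionPolicyChanged(cur, desired *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy) bool {
	return !reflect.DeepEqual(withRetentionDefaults(cur), withRetentionDefaults(desired))
}

func main() {
	// A cluster created before the field existed carries a nil policy.
	var local *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy
	fmt.Println(retentionPolicyChanged(local, withRetentionDefaults(nil))) // false: no spurious replace
}
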
Expand Down Expand Up @@ -493,7 +500,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
match = false
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
@@ -513,9 +519,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: %s", name, reason))
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
@@ -780,10 +786,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
}
}

if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed {
return !changed, "new service's annotations does not match the current one:" + reason
}

return true, ""
}

@@ -801,6 +803,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
newImage, curImage)
}

newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
}

newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion {
@@ -818,6 +826,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
return false, "new PDB spec does not match the current one"
}
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
return false, "new PDB's annotations does not match the current one:" + reason
}
return true, ""
}
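
comparePodDisruptionBudget gates the PDB sync: spec drift or (non-ignored) annotation drift marks the object for replacement. A simplified, runnable restatement of that decision, with plain DeepEqual standing in for the operator's compareAnnotations:

package main

import (
	"fmt"
	"reflect"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// pdbNeedsReplace mirrors the comparison above: any spec or annotation
// difference forces a replacement of the pod disruption budget.
func pdbNeedsReplace(cur, desired *policyv1.PodDisruptionBudget) (bool, string) {
	if !reflect.DeepEqual(cur.Spec, desired.Spec) {
		return true, "spec changed"
	}
	if !reflect.DeepEqual(cur.Annotations, desired.Annotations) {
		return true, "annotations changed"
	}
	return false, ""
}

func main() {
	minAvailable := intstr.FromInt(1)
	cur := &policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"team": "acid"}},
		Spec:       policyv1.PodDisruptionBudgetSpec{MinAvailable: &minAvailable},
	}
	desired := cur.DeepCopy()
	desired.Annotations["owner"] = "dba" // annotation-only drift
	fmt.Println(pdbNeedsReplace(cur, desired)) // true annotations changed
}
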

func getPgVersion(cronJob *batchv1.CronJob) string {
envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
for _, env := range envs {
Expand Down Expand Up @@ -883,7 +902,6 @@ func (c *Cluster) hasFinalizer() bool {
func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed := false
userInitFailed := false
syncStatefulSet := false

c.mu.Lock()
defer c.mu.Unlock()
@@ -914,20 +932,16 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed",
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
syncStatefulSet = true
} else {
c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
// sticking with old version, this will also advance GetDesiredVersion next time.
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}

// Service
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}

// Users
@@ -946,15 +960,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0

if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)

initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
c.logger.Debugf("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
userInitFailed = true
updateFailed = true
return
}

}
if initUsers || annotationsChanged {
c.logger.Debugf("syncing secrets")
//TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
@@ -968,38 +986,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if c.OpConfig.StorageResizeMode != "off" {
c.syncVolumes()
} else {
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
}

// streams configuration
if len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 {
syncStatefulSet = true
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
}

// Statefulset
func() {
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate old statefulset spec: %v", err)
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
return
}

newSs, err := c.generateStatefulSet(&newSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate new statefulset spec: %v", err)
updateFailed = true
return
}

if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) {
c.logger.Debugf("syncing statefulsets")
syncStatefulSet = false
// TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
}
}
}()
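
Update no longer generates old and new statefulset specs just to DeepEqual them (and the syncStatefulSet flag is gone); like services and pod disruption budgets, it now calls the sync path unconditionally and lets that path compute the diff, including the annotation handling above. The recurring shape, as a runnable sketch with hypothetical sync callbacks:

package main

import "fmt"

// updateStep captures the pattern Update now follows for services,
// statefulsets and PDBs alike: run the sync unconditionally and record
// failure instead of pre-diffing generated specs.
func updateStep(name string, sync func() error, failed *bool) {
	if err := sync(); err != nil {
		fmt.Printf("could not sync %s: %v\n", name, err)
		*failed = true
	}
}

func main() {
	updateFailed := false
	updateStep("services", func() error { return nil }, &updateFailed)
	updateStep("statefulsets", func() error { return nil }, &updateFailed)
	updateStep("pod disruption budgets", func() error { return nil }, &updateFailed)
	fmt.Println("update failed:", updateFailed)
}
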

@@ -1011,12 +1005,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

// pod disruption budget
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
c.logger.Debug("syncing pod disruption budgets")
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}

// logical backup job