Skip to content
1 change: 1 addition & 0 deletions manifests/operator-service-account-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ rules:
- delete
- get
- update
- patch
# to check nodes for node readiness label
- apiGroups:
- ""
Expand Down
87 changes: 43 additions & 44 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -400,7 +401,7 @@ func (c *Cluster) Create() (err error) {

if len(c.Spec.Streams) > 0 {
// creating streams requires syncing the statefulset first
err = c.syncStatefulSet()
err = c.syncStatefulSet(true)
if err != nil {
return fmt.Errorf("could not sync statefulset: %v", err)
}
Expand Down Expand Up @@ -433,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, "new statefulset's pod management policy do not match")
}

if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil {
c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
}
}
if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) {
match = false
needsReplace = true
Expand Down Expand Up @@ -493,7 +500,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
match = false
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
Expand All @@ -513,9 +519,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: ", name)+reason)
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
Expand Down Expand Up @@ -780,10 +786,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
}
}

if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed {
return !changed, "new service's annotations does not match the current one:" + reason
}

return true, ""
}

Expand All @@ -801,6 +803,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
newImage, curImage)
}

newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
}

newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion {
Expand All @@ -818,6 +826,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) ComparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
return false, "new PDB spec does not match the current one"
}
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
return false, "new PDB's annotations does not match the current one:" + reason
}
return true, ""
}

func getPgVersion(cronJob *batchv1.CronJob) string {
envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
for _, env := range envs {
Expand Down Expand Up @@ -922,12 +941,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

// Service
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}

// Users
Expand All @@ -946,15 +962,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0

if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)

initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
c.logger.Debugf("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
userInitFailed = true
updateFailed = true
return
}

}
if initUsers || annotationsChanged {
c.logger.Debugf("syncing secrets")
//TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
Expand All @@ -968,7 +988,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if c.OpConfig.StorageResizeMode != "off" {
c.syncVolumes()
} else {
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
}

// streams configuration
Expand All @@ -978,29 +998,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

// Statefulset
func() {
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate old statefulset spec: %v", err)
updateFailed = true
return
}

newSs, err := c.generateStatefulSet(&newSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate new statefulset spec: %v", err)
if err := c.syncStatefulSet(syncStatefulSet); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
return
}

if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) {
c.logger.Debugf("syncing statefulsets")
syncStatefulSet = false
// TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
}
}
syncStatefulSet = false
}()

// add or remove standby_cluster section from Patroni config depending on changes in standby section
Expand All @@ -1011,12 +1013,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

// pod disruption budget
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
c.logger.Debug("syncing pod disruption budgets")
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}

// logical backup job
Expand Down
Loading