Skip to content

Commit ac3990c

Browse files
committed
Annotate logical backup jobs/pods, endpoints
1 parent 12a4113 commit ac3990c

File tree

4 files changed

+59
-5
lines changed

4 files changed

+59
-5
lines changed

pkg/cluster/cluster.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
797797
newImage, curImage)
798798
}
799799

800+
newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
801+
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
802+
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
803+
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
804+
}
805+
800806
newPgVersion := getPgVersion(new)
801807
curPgVersion := getPgVersion(cur)
802808
if newPgVersion != curPgVersion {

pkg/cluster/k8sres.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2048,9 +2048,10 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po
20482048
func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
20492049
endpoints := &v1.Endpoints{
20502050
ObjectMeta: metav1.ObjectMeta{
2051-
Name: c.endpointName(role),
2052-
Namespace: c.Namespace,
2053-
Labels: c.roleLabelsSet(true, role),
2051+
Name: c.endpointName(role),
2052+
Namespace: c.Namespace,
2053+
Annotations: c.annotationsSet(nil),
2054+
Labels: c.roleLabelsSet(true, role),
20542055
},
20552056
}
20562057
if len(subsets) > 0 {

pkg/cluster/sync.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,17 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
239239
c.setProcessName("syncing %s endpoint", role)
240240

241241
if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err == nil {
242-
// TODO: No syncing of endpoints here, is this covered completely by updateService?
242+
desiredEp := c.generateEndpoint(role, ep.Subsets)
243+
if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed {
244+
patchData, err := metaAnnotationsPatch(desiredEp.Annotations)
245+
if err != nil {
246+
return fmt.Errorf("could not form patch for %s endpoint: %v", role, err)
247+
}
248+
ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), c.endpointName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
249+
if err != nil {
250+
return fmt.Errorf("could not patch annotations of %s endpoint: %v", role, err)
251+
}
252+
}
243253
c.Endpoints[role] = ep
244254
return nil
245255
}
@@ -1410,6 +1420,16 @@ func (c *Cluster) syncLogicalBackupJob() error {
14101420
}
14111421
c.logger.Info("the logical backup job is synced")
14121422
}
1423+
if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed {
1424+
patchData, err := metaAnnotationsPatch(desiredJob.Annotations)
1425+
if err != nil {
1426+
return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err)
1427+
}
1428+
_, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
1429+
if err != nil {
1430+
return fmt.Errorf("could not patch annotations of the logical backup job %q: %v", jobName, err)
1431+
}
1432+
}
14131433
return nil
14141434
}
14151435
if !k8sutil.ResourceNotFound(err) {

pkg/cluster/util_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,21 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[
180180
return nil
181181
}
182182

183+
checkEndpoints := func(annotations map[string]string) error {
184+
endpointsList, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions)
185+
if err != nil {
186+
return err
187+
}
188+
for _, ep := range endpointsList.Items {
189+
if err := containsAnnotations(annotations, ep.Annotations, ep.Name, "Endpoints"); err != nil {
190+
return err
191+
}
192+
}
193+
return nil
194+
}
195+
183196
checkFuncs := []func(map[string]string) error{
184-
checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkPvc, checkSecrets,
197+
checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkPvc, checkSecrets, checkEndpoints,
185198
}
186199
for _, f := range checkFuncs {
187200
if err := f(resultAnnotations); err != nil {
@@ -232,6 +245,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
232245
Volume: acidv1.Volume{
233246
Size: "1Gi",
234247
},
248+
NumberOfInstances: 2,
235249
},
236250
}
237251

@@ -260,6 +274,8 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
260274
PodRoleLabel: "spilo-role",
261275
ResourceCheckInterval: time.Duration(testResourceCheckInterval),
262276
ResourceCheckTimeout: time.Duration(testResourceCheckTimeout),
277+
MinInstances: -1,
278+
MaxInstances: -1,
263279
},
264280
},
265281
}, client, pg, logger, eventRecorder)
@@ -379,6 +395,17 @@ func annotateResources(cluster *Cluster) error {
379395
return err
380396
}
381397
}
398+
399+
endpoints, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions)
400+
if err != nil {
401+
return err
402+
}
403+
for _, ep := range endpoints.Items {
404+
ep.Annotations = externalAnnotations
405+
if _, err = cluster.KubeClient.Endpoints(namespace).Update(context.TODO(), &ep, metav1.UpdateOptions{}); err != nil {
406+
return err
407+
}
408+
}
382409
return nil
383410
}
384411

0 commit comments

Comments
 (0)