Commit 7f9d0a1

Add topologySpreadConstraints configuration to pod spec.
1 parent: 746df0d

File tree

7 files changed: +158 -30 lines

  e2e/tests/test_e2e.py
  manifests/postgresql.crd.yaml
  pkg/apis/acid.zalan.do/v1/crds.go
  pkg/apis/acid.zalan.do/v1/postgresql_type.go
  pkg/cluster/cluster.go
  pkg/cluster/k8sres.go
  pkg/cluster/k8sres_test.go

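For illustration, a minimal sketch of how the new field could look in a postgresql manifest (the cluster name and constraint values here are illustrative, not taken from this commit; the field shape follows the CRD schema added in manifests/postgresql.crd.yaml below, and labelSelector is left out because the operator is meant to fill it in with the cluster's labels):

    apiVersion: "acid.zalan.do/v1"
    kind: postgresql
    metadata:
      name: acid-minimal-cluster
    spec:
      numberOfInstances: 2
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: DoNotSchedule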

e2e/tests/test_e2e.py

Lines changed: 61 additions & 12 deletions

@@ -559,7 +559,7 @@ def compare_config():

        pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
        del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]
-
+
        k8s.api.custom_objects_api.patch_namespaced_custom_object(
            "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)

@@ -576,7 +576,7 @@ def compare_config():

        self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
            "The replication slot cannot be updated", 10, 5)
-
+
        # make sure slot from Patroni didn't get deleted
        self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
            "The replication slot from Patroni gets deleted", 10, 5)

@@ -932,7 +932,7 @@ def test_ignored_annotations(self):
                },
            }
        }
-
+
        old_sts_creation_timestamp = sts.metadata.creation_timestamp
        k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
        old_svc_creation_timestamp = svc.metadata.creation_timestamp

@@ -1369,7 +1369,7 @@ def test_persistent_volume_claim_retention_policy(self):
        }
        k8s.update_config(patch_scaled_policy_retain)
        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
+
        # decrease the number of instances
        k8s.api.custom_objects_api.patch_namespaced_custom_object(
            'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)

@@ -1646,7 +1646,6 @@ def test_node_readiness_label(self):
        # toggle pod anti affinity to move replica away from master node
        self.assert_distributed_pods(master_nodes)

-
    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_overwrite_pooler_deployment(self):
        pooler_name = 'acid-minimal-cluster-pooler'

@@ -1799,7 +1798,7 @@ def test_password_rotation(self):
            },
        }
        k8s.api.core_v1.patch_namespaced_secret(
-            name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
+            name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
            namespace="default",
            body=secret_fake_rotation)

@@ -1816,7 +1815,7 @@ def test_password_rotation(self):
                "enable_password_rotation": "true",
                "inherited_annotations": "environment",
                "password_rotation_interval": "30",
-                "password_rotation_user_retention": "30",  # should be set to 60
+                "password_rotation_user_retention": "30",  # should be set to 60
            },
        }
        k8s.update_config(enable_password_rotation)

@@ -1885,7 +1884,7 @@ def test_password_rotation(self):
        self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret")

        # disable password rotation for all other users (foo_user)
-        # and pick smaller intervals to see if the third fake rotation user is dropped
+        # and pick smaller intervals to see if the third fake rotation user is dropped
        enable_password_rotation = {
            "data": {
                "enable_password_rotation": "false",

@@ -2385,6 +2384,56 @@ def test_taint_based_eviction(self):
        # toggle pod anti affinity to move replica away from master node
        self.assert_distributed_pods(master_nodes)

+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_topology_spread_constraints(self):
+        '''
+        Enable topologySpreadConstraints for pods
+        '''
+        k8s = self.k8s
+        cluster_labels = "application=spilo,cluster-name=acid-minimal-cluster"
+
+        # Verify we are in a good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        master_nodes, replica_nodes = k8s.get_cluster_nodes()
+        self.assertNotEqual(master_nodes, [])
+        self.assertNotEqual(replica_nodes, [])
+
+        # Label the nodes for topologySpreadConstraints
+        patch_node_label = {
+            "metadata": {
+                "labels": {
+                    "topology.kubernetes.io/zone": "zalando"
+                }
+            }
+        }
+        k8s.api.core_v1.patch_node(master_nodes[0], patch_node_label)
+        k8s.api.core_v1.patch_node(replica_nodes[0], patch_node_label)
+
+        # Scale out the postgresql pods
+        k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
+            {"spec": {"numberOfInstances": 6}})
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 6, "Postgresql StatefulSet is not scaled to 6")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 6, "Not all pods are running")
+
+        worker_node_1 = 0
+        worker_node_2 = 0
+        pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_labels)
+        for pod in pods.items:
+            if pod.spec.node_name == 'postgres-operator-e2e-tests-worker':
+                worker_node_1 += 1
+            elif pod.spec.node_name == 'postgres-operator-e2e-tests-worker2':
+                worker_node_2 += 1
+
+        self.assertEqual(worker_node_1, worker_node_2)
+        self.assertEqual(worker_node_1, 3)
+        self.assertEqual(worker_node_2, 3)
+
+        # Scale the postgresql pods back to the previous number of instances
+        k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
+            {"spec": {"numberOfInstances": 2}})
+
    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_zz_cluster_deletion(self):
        '''

@@ -2460,7 +2509,7 @@ def test_zz_cluster_deletion(self):
            self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
            self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
            self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
-            self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
+            self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 6, "PVCs were deleted although disabled in config")

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))

@@ -2502,7 +2551,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci

            # if nodes are different we can quit here
            if master_nodes[0] not in replica_nodes:
-                return True
+                return True

            # enable pod anti affinity in config map which should trigger movement of replica
            patch_enable_antiaffinity = {

@@ -2526,7 +2575,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
            }
            k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
+
            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
            k8s.wait_for_running_pods(cluster_labels, 2)

@@ -2537,7 +2586,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
            # if nodes are different we can quit here
            for target_node in target_nodes:
                if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
-                    print('Pods run on the same node')
+                    print('Pods run on the same node')
                    return False

        except timeout_decorator.TimeoutError:

manifests/postgresql.crd.yaml

Lines changed: 6 additions & 0 deletions

@@ -582,6 +582,12 @@ spec:
                  - PreferNoSchedule
                tolerationSeconds:
                  type: integer
+         topologySpreadConstraints:
+           type: array
+           nullable: true
+           items:
+             type: object
+             x-kubernetes-preserve-unknown-fields: true
          useLoadBalancer:
            type: boolean
            description: deprecated

pkg/apis/acid.zalan.do/v1/crds.go

Lines changed: 10 additions & 0 deletions

@@ -895,6 +895,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
                        },
                    },
                },
+               "topologySpreadConstraints": {
+                   Type:     "array",
+                   Nullable: true,
+                   Items: &apiextv1.JSONSchemaPropsOrArray{
+                       Schema: &apiextv1.JSONSchemaProps{
+                           Type:                   "object",
+                           XPreserveUnknownFields: util.True(),
+                       },
+                   },
+               },
                "useLoadBalancer": {
                    Type:        "boolean",
                    Description: "deprecated",

pkg/apis/acid.zalan.do/v1/postgresql_type.go

Lines changed: 19 additions & 18 deletions

@@ -63,24 +63,25 @@ type PostgresSpec struct {
    UsersWithSecretRotation        []string `json:"usersWithSecretRotation,omitempty"`
    UsersWithInPlaceSecretRotation []string `json:"usersWithInPlaceSecretRotation,omitempty"`

-   NumberOfInstances      int32                       `json:"numberOfInstances"`
-   MaintenanceWindows     []MaintenanceWindow         `json:"maintenanceWindows,omitempty"`
-   Clone                  *CloneDescription           `json:"clone,omitempty"`
-   Databases              map[string]string           `json:"databases,omitempty"`
-   PreparedDatabases      map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
-   SchedulerName          *string                     `json:"schedulerName,omitempty"`
-   NodeAffinity           *v1.NodeAffinity            `json:"nodeAffinity,omitempty"`
-   Tolerations            []v1.Toleration             `json:"tolerations,omitempty"`
-   Sidecars               []Sidecar                   `json:"sidecars,omitempty"`
-   InitContainers         []v1.Container              `json:"initContainers,omitempty"`
-   PodPriorityClassName   string                      `json:"podPriorityClassName,omitempty"`
-   ShmVolume              *bool                       `json:"enableShmVolume,omitempty"`
-   EnableLogicalBackup    bool                        `json:"enableLogicalBackup,omitempty"`
-   LogicalBackupRetention string                      `json:"logicalBackupRetention,omitempty"`
-   LogicalBackupSchedule  string                      `json:"logicalBackupSchedule,omitempty"`
-   StandbyCluster         *StandbyDescription         `json:"standby,omitempty"`
-   PodAnnotations         map[string]string           `json:"podAnnotations,omitempty"`
-   ServiceAnnotations     map[string]string           `json:"serviceAnnotations,omitempty"`
+   NumberOfInstances         int32                         `json:"numberOfInstances"`
+   MaintenanceWindows        []MaintenanceWindow           `json:"maintenanceWindows,omitempty"`
+   Clone                     *CloneDescription             `json:"clone,omitempty"`
+   Databases                 map[string]string             `json:"databases,omitempty"`
+   PreparedDatabases         map[string]PreparedDatabase   `json:"preparedDatabases,omitempty"`
+   SchedulerName             *string                       `json:"schedulerName,omitempty"`
+   NodeAffinity              *v1.NodeAffinity              `json:"nodeAffinity,omitempty"`
+   TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
+   Tolerations               []v1.Toleration               `json:"tolerations,omitempty"`
+   Sidecars                  []Sidecar                     `json:"sidecars,omitempty"`
+   InitContainers            []v1.Container                `json:"initContainers,omitempty"`
+   PodPriorityClassName      string                        `json:"podPriorityClassName,omitempty"`
+   ShmVolume                 *bool                         `json:"enableShmVolume,omitempty"`
+   EnableLogicalBackup       bool                          `json:"enableLogicalBackup,omitempty"`
+   LogicalBackupRetention    string                        `json:"logicalBackupRetention,omitempty"`
+   LogicalBackupSchedule     string                        `json:"logicalBackupSchedule,omitempty"`
+   StandbyCluster            *StandbyDescription           `json:"standby,omitempty"`
+   PodAnnotations            map[string]string             `json:"podAnnotations,omitempty"`
+   ServiceAnnotations        map[string]string             `json:"serviceAnnotations,omitempty"`

    // MasterServiceAnnotations takes precedence over ServiceAnnotations for master role if not empty
    MasterServiceAnnotations map[string]string `json:"masterServiceAnnotations,omitempty"`
    // ReplicaServiceAnnotations takes precedence over ServiceAnnotations for replica role if not empty

pkg/cluster/cluster.go

Lines changed: 5 additions & 0 deletions

@@ -499,6 +499,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
        needsRollUpdate = true
        reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
    }
+   if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
+       needsReplace = true
+       needsRollUpdate = true
+       reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
+   }
    if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
        needsReplace = true
        needsRollUpdate = true

pkg/cluster/k8sres.go

Lines changed: 14 additions & 0 deletions

@@ -604,6 +604,13 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
    return podAntiAffinity
}

+func generateTopologySpreadConstraints(labels labels.Set, topologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
+   for _, topologySpreadConstraint := range topologySpreadConstraints {
+       topologySpreadConstraint.LabelSelector = &metav1.LabelSelector{MatchLabels: labels}
+   }
+   return topologySpreadConstraints
+}
+
func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
    // allow to override tolerations by postgresql manifest
    if len(*tolerationsSpec) > 0 {

@@ -809,6 +816,7 @@ func (c *Cluster) generatePodTemplate(
    initContainers []v1.Container,
    sidecarContainers []v1.Container,
    sharePgSocketWithSidecars *bool,
+   topologySpreadConstraintsSpec []v1.TopologySpreadConstraint,
    tolerationsSpec *[]v1.Toleration,
    spiloRunAsUser *int64,
    spiloRunAsGroup *int64,

@@ -878,6 +886,10 @@ func (c *Cluster) generatePodTemplate(
        podSpec.PriorityClassName = priorityClassName
    }

+   if len(topologySpreadConstraintsSpec) > 0 {
+       podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, topologySpreadConstraintsSpec)
+   }
+
    if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
        addVarRunVolume(&podSpec)
    }

@@ -1466,6 +1478,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
        initContainers,
        sidecarContainers,
        c.OpConfig.SharePgSocketWithSidecars,
+       spec.TopologySpreadConstraints,
        &tolerationSpec,
        effectiveRunAsUser,
        effectiveRunAsGroup,

@@ -2353,6 +2366,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
        []v1.Container{},
        []v1.Container{},
        util.False(),
+       []v1.TopologySpreadConstraint{},
        &tolerationsSpec,
        nil,
        nil,
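Together with generatePodTemplate above, the intent of generateTopologySpreadConstraints is that every constraint from the manifest ends up in the StatefulSet's pod template with the cluster's label selector stamped onto it. A sketch of the resulting pod spec fragment (not taken from the repo; label values are illustrative, borrowed from the e2e test's acid-minimal-cluster):

    topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: topology.kubernetes.io/zone
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            application: spilo
            cluster-name: acid-minimal-cluster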

pkg/cluster/k8sres_test.go

Lines changed: 43 additions & 0 deletions

@@ -3984,3 +3984,46 @@ func TestGenerateCapabilities(t *testing.T) {
        }
    }
}
+
+func TestTopologySpreadConstraints(t *testing.T) {
+   clusterName := "acid-test-cluster"
+   namespace := "default"
+
+   pg := acidv1.Postgresql{
+       ObjectMeta: metav1.ObjectMeta{
+           Name:      clusterName,
+           Namespace: namespace,
+       },
+       Spec: acidv1.PostgresSpec{
+           NumberOfInstances: 1,
+           Resources: &acidv1.Resources{
+               ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+               ResourceLimits:   acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+           },
+           Volume: acidv1.Volume{
+               Size: "1G",
+           },
+       },
+   }
+
+   cluster := New(
+       Config{
+           OpConfig: config.Config{
+               PodManagementPolicy: "ordered_ready",
+           },
+       }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
+   cluster.Name = clusterName
+   cluster.Namespace = namespace
+   cluster.labelsSet(true)
+
+   s, err := cluster.generateStatefulSet(&pg.Spec)
+   assert.NoError(t, err)
+   assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
+       MaxSkew:           int32(1),
+       TopologyKey:       "topology.kubernetes.io/zone",
+       WhenUnsatisfiable: v1.DoNotSchedule,
+       LabelSelector: &metav1.LabelSelector{
+           MatchLabels: cluster.labelsSet(true),
+       },
+   })
+}
