Merged
100 changes: 97 additions & 3 deletions e2e/tests/test_e2e.py
@@ -395,12 +395,13 @@ def test_config_update(self):
"spec": {
"postgresql": {
"parameters": {
"max_connections": new_max_connections_value
"max_connections": new_max_connections_value,
"wal_level": "logical"
}
},
"patroni": {
"patroni": {
"slots": {
"test_slot": {
"first_slot": {
"type": "physical"
}
},
@@ -437,6 +438,8 @@ def compare_config():
"synchronous_mode not updated")
self.assertEqual(desired_config["failsafe_mode"], effective_config["failsafe_mode"],
"failsafe_mode not updated")
self.assertEqual(desired_config["slots"], effective_config["slots"],
"slots not updated")
return True

# check if Patroni config has been updated
@@ -497,6 +500,97 @@ def compare_config():
self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
"Previous max_connections setting not applied on replica", 10, 5)

# patch new slot via Patroni REST
patroni_slot = "test_patroni_slot"
patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' localhost:8008/config"""
pg_patch_config["spec"]["patroni"]["slots"][patroni_slot] = {"type": "physical"}

k8s.exec_with_kubectl(leader.metadata.name, patch_slot_command)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyTrue(compare_config, "Postgres config not applied")

# test adding new slots
pg_add_new_slots_patch = {
"spec": {
"patroni": {
"slots": {
"test_slot": {
"type": "logical",
"database": "foo",
"plugin": "pgoutput"
},
"test_slot_2": {
"type": "physical"
}
}
}
}
}

for slot_name, slot_details in pg_add_new_slots_patch["spec"]["patroni"]["slots"].items():
pg_patch_config["spec"]["patroni"]["slots"][slot_name] = slot_details

k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_add_new_slots_patch)

self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyTrue(compare_config, "Postgres config not applied")

# delete test_slot_2 from config and change the database for test_slot
slot_to_change = "test_slot"
slot_to_remove = "test_slot_2"
pg_delete_slot_patch = {
"spec": {
"patroni": {
"slots": {
"test_slot": {
"type": "logical",
"database": "bar",
"plugin": "pgoutput"
},
"test_slot_2": None
}
}
}
}

pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]

k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)

self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyTrue(compare_config, "Postgres config not applied")

deleted_slot_query = """
SELECT slot_name
FROM pg_replication_slots
WHERE slot_name = '%s';
""" % (slot_to_remove)
@FxKu (Member) commented on Dec 28, 2022:
Can the slot query be defined once, so we do the formatting inside the assertions (self.eventuallyEqual)?
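A minimal sketch of that suggestion, reusing the names from the surrounding test (leader, slot_to_remove, patroni_slot) — one query template, interpolated inside each assertion rather than one near-identical string per slot:

```python
# Sketch only (not part of this PR): a single slot-name query template,
# formatted per assertion instead of per-slot query strings.
slot_name_query = """
    SELECT slot_name
    FROM pg_replication_slots
    WHERE slot_name = '%s';
"""

self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", slot_name_query % slot_to_remove)), 0,
    "The replication slot cannot be deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", slot_name_query % patroni_slot)), 1,
    "The replication slot from Patroni gets deleted", 10, 5)
```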


self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", deleted_slot_query)), 0,
"The replication slot cannot be deleted", 10, 5)

changed_slot_query = """
SELECT database
FROM pg_replication_slots
WHERE slot_name = '%s';
""" % (slot_to_change)

self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", changed_slot_query)[0], "bar",
"The replication slot cannot be updated", 10, 5)

# make sure slot from Patroni didn't get deleted
patroni_slot_query = """
SELECT slot_name
FROM pg_replication_slots
WHERE slot_name = '%s';
""" % (patroni_slot)

self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", patroni_slot_query)), 1,
"The replication slot from Patroni gets deleted", 10, 5)

except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
raise
6 changes: 6 additions & 0 deletions pkg/cluster/cluster.go
@@ -84,6 +84,7 @@ type Cluster struct {
userSyncStrategy spec.UserSyncer
deleteOptions metav1.DeleteOptions
podEventsQueue *cache.FIFO
replicationSlots map[string]interface{}

teamsAPIClient teams.Interface
oauthTokenGetter OAuthTokenGetter
@@ -141,6 +142,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
podEventsQueue: podEventsQueue,
KubeClient: kubeClient,
currentMajorVersion: 0,
replicationSlots: make(map[string]interface{}),
}
cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
@@ -375,6 +377,10 @@ func (c *Cluster) Create() error {
}
}

for slotName, desiredSlot := range c.Spec.Patroni.Slots {
c.replicationSlots[slotName] = desiredSlot
}

return nil
}

15 changes: 13 additions & 2 deletions pkg/cluster/sync.go
@@ -579,15 +579,26 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
}
}

slotsToSet := make(map[string]interface{})
// check if there is any slot deletion
for slotName, effectiveSlot := range c.replicationSlots {
if desiredSlot, exists := desiredPatroniConfig.Slots[slotName]; exists {
if reflect.DeepEqual(effectiveSlot, desiredSlot) {
continue
}
}
slotsToSet[slotName] = nil
delete(c.replicationSlots, slotName)
}
// check if specified slots exist in config and if they differ
slotsToSet := make(map[string]map[string]string)
for slotName, desiredSlot := range desiredPatroniConfig.Slots {
if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists {
if reflect.DeepEqual(desiredSlot, effectiveSlot) {
continue
}
}
slotsToSet[slotName] = desiredSlot
c.replicationSlots[slotName] = desiredSlot
}
if len(slotsToSet) > 0 {
configToSet["slots"] = slotsToSet
@@ -614,7 +625,7 @@
}

// check if there exist only config updates that require a restart of the primary
if !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 {
if len(restartPrimary) > 0 && !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 {
Review comment (Member):
If there's nothing to set, requiresMasterRestart would become true here, which is wrong.

requiresMasterRestart = true
}
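To see why the added length check matters, a self-contained sketch; sliceContains is a stand-in for util.SliceContains, assumed to do a plain membership test:

```go
package main

import "fmt"

// sliceContains mimics the assumed semantics of util.SliceContains:
// it reports whether v occurs in s.
func sliceContains(s []bool, v bool) bool {
	for _, x := range s {
		if x == v {
			return true
		}
	}
	return false
}

func main() {
	// No pending parameter changes: restartPrimary stays empty.
	var restartPrimary []bool

	// Old check: vacuously true on an empty slice, wrongly flagging a restart.
	fmt.Println(!sliceContains(restartPrimary, false)) // true

	// Check after this PR: false when there is nothing to restart for.
	fmt.Println(len(restartPrimary) > 0 && !sliceContains(restartPrimary, false)) // false
}
```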

131 changes: 121 additions & 10 deletions pkg/cluster/sync_test.go
@@ -144,6 +144,13 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
client, _ := newFakeK8sSyncClient()
clusterName := "acid-test-cluster"
namespace := "default"
testSlots := map[string]map[string]string{
"slot1": {
"type": "logical",
"plugin": "wal2json",
"database": "foo",
},
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -208,11 +215,26 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {

// simulate existing config that differs from cluster.Spec
tests := []struct {
subtest string
patroni acidv1.Patroni
pgParams map[string]string
restartPrimary bool
subtest string
patroni acidv1.Patroni
desiredSlots map[string]map[string]string
removedSlots map[string]map[string]string
pgParams map[string]string
shouldBePatched bool
restartPrimary bool
}{
{
subtest: "Patroni and Postgresql.Parameters do not differ",
patroni: acidv1.Patroni{
TTL: 20,
},
pgParams: map[string]string{
"log_min_duration_statement": "200",
"max_connections": "50",
},
shouldBePatched: false,
restartPrimary: false,
},
{
subtest: "Patroni and Postgresql.Parameters differ - restart replica first",
patroni: acidv1.Patroni{
@@ -222,7 +244,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "500", // desired 200
"max_connections": "100", // desired 50
},
restartPrimary: false,
shouldBePatched: true,
restartPrimary: false,
},
{
subtest: "multiple Postgresql.Parameters differ - restart replica first",
@@ -231,7 +254,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "500", // desired 200
"max_connections": "100", // desired 50
},
restartPrimary: false,
shouldBePatched: true,
restartPrimary: false,
},
{
subtest: "desired max_connections bigger - restart replica first",
@@ -240,7 +264,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "200",
"max_connections": "30", // desired 50
},
restartPrimary: false,
shouldBePatched: true,
restartPrimary: false,
},
{
subtest: "desired max_connections smaller - restart master first",
@@ -249,19 +274,105 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "200",
"max_connections": "100", // desired 50
},
restartPrimary: true,
shouldBePatched: true,
restartPrimary: true,
},
{
subtest: "slot does not exist but is desired",
patroni: acidv1.Patroni{
TTL: 20,
},
desiredSlots: testSlots,
pgParams: map[string]string{
"log_min_duration_statement": "200",
"max_connections": "50",
},
shouldBePatched: true,
restartPrimary: false,
},
{
subtest: "slot exist, nothing specified in manifest",
patroni: acidv1.Patroni{
TTL: 20,
Slots: map[string]map[string]string{
"slot1": {
"type": "logical",
"plugin": "pgoutput",
"database": "foo",
},
},
},
pgParams: map[string]string{
"log_min_duration_statement": "200",
"max_connections": "50",
},
shouldBePatched: false,
restartPrimary: false,
},
{
subtest: "slot is removed from manifest",
patroni: acidv1.Patroni{
TTL: 20,
Slots: map[string]map[string]string{
"slot1": {
"type": "logical",
"plugin": "pgoutput",
"database": "foo",
},
},
},
removedSlots: testSlots,
pgParams: map[string]string{
"log_min_duration_statement": "200",
"max_connections": "50",
},
shouldBePatched: true,
restartPrimary: false,
},
{
subtest: "slot plugin differs",
patroni: acidv1.Patroni{
TTL: 20,
Slots: map[string]map[string]string{
"slot1": {
"type": "logical",
"plugin": "pgoutput",
"database": "foo",
},
},
},
desiredSlots: testSlots,
pgParams: map[string]string{
"log_min_duration_statement": "200",
"max_connections": "50",
},
shouldBePatched: true,
restartPrimary: false,
},
}

for _, tt := range tests {
if len(tt.desiredSlots) > 0 {
cluster.Spec.Patroni.Slots = tt.desiredSlots
}
if len(tt.removedSlots) > 0 {
for slotName, removedSlot := range tt.removedSlots {
cluster.replicationSlots[slotName] = removedSlot
}
}

configPatched, requirePrimaryRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters)
assert.NoError(t, err)
if configPatched != true {
if configPatched != tt.shouldBePatched {
t.Errorf("%s - %s: expected config update did not happen", testName, tt.subtest)
}
if requirePrimaryRestart != tt.restartPrimary {
t.Errorf("%s - %s: wrong master restart strategy, got restart %v, expected restart %v", testName, tt.subtest, requirePrimaryRestart, tt.restartPrimary)
}

// reset slots for next tests
cluster.Spec.Patroni.Slots = nil
cluster.replicationSlots = make(map[string]interface{})
}

testsFailsafe := []struct {
@@ -342,7 +453,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
effectiveVal: util.True(),
desiredVal: true,
shouldBePatched: false, // should not require patching
restartPrimary: true,
restartPrimary: false,
},
}

Expand Down