From 5a4bad47fc14229269827101739048ff37144b90 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 24 Oct 2022 16:32:07 +0200 Subject: [PATCH 01/14] Allow drop slots when it gets deleted from the manifest --- pkg/cluster/sync.go | 39 +++++++++++++++++++++++++++++ pkg/util/patroni/patroni.go | 50 +++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 76c9fd12a..1b64de72d 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -541,6 +541,45 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv configPatched := false requiresMasterRestart := false + for slotName, _ := range effectivePatroniConfig.Slots { + if _, exists := desiredPatroniConfig.Slots[slotName]; exists { + continue + } + + configToRewrite := make(map[string]interface{}) + configToRewrite["loop_wait"] = effectivePatroniConfig.LoopWait + configToRewrite["maximum_lag_on_failover"] = effectivePatroniConfig.MaximumLagOnFailover + configToRewrite["pg_hba"] = effectivePatroniConfig.PgHba + configToRewrite["retry_timeout"] = effectivePatroniConfig.RetryTimeout + configToRewrite["synchronous_mode"] = effectivePatroniConfig.SynchronousMode + configToRewrite["synchronous_mode_strict"] = effectivePatroniConfig.SynchronousModeStrict + configToRewrite["ttl"] = effectivePatroniConfig.TTL + configToRewrite["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: effectivePgParameters} + + slotsToRewrite := make(map[string]map[string]string) + for slotName, desiredSlot := range desiredPatroniConfig.Slots { + slotsToRewrite[slotName] = desiredSlot + } + + if len(slotsToRewrite) > 0 { + configToRewrite["slots"] = slotsToRewrite + } + + configToRewriteJson, err := json.Marshal(configToRewrite) + if err != nil { + c.logger.Debugf("could not convert config rewrite to JSON: %v", err) + } + + podName := util.NameFromMeta(pod.ObjectMeta) + c.logger.Debugf("rewrite Postgres config via Patroni API on pod %s with following options: %s", + podName, configToRewriteJson) + if err = c.patroni.RewriteConfig(pod, configToRewrite); err != nil { + return configPatched, requiresMasterRestart, fmt.Errorf("could not rewrite postgres parameters within pod %s: %v", podName, err) + } + + break + } + // compare effective and desired Patroni config options if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != effectivePatroniConfig.LoopWait { configToSet["loop_wait"] = desiredPatroniConfig.LoopWait diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index 7f8f63374..c1f9367e3 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -38,6 +38,7 @@ type Interface interface { Restart(server *v1.Pod) error GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) SetConfig(server *v1.Pod, config map[string]interface{}) error + RewriteConfig(server *v1.Pod, config map[string]interface{}) error } // Patroni API client @@ -113,6 +114,42 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) return nil } +func (p *Patroni) httpPut(method string, url string, body *bytes.Buffer) (err error) { + request, err := http.NewRequest(method, url, body) + if err != nil { + return fmt.Errorf("could not create request: %v", err) + } + + if p.logger != nil { + p.logger.Debugf("making %s http request: %s", method, request.URL.String()) + } + + resp, err := p.httpClient.Do(request) + if err != nil { + return fmt.Errorf("could not make request: %v", err) + } + defer func() { + if err2 := resp.Body.Close(); err2 != nil { + if err != nil { + err = fmt.Errorf("could not close request: %v, prior error: %v", err2, err) + } else { + err = fmt.Errorf("could not close request: %v", err2) + } + return + } + }() + + if resp.StatusCode != http.StatusOK { + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("could not read response: %v", err) + } + + return fmt.Errorf("patroni returned '%s'", string(bodyBytes)) + } + return nil +} + func (p *Patroni) httpGet(url string) (string, error) { p.logger.Debugf("making GET http request: %s", url) @@ -178,6 +215,19 @@ func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) } +func (p *Patroni) RewriteConfig(server *v1.Pod, config map[string]interface{}) error { + buf := &bytes.Buffer{} + err := json.NewEncoder(buf).Encode(config) + if err != nil { + return fmt.Errorf("could not encode json: %v", err) + } + apiURLString, err := apiURL(server) + if err != nil { + return err + } + return p.httpPut(http.MethodPut, apiURLString+configPath, buf) +} + // ClusterMembers array of cluster members from Patroni API type ClusterMembers struct { Members []ClusterMember `json:"members"` From 267a1613142e0809857153a13a0254cc92376f2d Mon Sep 17 00:00:00 2001 From: idanovinda Date: Fri, 28 Oct 2022 15:44:04 +0200 Subject: [PATCH 02/14] remove slot using patch --- pkg/cluster/sync.go | 50 ++++++++----------------------------- pkg/util/patroni/patroni.go | 50 ------------------------------------- 2 files changed, 10 insertions(+), 90 deletions(-) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 1b64de72d..5b6f4b4e1 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -541,45 +541,6 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv configPatched := false requiresMasterRestart := false - for slotName, _ := range effectivePatroniConfig.Slots { - if _, exists := desiredPatroniConfig.Slots[slotName]; exists { - continue - } - - configToRewrite := make(map[string]interface{}) - configToRewrite["loop_wait"] = effectivePatroniConfig.LoopWait - configToRewrite["maximum_lag_on_failover"] = effectivePatroniConfig.MaximumLagOnFailover - configToRewrite["pg_hba"] = effectivePatroniConfig.PgHba - configToRewrite["retry_timeout"] = effectivePatroniConfig.RetryTimeout - configToRewrite["synchronous_mode"] = effectivePatroniConfig.SynchronousMode - configToRewrite["synchronous_mode_strict"] = effectivePatroniConfig.SynchronousModeStrict - configToRewrite["ttl"] = effectivePatroniConfig.TTL - configToRewrite["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: effectivePgParameters} - - slotsToRewrite := make(map[string]map[string]string) - for slotName, desiredSlot := range desiredPatroniConfig.Slots { - slotsToRewrite[slotName] = desiredSlot - } - - if len(slotsToRewrite) > 0 { - configToRewrite["slots"] = slotsToRewrite - } - - configToRewriteJson, err := json.Marshal(configToRewrite) - if err != nil { - c.logger.Debugf("could not convert config rewrite to JSON: %v", err) - } - - podName := util.NameFromMeta(pod.ObjectMeta) - c.logger.Debugf("rewrite Postgres config via Patroni API on pod %s with following options: %s", - podName, configToRewriteJson) - if err = c.patroni.RewriteConfig(pod, configToRewrite); err != nil { - return configPatched, requiresMasterRestart, fmt.Errorf("could not rewrite postgres parameters within pod %s: %v", podName, err) - } - - break - } - // compare effective and desired Patroni config options if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != effectivePatroniConfig.LoopWait { configToSet["loop_wait"] = desiredPatroniConfig.LoopWait @@ -618,8 +579,8 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } } + slotsToSet := make(map[string]interface{}) // check if specified slots exist in config and if they differ - slotsToSet := make(map[string]map[string]string) for slotName, desiredSlot := range desiredPatroniConfig.Slots { if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists { if reflect.DeepEqual(desiredSlot, effectiveSlot) { @@ -628,6 +589,15 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } slotsToSet[slotName] = desiredSlot } + // check if there is any slot deletion + for slotName, effectiveSlot := range effectivePatroniConfig.Slots { + if desiredSlot, exists := desiredPatroniConfig.Slots[slotName]; exists { + if reflect.DeepEqual(effectiveSlot, desiredSlot) { + continue + } + } + slotsToSet[slotName] = nil + } if len(slotsToSet) > 0 { configToSet["slots"] = slotsToSet } diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index c1f9367e3..7f8f63374 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -38,7 +38,6 @@ type Interface interface { Restart(server *v1.Pod) error GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) SetConfig(server *v1.Pod, config map[string]interface{}) error - RewriteConfig(server *v1.Pod, config map[string]interface{}) error } // Patroni API client @@ -114,42 +113,6 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) return nil } -func (p *Patroni) httpPut(method string, url string, body *bytes.Buffer) (err error) { - request, err := http.NewRequest(method, url, body) - if err != nil { - return fmt.Errorf("could not create request: %v", err) - } - - if p.logger != nil { - p.logger.Debugf("making %s http request: %s", method, request.URL.String()) - } - - resp, err := p.httpClient.Do(request) - if err != nil { - return fmt.Errorf("could not make request: %v", err) - } - defer func() { - if err2 := resp.Body.Close(); err2 != nil { - if err != nil { - err = fmt.Errorf("could not close request: %v, prior error: %v", err2, err) - } else { - err = fmt.Errorf("could not close request: %v", err2) - } - return - } - }() - - if resp.StatusCode != http.StatusOK { - bodyBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("could not read response: %v", err) - } - - return fmt.Errorf("patroni returned '%s'", string(bodyBytes)) - } - return nil -} - func (p *Patroni) httpGet(url string) (string, error) { p.logger.Debugf("making GET http request: %s", url) @@ -215,19 +178,6 @@ func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) } -func (p *Patroni) RewriteConfig(server *v1.Pod, config map[string]interface{}) error { - buf := &bytes.Buffer{} - err := json.NewEncoder(buf).Encode(config) - if err != nil { - return fmt.Errorf("could not encode json: %v", err) - } - apiURLString, err := apiURL(server) - if err != nil { - return err - } - return p.httpPut(http.MethodPut, apiURLString+configPath, buf) -} - // ClusterMembers array of cluster members from Patroni API type ClusterMembers struct { Members []ClusterMember `json:"members"` From 7370258b63d50b3d15dd142659b72096aca66079 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 31 Oct 2022 14:01:34 +0100 Subject: [PATCH 03/14] swap deletion and insertion order --- pkg/cluster/sync.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 5b6f4b4e1..4cb273f9e 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -580,15 +580,6 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } slotsToSet := make(map[string]interface{}) - // check if specified slots exist in config and if they differ - for slotName, desiredSlot := range desiredPatroniConfig.Slots { - if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists { - if reflect.DeepEqual(desiredSlot, effectiveSlot) { - continue - } - } - slotsToSet[slotName] = desiredSlot - } // check if there is any slot deletion for slotName, effectiveSlot := range effectivePatroniConfig.Slots { if desiredSlot, exists := desiredPatroniConfig.Slots[slotName]; exists { @@ -598,6 +589,15 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } slotsToSet[slotName] = nil } + // check if specified slots exist in config and if they differ + for slotName, desiredSlot := range desiredPatroniConfig.Slots { + if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists { + if reflect.DeepEqual(desiredSlot, effectiveSlot) { + continue + } + } + slotsToSet[slotName] = desiredSlot + } if len(slotsToSet) > 0 { configToSet["slots"] = slotsToSet } From 29d09d977cf67a8eaecc338d61cc9d4182ecf332 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 31 Oct 2022 14:02:22 +0100 Subject: [PATCH 04/14] add e2e slot removal --- e2e/tests/test_e2e.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 638cd05b2..1b95b6532 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -402,6 +402,9 @@ def test_config_update(self): "slots": { "test_slot": { "type": "physical" + }, + "test_slot_2": { + "type": "physical" } }, "ttl": 29, @@ -497,6 +500,34 @@ def compare_config(): self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value, "Previous max_connections setting not applied on replica", 10, 5) + # delete test_slot_2 from config + slot_to_remove = "test_slot_2" + pg_patch_slots = { + "spec": { + "patroni": { + "slots": { + "test_slot": { + "type": "physical" + } + } + } + } + } + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_slots) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + deleted_slot_query = """ + SELECT count(*) + FROM pg_replication_slots + WHERE slot_name = '%s'; + """ % (slot_to_remove) + + self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", deleted_slot_query)[0], 0, + "The replication slot cannot be deleted") + except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) raise From 530a36c690367660cfe2d04e0bf110ead109c0c0 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 31 Oct 2022 14:09:56 +0100 Subject: [PATCH 05/14] add retries and interval --- e2e/tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 1b95b6532..3e23b5010 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -526,7 +526,7 @@ def compare_config(): """ % (slot_to_remove) self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", deleted_slot_query)[0], 0, - "The replication slot cannot be deleted") + "The replication slot cannot be deleted", 10, 5) except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) From 2feeb657f3249baf7e549da2c087279143070962 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 31 Oct 2022 14:28:48 +0100 Subject: [PATCH 06/14] update query and assertion --- e2e/tests/test_e2e.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 3e23b5010..3a82d959b 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -520,12 +520,12 @@ def compare_config(): self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") deleted_slot_query = """ - SELECT count(*) + SELECT slot_name FROM pg_replication_slots WHERE slot_name = '%s'; """ % (slot_to_remove) - self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", deleted_slot_query)[0], 0, + self.eventuallyEqual(lambda: len(self.query_database(replica.metadata.name, "postgres", deleted_slot_query)), 0, "The replication slot cannot be deleted", 10, 5) except timeout_decorator.TimeoutError: From 5ae08d754bee194e25005ea92367b56fad424a74 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Fri, 4 Nov 2022 17:33:41 +0100 Subject: [PATCH 07/14] apply feedback --- pkg/cluster/cluster.go | 8 ++++++++ pkg/cluster/sync.go | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 812965854..4ac4b3a1f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -84,6 +84,7 @@ type Cluster struct { userSyncStrategy spec.UserSyncer deleteOptions metav1.DeleteOptions podEventsQueue *cache.FIFO + replicationSlots map[string]interface{} teamsAPIClient teams.Interface oauthTokenGetter OAuthTokenGetter @@ -141,6 +142,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres podEventsQueue: podEventsQueue, KubeClient: kubeClient, currentMajorVersion: 0, + replicationSlots: make(map[string]interface{}), } cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName()) cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger) @@ -375,6 +377,12 @@ func (c *Cluster) Create() error { } } + if len(c.Spec.Patroni.Slots) > 0 { + for slotName, desiredSlot := range c.Spec.Patroni.Slots { + c.replicationSlots[slotName] = desiredSlot + } + } + return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 4cb273f9e..7a7d16a81 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -581,13 +581,14 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv slotsToSet := make(map[string]interface{}) // check if there is any slot deletion - for slotName, effectiveSlot := range effectivePatroniConfig.Slots { + for slotName, effectiveSlot := range c.replicationSlots { if desiredSlot, exists := desiredPatroniConfig.Slots[slotName]; exists { if reflect.DeepEqual(effectiveSlot, desiredSlot) { continue } } slotsToSet[slotName] = nil + delete(c.replicationSlots, slotName) } // check if specified slots exist in config and if they differ for slotName, desiredSlot := range desiredPatroniConfig.Slots { @@ -597,6 +598,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } } slotsToSet[slotName] = desiredSlot + c.replicationSlots[slotName] = desiredSlot } if len(slotsToSet) > 0 { configToSet["slots"] = slotsToSet From 86dfc8c20c973d7bce588004b534729003b8b849 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Wed, 14 Dec 2022 11:44:26 +0100 Subject: [PATCH 08/14] reflect feedback --- e2e/tests/test_e2e.py | 48 +++++++++++++++++++++++++++---- pkg/cluster/cluster.go | 6 ++-- pkg/cluster/sync_test.go | 61 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 10 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 3a82d959b..863f2e879 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -395,13 +395,22 @@ def test_config_update(self): "spec": { "postgresql": { "parameters": { - "max_connections": new_max_connections_value + "max_connections": new_max_connections_value, + "wal_level": "logical" } }, - "patroni": { + "users": { + "test_user": [] + }, + "databases": { + "foo": "test_user" + }, + "patroni": { "slots": { "test_slot": { - "type": "physical" + "type": "logical", + "database": "foo", + "plugin": "pgoutput" }, "test_slot_2": { "type": "physical" @@ -500,14 +509,22 @@ def compare_config(): self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value, "Previous max_connections setting not applied on replica", 10, 5) - # delete test_slot_2 from config + # patch new slot via Patroni REST + patroni_slot = "test_patroni_slot" + patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' http://localhost:8008/config""" + k8s.exec_with_kubectl(replica.metadata.name, patch_slot_command) + + # delete test_slot_2 from config and change the plugin type for test_slot slot_to_remove = "test_slot_2" + slot_to_change = "test_slot" pg_patch_slots = { "spec": { "patroni": { "slots": { "test_slot": { - "type": "physical" + "type": "logical", + "database": "foo", + "plugin": "wal2json" } } } @@ -526,7 +543,26 @@ def compare_config(): """ % (slot_to_remove) self.eventuallyEqual(lambda: len(self.query_database(replica.metadata.name, "postgres", deleted_slot_query)), 0, - "The replication slot cannot be deleted", 10, 5) + "The replication slot cannot be deleted", 10, 5) + + changed_slot_query = """ + SELECT plugin + FROM pg_replication_slots + WHERE slot_name = '%s'; + """ % (slot_to_change) + + self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", changed_slot_query)[0], "wal2json", + "The replication slot cannot be updated", 10, 5) + + # make sure slot from Patroni didn't get deleted + patroni_slot_query = """ + SELECT slot_name + FROM pg_replication_slots + WHERE slot_name = '%s'; + """ % (patroni_slot) + + self.eventuallyEqual(lambda: len(self.query_database(replica.metadata.name, "postgres", patroni_slot_query)), 1, + "The replication slot from Patroni gets deleted", 10, 5) except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 4ac4b3a1f..e93b0cc3d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -377,10 +377,8 @@ func (c *Cluster) Create() error { } } - if len(c.Spec.Patroni.Slots) > 0 { - for slotName, desiredSlot := range c.Spec.Patroni.Slots { - c.replicationSlots[slotName] = desiredSlot - } + for slotName, desiredSlot := range c.Spec.Patroni.Slots { + c.replicationSlots[slotName] = desiredSlot } return nil diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 4d50b791f..99fc61301 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -144,6 +144,13 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { client, _ := newFakeK8sSyncClient() clusterName := "acid-test-cluster" namespace := "default" + testSlots := map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "wal2json", + "database": "foo", + }, + } ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -210,6 +217,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { tests := []struct { subtest string patroni acidv1.Patroni + slots map[string]map[string]string pgParams map[string]string restartPrimary bool }{ @@ -251,9 +259,62 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { }, restartPrimary: true, }, + { + subtest: "slot does not exist but is desired", + patroni: acidv1.Patroni{ + TTL: 20, + }, + slots: testSlots, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + restartPrimary: false, + }, + { + subtest: "slot exist, nothing specified in manifest", + patroni: acidv1.Patroni{ + TTL: 20, + Slots: map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "pgoutput", + "database": "foo", + }, + }, + }, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + restartPrimary: false, + }, + { + subtest: "slot plugin differs", + patroni: acidv1.Patroni{ + TTL: 20, + Slots: map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "pgoutput", + "database": "foo", + }, + }, + }, + slots: testSlots, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + restartPrimary: false, + }, } for _, tt := range tests { + if tt.slots != nil { + cluster.Spec.Patroni.Slots = tt.slots + } + configPatched, requirePrimaryRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters) assert.NoError(t, err) if configPatched != true { From 8c0c0ca0690b6c01a8c0e45da6192dc4f1ec73e0 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Fri, 16 Dec 2022 14:40:05 +0100 Subject: [PATCH 09/14] use leader instead replica to query slots --- e2e/tests/test_e2e.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 863f2e879..cee2e54b9 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -511,12 +511,15 @@ def compare_config(): # patch new slot via Patroni REST patroni_slot = "test_patroni_slot" - patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' http://localhost:8008/config""" - k8s.exec_with_kubectl(replica.metadata.name, patch_slot_command) + patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' localhost:8008/config""" + k8s.exec_with_kubectl(leader.metadata.name, patch_slot_command) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyTrue(compare_config, "Postgres config not applied") # delete test_slot_2 from config and change the plugin type for test_slot - slot_to_remove = "test_slot_2" slot_to_change = "test_slot" + slot_to_remove = "test_slot_2" pg_patch_slots = { "spec": { "patroni": { @@ -535,14 +538,15 @@ def compare_config(): "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_slots) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyTrue(compare_config, "Postgres config not applied") deleted_slot_query = """ SELECT slot_name FROM pg_replication_slots WHERE slot_name = '%s'; """ % (slot_to_remove) - - self.eventuallyEqual(lambda: len(self.query_database(replica.metadata.name, "postgres", deleted_slot_query)), 0, + + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", deleted_slot_query)), 0, "The replication slot cannot be deleted", 10, 5) changed_slot_query = """ @@ -551,7 +555,7 @@ def compare_config(): WHERE slot_name = '%s'; """ % (slot_to_change) - self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", changed_slot_query)[0], "wal2json", + self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", changed_slot_query)[0], "wal2json", "The replication slot cannot be updated", 10, 5) # make sure slot from Patroni didn't get deleted @@ -561,7 +565,7 @@ def compare_config(): WHERE slot_name = '%s'; """ % (patroni_slot) - self.eventuallyEqual(lambda: len(self.query_database(replica.metadata.name, "postgres", patroni_slot_query)), 1, + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", patroni_slot_query)), 1, "The replication slot from Patroni gets deleted", 10, 5) except timeout_decorator.TimeoutError: From 21ba57faadc096535f375253a4821f9bd4d5287a Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 16 Dec 2022 18:05:26 +0100 Subject: [PATCH 10/14] fix and extend unit tests for config update checks --- pkg/cluster/sync.go | 2 +- pkg/cluster/sync_test.go | 86 +++++++++++++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 19 deletions(-) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7a7d16a81..af85eb076 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -625,7 +625,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } // check if there exist only config updates that require a restart of the primary - if !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 { + if len(restartPrimary) > 0 && !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 { requiresMasterRestart = true } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 99fc61301..3cd3d3f28 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -215,12 +215,26 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { // simulate existing config that differs from cluster.Spec tests := []struct { - subtest string - patroni acidv1.Patroni - slots map[string]map[string]string - pgParams map[string]string - restartPrimary bool + subtest string + patroni acidv1.Patroni + desiredSlots map[string]map[string]string + removedSlots map[string]map[string]string + pgParams map[string]string + shouldBePatched bool + restartPrimary bool }{ + { + subtest: "Patroni and Postgresql.Parameters do not differ", + patroni: acidv1.Patroni{ + TTL: 20, + }, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + shouldBePatched: false, + restartPrimary: false, + }, { subtest: "Patroni and Postgresql.Parameters differ - restart replica first", patroni: acidv1.Patroni{ @@ -230,7 +244,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "500", // desired 200 "max_connections": "100", // desired 50 }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "multiple Postgresql.Parameters differ - restart replica first", @@ -239,7 +254,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "500", // desired 200 "max_connections": "100", // desired 50 }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "desired max_connections bigger - restart replica first", @@ -248,7 +264,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "200", "max_connections": "30", // desired 50 }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "desired max_connections smaller - restart master first", @@ -257,19 +274,21 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "200", "max_connections": "100", // desired 50 }, - restartPrimary: true, + shouldBePatched: true, + restartPrimary: true, }, { subtest: "slot does not exist but is desired", patroni: acidv1.Patroni{ TTL: 20, }, - slots: testSlots, + desiredSlots: testSlots, pgParams: map[string]string{ "log_min_duration_statement": "200", "max_connections": "50", }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "slot exist, nothing specified in manifest", @@ -287,7 +306,28 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "200", "max_connections": "50", }, - restartPrimary: false, + shouldBePatched: false, + restartPrimary: false, + }, + { + subtest: "slot is removed from manifest", + patroni: acidv1.Patroni{ + TTL: 20, + Slots: map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "pgoutput", + "database": "foo", + }, + }, + }, + removedSlots: testSlots, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "slot plugin differs", @@ -301,28 +341,38 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { }, }, }, - slots: testSlots, + desiredSlots: testSlots, pgParams: map[string]string{ "log_min_duration_statement": "200", "max_connections": "50", }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, } for _, tt := range tests { - if tt.slots != nil { - cluster.Spec.Patroni.Slots = tt.slots + if len(tt.desiredSlots) > 0 { + cluster.Spec.Patroni.Slots = tt.desiredSlots + } + if len(tt.removedSlots) > 0 { + for slotName, removedSlot := range tt.removedSlots { + cluster.replicationSlots[slotName] = removedSlot + } } configPatched, requirePrimaryRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters) assert.NoError(t, err) - if configPatched != true { + if configPatched != tt.shouldBePatched { t.Errorf("%s - %s: expected config update did not happen", testName, tt.subtest) } if requirePrimaryRestart != tt.restartPrimary { t.Errorf("%s - %s: wrong master restart strategy, got restart %v, expected restart %v", testName, tt.subtest, requirePrimaryRestart, tt.restartPrimary) } + + // reset slots for next tests + cluster.Spec.Patroni.Slots = nil + cluster.replicationSlots = make(map[string]interface{}) } testsFailsafe := []struct { @@ -403,7 +453,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { effectiveVal: util.True(), desiredVal: true, shouldBePatched: false, // should not require patching - restartPrimary: true, + restartPrimary: false, }, } From 02923caa0660912ea12ba93370711ef956dd9557 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Wed, 21 Dec 2022 10:27:51 +0100 Subject: [PATCH 11/14] fix e2e test --- e2e/tests/test_e2e.py | 62 ++++++++++++++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index cee2e54b9..52c3af722 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -403,16 +403,12 @@ def test_config_update(self): "test_user": [] }, "databases": { - "foo": "test_user" + "foo": "test_user", + "bar": "test_user" }, "patroni": { "slots": { - "test_slot": { - "type": "logical", - "database": "foo", - "plugin": "pgoutput" - }, - "test_slot_2": { + "first_slot": { "type": "physical" } }, @@ -449,6 +445,8 @@ def compare_config(): "synchronous_mode not updated") self.assertEqual(desired_config["failsafe_mode"], effective_config["failsafe_mode"], "failsafe_mode not updated") + self.assertEqual(desired_config["slots"], effective_config["slots"], + "slots not updated") return True # check if Patroni config has been updated @@ -512,31 +510,63 @@ def compare_config(): # patch new slot via Patroni REST patroni_slot = "test_patroni_slot" patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' localhost:8008/config""" + pg_patch_config["spec"]["patroni"]["slots"][patroni_slot] = {"type": "physical"} + k8s.exec_with_kubectl(leader.metadata.name, patch_slot_command) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyTrue(compare_config, "Postgres config not applied") + + # test adding new slots + pg_add_new_slots_patch = { + "spec": { + "patroni": { + "slots": { + "test_slot": { + "type": "logical", + "database": "foo", + "plugin": "pgoutput" + }, + "test_slot_2": { + "type": "physical" + } + } + } + } + } + + for slot_name, slot_details in pg_add_new_slots_patch["spec"]["patroni"]["slots"].items(): + pg_patch_config["spec"]["patroni"]["slots"][slot_name] = slot_details + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_add_new_slots_patch) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyTrue(compare_config, "Postgres config not applied") - # delete test_slot_2 from config and change the plugin type for test_slot + # delete test_slot_2 from config and change the database type for test_slot slot_to_change = "test_slot" slot_to_remove = "test_slot_2" - pg_patch_slots = { + pg_delete_slot_patch = { "spec": { "patroni": { "slots": { "test_slot": { "type": "logical", - "database": "foo", - "plugin": "wal2json" - } + "database": "bar", + "plugin": "pgoutput" + }, + "test_slot_2": None } } } } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_slots) + pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar" + del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove] + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyTrue(compare_config, "Postgres config not applied") @@ -550,12 +580,12 @@ def compare_config(): "The replication slot cannot be deleted", 10, 5) changed_slot_query = """ - SELECT plugin + SELECT database FROM pg_replication_slots WHERE slot_name = '%s'; """ % (slot_to_change) - self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", changed_slot_query)[0], "wal2json", + self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", changed_slot_query)[0], "bar", "The replication slot cannot be updated", 10, 5) # make sure slot from Patroni didn't get deleted From a7c3240819672b114d8ee394eff183f63d44b139 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 28 Dec 2022 10:33:10 +0100 Subject: [PATCH 12/14] Update e2e/tests/test_e2e.py --- e2e/tests/test_e2e.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 52c3af722..65a445762 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -399,13 +399,6 @@ def test_config_update(self): "wal_level": "logical" } }, - "users": { - "test_user": [] - }, - "databases": { - "foo": "test_user", - "bar": "test_user" - }, "patroni": { "slots": { "first_slot": { From 21608ca16db6fb31b4e14d3da18fef7ef15e6ba2 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 28 Dec 2022 10:33:38 +0100 Subject: [PATCH 13/14] Update e2e/tests/test_e2e.py --- e2e/tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 65a445762..6e4c88945 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -399,7 +399,7 @@ def test_config_update(self): "wal_level": "logical" } }, - "patroni": { + "patroni": { "slots": { "first_slot": { "type": "physical" From c05d0e20460fe1a07acb8cc7abf62dc2e7f124aa Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 2 Jan 2023 17:02:05 +0100 Subject: [PATCH 14/14] formatting slot query --- e2e/tests/test_e2e.py | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 6e4c88945..39fd45323 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -563,32 +563,19 @@ def compare_config(): self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyTrue(compare_config, "Postgres config not applied") - deleted_slot_query = """ - SELECT slot_name + get_slot_query = """ + SELECT %s FROM pg_replication_slots WHERE slot_name = '%s'; - """ % (slot_to_remove) - - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", deleted_slot_query)), 0, + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", slot_to_remove))), 0, "The replication slot cannot be deleted", 10, 5) - changed_slot_query = """ - SELECT database - FROM pg_replication_slots - WHERE slot_name = '%s'; - """ % (slot_to_change) - - self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", changed_slot_query)[0], "bar", + self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar", "The replication slot cannot be updated", 10, 5) # make sure slot from Patroni didn't get deleted - patroni_slot_query = """ - SELECT slot_name - FROM pg_replication_slots - WHERE slot_name = '%s'; - """ % (patroni_slot) - - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", patroni_slot_query)), 1, + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1, "The replication slot from Patroni gets deleted", 10, 5) except timeout_decorator.TimeoutError: