-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Allow drop slots when it gets deleted from the manifest #2089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
5a4bad4
267a161
7370258
29d09d9
530a36c
2feeb65
5ae08d7
86dfc8c
8c0c0ca
21ba57f
02923ca
a7c3240
21608ca
c05d0e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -395,12 +395,24 @@ def test_config_update(self): | |
| "spec": { | ||
| "postgresql": { | ||
| "parameters": { | ||
| "max_connections": new_max_connections_value | ||
| "max_connections": new_max_connections_value, | ||
| "wal_level": "logical" | ||
| } | ||
| }, | ||
| "patroni": { | ||
| "users": { | ||
| "test_user": [] | ||
| }, | ||
| "databases": { | ||
| "foo": "test_user" | ||
| }, | ||
| "patroni": { | ||
| "slots": { | ||
| "test_slot": { | ||
| "type": "logical", | ||
| "database": "foo", | ||
| "plugin": "pgoutput" | ||
| }, | ||
| "test_slot_2": { | ||
| "type": "physical" | ||
| } | ||
| }, | ||
|
|
@@ -497,6 +509,65 @@ def compare_config(): | |
| self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value, | ||
| "Previous max_connections setting not applied on replica", 10, 5) | ||
|
|
||
| # patch new slot via Patroni REST | ||
| patroni_slot = "test_patroni_slot" | ||
| patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' localhost:8008/config""" | ||
| k8s.exec_with_kubectl(leader.metadata.name, patch_slot_command) | ||
|
|
||
| self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") | ||
| self.eventuallyTrue(compare_config, "Postgres config not applied") | ||
|
|
||
| # delete test_slot_2 from config and change the plugin type for test_slot | ||
| slot_to_change = "test_slot" | ||
| slot_to_remove = "test_slot_2" | ||
| pg_patch_slots = { | ||
| "spec": { | ||
| "patroni": { | ||
| "slots": { | ||
| "test_slot": { | ||
| "type": "logical", | ||
| "database": "foo", | ||
| "plugin": "wal2json" | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| k8s.api.custom_objects_api.patch_namespaced_custom_object( | ||
| "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_slots) | ||
|
|
||
| self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") | ||
| self.eventuallyTrue(compare_config, "Postgres config not applied") | ||
|
|
||
| deleted_slot_query = """ | ||
| SELECT slot_name | ||
| FROM pg_replication_slots | ||
| WHERE slot_name = '%s'; | ||
| """ % (slot_to_remove) | ||
|
||
|
|
||
| self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", deleted_slot_query)), 0, | ||
| "The replication slot cannot be deleted", 10, 5) | ||
|
|
||
| changed_slot_query = """ | ||
| SELECT plugin | ||
| FROM pg_replication_slots | ||
| WHERE slot_name = '%s'; | ||
| """ % (slot_to_change) | ||
|
|
||
| self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", changed_slot_query)[0], "wal2json", | ||
| "The replication slot cannot be updated", 10, 5) | ||
|
|
||
| # make sure slot from Patroni didn't get deleted | ||
| patroni_slot_query = """ | ||
| SELECT slot_name | ||
| FROM pg_replication_slots | ||
| WHERE slot_name = '%s'; | ||
| """ % (patroni_slot) | ||
|
|
||
| self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", patroni_slot_query)), 1, | ||
| "The replication slot from Patroni gets deleted", 10, 5) | ||
|
|
||
| except timeout_decorator.TimeoutError: | ||
| print('Operator log: {}'.format(k8s.get_operator_log())) | ||
| raise | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -579,15 +579,26 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv | |
| } | ||
| } | ||
|
|
||
| slotsToSet := make(map[string]interface{}) | ||
| // check if there is any slot deletion | ||
| for slotName, effectiveSlot := range c.replicationSlots { | ||
| if desiredSlot, exists := desiredPatroniConfig.Slots[slotName]; exists { | ||
| if reflect.DeepEqual(effectiveSlot, desiredSlot) { | ||
| continue | ||
| } | ||
| } | ||
| slotsToSet[slotName] = nil | ||
idanovinda marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| delete(c.replicationSlots, slotName) | ||
| } | ||
| // check if specified slots exist in config and if they differ | ||
| slotsToSet := make(map[string]map[string]string) | ||
| for slotName, desiredSlot := range desiredPatroniConfig.Slots { | ||
| if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists { | ||
| if reflect.DeepEqual(desiredSlot, effectiveSlot) { | ||
| continue | ||
| } | ||
| } | ||
| slotsToSet[slotName] = desiredSlot | ||
| c.replicationSlots[slotName] = desiredSlot | ||
| } | ||
| if len(slotsToSet) > 0 { | ||
| configToSet["slots"] = slotsToSet | ||
|
|
@@ -614,7 +625,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv | |
| } | ||
|
|
||
| // check if there exist only config updates that require a restart of the primary | ||
| if !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 { | ||
| if len(restartPrimary) > 0 && !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if there's nothing to set |
||
| requiresMasterRestart = true | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.