Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9c932cc
Enable slot and publication deletion when stream application is removed
idanovinda Jul 2, 2024
3f4d446
fixing typo with major upgrade test
idanovinda Jul 3, 2024
bad33e7
check slot and publication creation
idanovinda Jul 5, 2024
cb861b3
convert stream section as a list
idanovinda Jul 5, 2024
5c3e735
add fes crd and fix stream
idanovinda Jul 5, 2024
73fcf09
list all existing database for iteration
idanovinda Jul 8, 2024
95889b4
fix unittest
idanovinda Jul 8, 2024
829a3fe
remove debug message and change test name
idanovinda Jul 8, 2024
5cb67d1
use minimal fes crd and 1 more secret
idanovinda Jul 8, 2024
b64c39e
modify served section in fes crd
idanovinda Jul 8, 2024
3d60260
apply feedback
idanovinda Jul 11, 2024
5418d89
fix bug of removing manifest slots in syncStreams
FxKu Jul 12, 2024
6ed0e70
fix overwrite because of pass by reference value
idanovinda Jul 17, 2024
2a90be3
improve e2etest make sure patroni slot is not deleted
idanovinda Jul 17, 2024
7747151
e2e test: should not delete nonstream publication
idanovinda Jul 19, 2024
952c8a8
grant permission to create publication for foo_user
idanovinda Jul 22, 2024
35990d0
add check if the publication is created successfully
idanovinda Jul 22, 2024
fb6dac9
fix e2e test: reset search path foo_user
idanovinda Jul 22, 2024
15e57f7
apply feedback
idanovinda Jul 23, 2024
5d037e2
not patching cluster role
idanovinda Jul 23, 2024
134f49c
e2e test: patch with append to not overwrite
idanovinda Jul 23, 2024
fc05c13
allow all verbs for fes resources
idanovinda Jul 23, 2024
d2f8179
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
ceca99a
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
e4df22c
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
47767aa
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
7d2987e
Update pkg/cluster/database.go
idanovinda Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions e2e/tests/k8s_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self):

self.config = config.load_kube_config()
self.k8s_client = client.ApiClient()
self.rbac_api = client.RbacAuthorizationV1Api()

self.core_v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
Expand Down
125 changes: 122 additions & 3 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ def setUpClass(cls):
"infrastructure-roles.yaml",
"infrastructure-roles-new.yaml",
"custom-team-membership.yaml",
"e2e-storage-class.yaml"]:
"e2e-storage-class.yaml",
"fes.crd.yaml"]:
result = k8s.create_with_kubectl("manifests/" + filename)
print("stdout: {}, stderr: {}".format(result.stdout, result.stderr))

Expand Down Expand Up @@ -199,6 +200,7 @@ def test_additional_owner_roles(self):
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3,
"Not all additional users found in database", 10, 5)


@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_additional_pod_capabilities(self):
'''
Expand Down Expand Up @@ -1203,7 +1205,7 @@ def check_version_14():
version = p["server_version"][0:2]
return version

self.evantuallyEqual(check_version_14, "14", "Version was not upgrade to 14")
self.eventuallyEqual(check_version_14, "14", "Version was not upgrade to 14")

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_persistent_volume_claim_retention_policy(self):
Expand Down Expand Up @@ -1989,6 +1991,123 @@ def test_standby_cluster(self):
"acid.zalan.do", "v1", "default", "postgresqls", "acid-standby-cluster")
time.sleep(5)

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_stream_resources(self):
'''
Create and delete fabric event streaming resources.
'''
k8s = self.k8s
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
"Operator does not get in sync")
leader = k8s.get_cluster_leader_pod()

# patch ClusterRole with CRUD privileges on FES resources
cluster_role = k8s.api.rbac_api.read_cluster_role("postgres-operator")
fes_cluster_role_rule = client.V1PolicyRule(
api_groups=["zalando.org"],
resources=["fabriceventstreams"],
verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"]
)
cluster_role.rules.append(fes_cluster_role_rule)
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)

# create a table in one of the database of acid-minimal-cluster
create_stream_table = """
CREATE TABLE test_table (id int, payload jsonb);
"""
self.query_database(leader.metadata.name, "foo", create_stream_table)

# update the manifest with the streams section
patch_streaming_config = {
"spec": {
"patroni": {
"slots": {
"manual_slot": {
"type": "physical"
}
}
},
"streams": [
{
"applicationId": "test-app",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
}
}
}
]
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# check if publication, slot, and fes resource are created
get_publication_query = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
"""
get_slot_query = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
"Publication is not created", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
"Replication slot is not created", 10, 5)
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
"Could not find Fabric Event Stream resource", 10, 5)

# grant create and ownership of test_table to foo_user, reset search path to default
grant_permission_foo_user = """
GRANT CREATE ON DATABASE foo TO foo_user;
ALTER TABLE test_table OWNER TO foo_user;
ALTER ROLE foo_user RESET search_path;
"""
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
# non-postgres user creates a publication
create_nonstream_publication = """
CREATE PUBLICATION mypublication FOR TABLE test_table;
"""
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")

# remove the streams section from the manifest
patch_streaming_config_removal = {
"spec": {
"streams": []
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# check if publication, slot, and fes resource are removed
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0,
'Could not delete Fabric Event Stream resource', 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
"Publication is not deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
"Replication slot is not deleted", 10, 5)

# check the manual_slot and mypublication should not get deleted
get_manual_slot_query = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
"""
get_nonstream_publication_query = """
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
"Slot defined in patroni config is deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
"Publication defined not in stream section is deleted", 10, 5)

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_taint_based_eviction(self):
'''
Expand Down Expand Up @@ -2115,7 +2234,7 @@ def test_zz_cluster_deletion(self):
self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted")
self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 7, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")

except timeout_decorator.TimeoutError:
Expand Down
23 changes: 23 additions & 0 deletions manifests/fes.crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: fabriceventstreams.zalando.org
spec:
group: zalando.org
names:
kind: FabricEventStream
listKind: FabricEventStreamList
plural: fabriceventstreams
singular: fabriceventstream
shortNames:
- fes
categories:
- all
scope: Namespaced
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
6 changes: 6 additions & 0 deletions pkg/apis/zalando.org/v1/fabriceventstream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1

import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -89,3 +90,8 @@ type DBAuth struct {
UserKey string `json:"userKey,omitempty"`
PasswordKey string `json:"passwordKey,omitempty"`
}

type Slot struct {
Slot map[string]string `json:"slot"`
Publication map[string]acidv1.StreamTable `json:"publication"`
}
10 changes: 10 additions & 0 deletions pkg/cluster/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ const (
getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename)
FROM pg_publication p
LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname
WHERE p.pubowner = 10
GROUP BY p.pubname;`
createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');`
alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;`
dropPublicationSQL = `DROP PUBLICATION "%s";`

globalDefaultPrivilegesSQL = `SET ROLE TO "%s";
ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s";
Expand Down Expand Up @@ -628,6 +630,14 @@ func (c *Cluster) getPublications() (publications map[string]string, err error)
return dbPublications, err
}

func (c *Cluster) executeDropPublication(pubName string) error {
c.logger.Infof("dropping publication %q", pubName)
if _, err := c.pgDb.Exec(fmt.Sprintf(dropPublicationSQL, pubName)); err != nil {
return fmt.Errorf("could not execute drop publication: %v", err)
}
return nil
}

// executeCreatePublication creates new publication for given tables
// The caller is responsible for opening and closing the database connection.
func (c *Cluster) executeCreatePublication(pubName, tableList string) error {
Expand Down
Loading