Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9c932cc
Enable slot and publication deletion when stream application is removed
idanovinda Jul 2, 2024
3f4d446
fixing typo with major upgrade test
idanovinda Jul 3, 2024
bad33e7
check slot and publication creation
idanovinda Jul 5, 2024
cb861b3
convert stream section as a list
idanovinda Jul 5, 2024
5c3e735
add fes crd and fix stream
idanovinda Jul 5, 2024
73fcf09
list all existing database for iteration
idanovinda Jul 8, 2024
95889b4
fix unittest
idanovinda Jul 8, 2024
829a3fe
remove debug message and change test name
idanovinda Jul 8, 2024
5cb67d1
use minimal fes crd and 1 more secret
idanovinda Jul 8, 2024
b64c39e
modify served section in fes crd
idanovinda Jul 8, 2024
3d60260
apply feedback
idanovinda Jul 11, 2024
5418d89
fix bug of removing manifest slots in syncStreams
FxKu Jul 12, 2024
6ed0e70
fix overwrite because of pass by reference value
idanovinda Jul 17, 2024
2a90be3
improve e2etest make sure patroni slot is not deleted
idanovinda Jul 17, 2024
7747151
e2e test: should not delete nonstream publication
idanovinda Jul 19, 2024
952c8a8
grant permission to create publication for foo_user
idanovinda Jul 22, 2024
35990d0
add check if the publication is created successfully
idanovinda Jul 22, 2024
fb6dac9
fix e2e test: reset search path foo_user
idanovinda Jul 22, 2024
15e57f7
apply feedback
idanovinda Jul 23, 2024
5d037e2
not patching cluster role
idanovinda Jul 23, 2024
134f49c
e2e test: patch with append to not overwrite
idanovinda Jul 23, 2024
fc05c13
allow all verbs for fes resources
idanovinda Jul 23, 2024
d2f8179
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
ceca99a
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
e4df22c
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
47767aa
Update e2e/tests/test_e2e.py
idanovinda Jul 24, 2024
7d2987e
Update pkg/cluster/database.go
idanovinda Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,62 @@ def test_additional_owner_roles(self):
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3,
"Not all additional users found in database", 10, 5)

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_stream_resources(self):
'''
Create a Postgres cluster with streaming resources and check them.
'''
k8s = self.k8s

self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
"Operator does not get in sync")
leader = k8s.get_cluster_leader_pod()

# create a table in one of the database of acid-minimal-cluster
create_stream_table = """
CREATE TABLE data.test_table (id int, payload jsonb);
"""
self.query_database(leader.metadata.name, "foo", create_stream_table)

# update the manifest with the streaming section
patch_streaming_config = {
"spec": {
"streams": {
"applicationId": "test-app",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"data.test_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
}
}
}
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)

# check if publication, slot, and fes resource are created
get_publication_query = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
"""
get_slot_query = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
"Publication is not created", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
"Replication slot is not created", 10, 5)

# remove the streaming section from the manifest

# check if publication, slot, and fes resource are removed


@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_additional_pod_capabilities(self):
'''
Expand Down Expand Up @@ -1203,7 +1259,7 @@ def check_version_14():
version = p["server_version"][0:2]
return version

self.evantuallyEqual(check_version_14, "14", "Version was not upgrade to 14")
self.eventuallyEqual(check_version_14, "14", "Version was not upgrade to 14")

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_persistent_volume_claim_retention_policy(self):
Expand Down
9 changes: 9 additions & 0 deletions pkg/cluster/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
GROUP BY p.pubname;`
createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');`
alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;`
dropPublicationSQL = `DROP PUBLICATION "%s";`

globalDefaultPrivilegesSQL = `SET ROLE TO "%s";
ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s";
Expand Down Expand Up @@ -628,6 +629,14 @@ func (c *Cluster) getPublications() (publications map[string]string, err error)
return dbPublications, err
}

func (c *Cluster) executeDropPublication(pubName string) error {
c.logger.Infof("dropping publication %q", pubName)
if _, err := c.pgDb.Exec(fmt.Sprintf(dropPublicationSQL, pubName)); err != nil {
return fmt.Errorf("could not execute drop publication: %v", err)
}
return nil
}

// executeCreatePublication creates new publication for given tables
// The caller is responsible for opening and closing the database connection.
func (c *Cluster) executeCreatePublication(pubName, tableList string) error {
Expand Down
125 changes: 90 additions & 35 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) er
return nil
}

func (c *Cluster) deleteStream(stream *zalandov1.FabricEventStream) error {
c.setProcessName("deleting event stream")

err := c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("could not delete event stream %q: %v", stream.Name, err)
}
return nil
}

func (c *Cluster) deleteStreams() error {
c.setProcessName("deleting event streams")

Expand All @@ -61,7 +71,7 @@ func (c *Cluster) deleteStreams() error {
return fmt.Errorf("could not list of FabricEventStreams: %v", err)
}
for _, stream := range streams.Items {
err = c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{})
err := c.deleteStream(&stream)
if err != nil {
errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", stream.Name, err))
}
Expand All @@ -85,9 +95,13 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
return appIds
}

func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
func (c *Cluster) syncPublication(slots map[string]map[string]string, publications map[string]map[string]acidv1.StreamTable, dbName string, slotNames []string) (map[string]map[string]string, error) {
createPublications := make(map[string]string)
alterPublications := make(map[string]string)
deletePublications := make(map[string]string)
slotsToSync := make(map[string]map[string]string)

c.logger.Debug("is this even going here????")

defer func() {
if err := c.closeDbConn(); err != nil {
Expand All @@ -97,47 +111,65 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string]

// check for existing publications
if err := c.initDbConnWithName(dbName); err != nil {
return fmt.Errorf("could not init database connection")
return nil, fmt.Errorf("could not init database connection: %v", err)
}

currentPublications, err := c.getPublications()
if err != nil {
return fmt.Errorf("could not get current publications: %v", err)
}

tableNames := make([]string, len(tables))
i := 0
for t := range tables {
tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++
return nil, fmt.Errorf("could not get current publications: %v", err)
}

// gather list of required publications
for _, slotName := range slotNames {
tables := publications[slotName]
tableNames := make([]string, len(publications[slotName]))
i := 0
for t := range tables {
tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++
}
sort.Strings(tableNames)
tableList := strings.Join(tableNames, ", ")

currentTables, exists := currentPublications[slotName]
if !exists {
createPublications[slotName] = tableList
} else if currentTables != tableList {
alterPublications[slotName] = tableList
}
slotsToSync[slotName] = slots[slotName]
}
sort.Strings(tableNames)
tableList := strings.Join(tableNames, ", ")

currentTables, exists := currentPublications[publication]
if !exists {
createPublications[publication] = tableList
} else if currentTables != tableList {
alterPublications[publication] = tableList
// check if there is any deletion
for slotName, tables := range currentPublications {
if _, exists := publications[slotName]; !exists {
deletePublications[slotName] = tables
}
}

if len(createPublications)+len(alterPublications) == 0 {
return nil
if len(createPublications)+len(alterPublications)+len(deletePublications) == 0 {
return nil, nil
}

for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
return fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
return nil, fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
}
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
return fmt.Errorf("update of publication %q failed: %v", publicationName, err)
return nil, fmt.Errorf("update of publication %q failed: %v", publicationName, err)
}
}
for publicationName, _ := range deletePublications {
slotsToSync[publicationName] = nil
if err = c.executeDropPublication(publicationName); err != nil {
return nil, fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
}
}

return nil
return slotsToSync, nil
}

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
Expand Down Expand Up @@ -283,12 +315,13 @@ func (c *Cluster) syncStreams() error {
slotsToSync := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)
requiredPatroniConfig := c.Spec.Patroni
databases := make(map[string][]string)

if len(requiredPatroniConfig.Slots) > 0 {
slots = requiredPatroniConfig.Slots
}

// gather list of required slots and publications
// gather list of required slots and publications, group by database
for _, stream := range c.Spec.Streams {
slot := map[string]string{
"database": stream.Database,
Expand All @@ -308,27 +341,37 @@ func (c *Cluster) syncStreams() error {
}
publications[slotName] = streamTables
}
// save the slotName in the database list
if _, exists := databases[stream.Database]; !exists {
databases[stream.Database] = []string{slotName}
} else {
if !util.SliceContains(databases[stream.Database], slotName) {
databases[stream.Database] = append(databases[stream.Database], slotName)
}
}
}

// create publications to each created slot
// sync publication in a database
c.logger.Debug("syncing database publications")
for publication, tables := range publications {
// but first check for existing publications
dbName := slots[publication]["database"]
err = c.syncPublication(publication, dbName, tables)
for dbName, slotNames := range databases {
c.logger.Debug(dbName, slotNames)
slotsToSyncDb, err := c.syncPublication(slots, publications, dbName, slotNames)
c.logger.Debug(slotsToSyncDb, err)
if err != nil {
c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err)
c.logger.Warningf("could not sync publications in database %q: %v", dbName, err)
continue
}
slotsToSync[publication] = slots[publication]
// if it's not exists in the slotsToSync, add it
for slotName, slotSection := range slotsToSyncDb {
if _, exists := slotsToSync[slotName]; !exists {
slotsToSync[slotName] = slotSection
}
}
}

// no slots to sync = no streams defined or publications created
if len(slotsToSync) > 0 {
requiredPatroniConfig.Slots = slotsToSync
} else {
// try to delete existing stream resources
return c.deleteStreams()
}

c.logger.Debug("syncing logical replication slots")
Expand Down Expand Up @@ -398,6 +441,18 @@ func (c *Cluster) createOrUpdateStreams() error {
}
}

// check if there is any deletion
for _, stream := range streams.Items {
if !util.SliceContains(appIds, stream.Spec.ApplicationId) {
c.logger.Infof("event streams with applicationId %s do not exist, create it", stream.Spec.ApplicationId)
err := c.deleteStream(&stream)
if err != nil {
return fmt.Errorf("failed deleting event streams with applicationId %s: %v", stream.Spec.ApplicationId, err)
}
c.logger.Infof("event streams %q have been successfully deleted", stream.Name)
}
}

return nil
}

Expand Down