From 9c932ccb8e11a91bfc999f7606b789d85682cb21 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Tue, 2 Jul 2024 14:36:55 +0200 Subject: [PATCH 01/27] Enable slot and publication deletion when stream application is removed --- pkg/cluster/database.go | 9 +++ pkg/cluster/streams.go | 125 +++++++++++++++++++++++++++++----------- 2 files changed, 99 insertions(+), 35 deletions(-) diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index cc203eef5..a89f9fbdc 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -52,6 +52,7 @@ const ( GROUP BY p.pubname;` createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');` alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;` + dropPublicationSQL = `DROP PUBLICATION "%s";` globalDefaultPrivilegesSQL = `SET ROLE TO "%s"; ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s"; @@ -628,6 +629,14 @@ func (c *Cluster) getPublications() (publications map[string]string, err error) return dbPublications, err } +func (c *Cluster) executeDropPublication(pubName string) error { + c.logger.Infof("dropping publication %q", pubName) + if _, err := c.pgDb.Exec(fmt.Sprintf(dropPublicationSQL, pubName)); err != nil { + return fmt.Errorf("could not execute drop publication: %v", err) + } + return nil +} + // executeCreatePublication creates new publication for given tables // The caller is responsible for opening and closing the database connection. func (c *Cluster) executeCreatePublication(pubName, tableList string) error { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index ec4221b4b..8f4f206d8 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -43,6 +43,16 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) er return nil } +func (c *Cluster) deleteStream(stream *zalandov1.FabricEventStream) error { + c.setProcessName("deleting event stream") + + err := c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("could not delete event stream %q: %v", stream.Name, err) + } + return nil +} + func (c *Cluster) deleteStreams() error { c.setProcessName("deleting event streams") @@ -61,7 +71,7 @@ func (c *Cluster) deleteStreams() error { return fmt.Errorf("could not list of FabricEventStreams: %v", err) } for _, stream := range streams.Items { - err = c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{}) + err := c.deleteStream(&stream) if err != nil { errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", stream.Name, err)) } @@ -85,9 +95,13 @@ func gatherApplicationIds(streams []acidv1.Stream) []string { return appIds } -func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error { +func (c *Cluster) syncPublication(slots map[string]map[string]string, publications map[string]map[string]acidv1.StreamTable, dbName string, slotNames []string) (map[string]map[string]string, error) { createPublications := make(map[string]string) alterPublications := make(map[string]string) + deletePublications := make(map[string]string) + slotsToSync := make(map[string]map[string]string) + + c.logger.Debug("is this even going here????") defer func() { if err := c.closeDbConn(); err != nil { @@ -97,47 +111,65 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string] // check for existing publications if err := 
c.initDbConnWithName(dbName); err != nil { - return fmt.Errorf("could not init database connection") + return nil, fmt.Errorf("could not init database connection: %v", err) } currentPublications, err := c.getPublications() if err != nil { - return fmt.Errorf("could not get current publications: %v", err) - } - - tableNames := make([]string, len(tables)) - i := 0 - for t := range tables { - tableName, schemaName := getTableSchema(t) - tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) - i++ + return nil, fmt.Errorf("could not get current publications: %v", err) + } + + // gather list of required publications + for _, slotName := range slotNames { + tables := publications[slotName] + tableNames := make([]string, len(publications[slotName])) + i := 0 + for t := range tables { + tableName, schemaName := getTableSchema(t) + tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) + i++ + } + sort.Strings(tableNames) + tableList := strings.Join(tableNames, ", ") + + currentTables, exists := currentPublications[slotName] + if !exists { + createPublications[slotName] = tableList + } else if currentTables != tableList { + alterPublications[slotName] = tableList + } + slotsToSync[slotName] = slots[slotName] } - sort.Strings(tableNames) - tableList := strings.Join(tableNames, ", ") - currentTables, exists := currentPublications[publication] - if !exists { - createPublications[publication] = tableList - } else if currentTables != tableList { - alterPublications[publication] = tableList + // check if there is any deletion + for slotName, tables := range currentPublications { + if _, exists := publications[slotName]; !exists { + deletePublications[slotName] = tables + } } - if len(createPublications)+len(alterPublications) == 0 { - return nil + if len(createPublications)+len(alterPublications)+len(deletePublications) == 0 { + return nil, nil } for publicationName, tables := range createPublications { if err = c.executeCreatePublication(publicationName, tables); err != nil { - return fmt.Errorf("creation of publication %q failed: %v", publicationName, err) + return nil, fmt.Errorf("creation of publication %q failed: %v", publicationName, err) } } for publicationName, tables := range alterPublications { if err = c.executeAlterPublication(publicationName, tables); err != nil { - return fmt.Errorf("update of publication %q failed: %v", publicationName, err) + return nil, fmt.Errorf("update of publication %q failed: %v", publicationName, err) + } + } + for publicationName, _ := range deletePublications { + slotsToSync[publicationName] = nil + if err = c.executeDropPublication(publicationName); err != nil { + return nil, fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) } } - return nil + return slotsToSync, nil } func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream { @@ -283,12 +315,13 @@ func (c *Cluster) syncStreams() error { slotsToSync := make(map[string]map[string]string) publications := make(map[string]map[string]acidv1.StreamTable) requiredPatroniConfig := c.Spec.Patroni + databases := make(map[string][]string) if len(requiredPatroniConfig.Slots) > 0 { slots = requiredPatroniConfig.Slots } - // gather list of required slots and publications + // gather list of required slots and publications, group by database for _, stream := range c.Spec.Streams { slot := map[string]string{ "database": stream.Database, @@ -308,27 +341,37 @@ func (c *Cluster) syncStreams() error { } publications[slotName] = streamTables } + // save the slotName in 
the database list + if _, exists := databases[stream.Database]; !exists { + databases[stream.Database] = []string{slotName} + } else { + if !util.SliceContains(databases[stream.Database], slotName) { + databases[stream.Database] = append(databases[stream.Database], slotName) + } + } } - // create publications to each created slot + // sync publication in a database c.logger.Debug("syncing database publications") - for publication, tables := range publications { - // but first check for existing publications - dbName := slots[publication]["database"] - err = c.syncPublication(publication, dbName, tables) + for dbName, slotNames := range databases { + c.logger.Debug(dbName, slotNames) + slotsToSyncDb, err := c.syncPublication(slots, publications, dbName, slotNames) + c.logger.Debug(slotsToSyncDb, err) if err != nil { - c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err) + c.logger.Warningf("could not sync publications in database %q: %v", dbName, err) continue } - slotsToSync[publication] = slots[publication] + // if it's not exists in the slotsToSync, add it + for slotName, slotSection := range slotsToSyncDb { + if _, exists := slotsToSync[slotName]; !exists { + slotsToSync[slotName] = slotSection + } + } } // no slots to sync = no streams defined or publications created if len(slotsToSync) > 0 { requiredPatroniConfig.Slots = slotsToSync - } else { - // try to delete existing stream resources - return c.deleteStreams() } c.logger.Debug("syncing logical replication slots") @@ -398,6 +441,18 @@ func (c *Cluster) createOrUpdateStreams() error { } } + // check if there is any deletion + for _, stream := range streams.Items { + if !util.SliceContains(appIds, stream.Spec.ApplicationId) { + c.logger.Infof("event streams with applicationId %s do not exist, create it", stream.Spec.ApplicationId) + err := c.deleteStream(&stream) + if err != nil { + return fmt.Errorf("failed deleting event streams with applicationId %s: %v", stream.Spec.ApplicationId, err) + } + c.logger.Infof("event streams %q have been successfully deleted", stream.Name) + } + } + return nil } From 3f4d446d8302d085d64ed6fb77d13027dd61cd44 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Wed, 3 Jul 2024 17:13:58 +0200 Subject: [PATCH 02/27] fixing typo with major upgrade test --- e2e/tests/test_e2e.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 43dd467b5..c8024c803 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -1179,7 +1179,6 @@ def get_docker_image(): self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running") @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - @unittest.skip("Skipping this test until fixed") def test_major_version_upgrade(self): k8s = self.k8s result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml") @@ -1203,7 +1202,7 @@ def check_version_14(): version = p["server_version"][0:2] return version - self.evantuallyEqual(check_version_14, "14", "Version was not upgrade to 14") + self.eventuallyEqual(check_version_14, "14", "Version was not upgrade to 14") @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_persistent_volume_claim_retention_policy(self): From bad33e7c1914758b5af898b7e458887ebc5259be Mon Sep 17 00:00:00 2001 From: idanovinda Date: Fri, 5 Jul 2024 14:55:00 +0200 Subject: [PATCH 03/27] check slot and publication creation --- e2e/tests/test_e2e.py | 57 
+++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index c8024c803..8cdc95afc 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -199,6 +199,62 @@ def test_additional_owner_roles(self): self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3, "Not all additional users found in database", 10, 5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_stream_resources(self): + ''' + Create a Postgres cluster with streaming resources and check them. + ''' + k8s = self.k8s + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + leader = k8s.get_cluster_leader_pod() + + # create a table in one of the database of acid-minimal-cluster + create_stream_table = """ + CREATE TABLE data.test_table (id int, payload jsonb); + """ + self.query_database(leader.metadata.name, "foo", create_stream_table) + + # update the manifest with the streaming section + patch_streaming_config = { + "spec": { + "streams": { + "applicationId": "test-app", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "data.test_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } + } + } + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) + + # check if publication, slot, and fes resource are created + get_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; + """ + get_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, + "Publication is not created", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, + "Replication slot is not created", 10, 5) + + # remove the streaming section from the manifest + + # check if publication, slot, and fes resource are removed + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_additional_pod_capabilities(self): ''' @@ -1179,6 +1235,7 @@ def get_docker_image(): self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running") @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + @unittest.skip("Skipping this test until fixed") def test_major_version_upgrade(self): k8s = self.k8s result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml") From cb861b3ba20c2d9b616c893dbd5bdb654ce1f09f Mon Sep 17 00:00:00 2001 From: idanovinda Date: Fri, 5 Jul 2024 15:27:12 +0200 Subject: [PATCH 04/27] convert stream section as a list --- e2e/tests/test_e2e.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 8cdc95afc..57c7823c1 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -200,7 +200,7 @@ def test_additional_owner_roles(self): "Not all additional users found in database", 10, 5) @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_stream_resources(self): + def test_aa_stream_resources(self): ''' Create a Postgres cluster with streaming resources and check them. 
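Note: this test checks for a replication slot and a publication both named fes_foo_test_app, i.e. fes_<database>_<applicationId> with dashes mapped to underscores. A minimal Go sketch of that naming rule, derived only from the literals in the test (the helper name and the dash handling are assumptions, not taken from operator source):

    package main

    import (
    	"fmt"
    	"strings"
    )

    // fesObjectName mirrors the identifier the e2e assertions expect for both
    // the slot and the publication: "fes_<database>_<applicationId>", with
    // dashes replaced so the result is a valid Postgres identifier
    // (assumption: "test-app" -> "test_app", as seen in the test queries).
    func fesObjectName(database, applicationId string) string {
    	return fmt.Sprintf("fes_%s_%s", database, strings.ReplaceAll(applicationId, "-", "_"))
    }

    func main() {
    	fmt.Println(fesObjectName("foo", "test-app")) // prints: fes_foo_test_app
    }
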
''' @@ -219,20 +219,22 @@ def test_stream_resources(self): # update the manifest with the streaming section patch_streaming_config = { "spec": { - "streams": { - "applicationId": "test-app", - "batchSize": 100, - "database": "foo", - "enableRecovery": True, - "tables": { - "data.test_table": { - "eventType": "test-event", - "idColumn": "id", - "payloadColumn": "payload", - "recoveryEventType": "test-event-dlq" + "streams": [ + { + "applicationId": "test-app", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "data.test_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } } } - } + ] } } k8s.api.custom_objects_api.patch_namespaced_custom_object( From 5c3e735e7c459c253444261581056246343c952d Mon Sep 17 00:00:00 2001 From: idanovinda Date: Fri, 5 Jul 2024 19:10:14 +0200 Subject: [PATCH 05/27] add fes crd and fix stream --- e2e/tests/test_e2e.py | 7 +- manifests/fes.crd.yaml | 1311 ++++++++++++++++++++++++++++++++++++++++ pkg/cluster/streams.go | 7 +- 3 files changed, 1317 insertions(+), 8 deletions(-) create mode 100644 manifests/fes.crd.yaml diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 57c7823c1..a66a9e3f0 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -129,7 +129,8 @@ def setUpClass(cls): "infrastructure-roles.yaml", "infrastructure-roles-new.yaml", "custom-team-membership.yaml", - "e2e-storage-class.yaml"]: + "e2e-storage-class.yaml", + "fes.crd.yaml"]: result = k8s.create_with_kubectl("manifests/" + filename) print("stdout: {}, stderr: {}".format(result.stdout, result.stderr)) @@ -212,7 +213,7 @@ def test_aa_stream_resources(self): # create a table in one of the database of acid-minimal-cluster create_stream_table = """ - CREATE TABLE data.test_table (id int, payload jsonb); + CREATE TABLE test_table (id int, payload jsonb); """ self.query_database(leader.metadata.name, "foo", create_stream_table) @@ -226,7 +227,7 @@ def test_aa_stream_resources(self): "database": "foo", "enableRecovery": True, "tables": { - "data.test_table": { + "test_table": { "eventType": "test-event", "idColumn": "id", "payloadColumn": "payload", diff --git a/manifests/fes.crd.yaml b/manifests/fes.crd.yaml new file mode 100644 index 000000000..58450aa79 --- /dev/null +++ b/manifests/fes.crd.yaml @@ -0,0 +1,1311 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: fabriceventstreams.zalando.org +spec: + group: zalando.org + names: + kind: FabricEventStream + listKind: FabricEventStreamList + plural: fabriceventstreams + singular: fabriceventstream + shortNames: + - fes + categories: + - all + scope: Namespaced + conversion: + strategy: None + versions: + - name: v1alpha1 + served: true + storage: false + subresources: + status: {} + additionalPrinterColumns: + - name: ApplicationID + type: string + jsonPath: .spec.applicationId + - name: Age + type: date + jsonPath: .metadata.creationTimestamp + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + required: + - eventStreams + - applicationId + properties: + applicationId: + type: string + description: The application name that these streams belong to as registered in YourTurn + example: donut + eventStreams: + type: array + items: + type: object + required: + - source + - sink + properties: + uniqueStreamId: + type: string + description: | + A unique-per-namespace ID supplied by a human to use for the deployment name. 
+ Used to help distinguish streams from each other in telemetry output. + Ignored if multiple streams share the same source specification + maxLength: 20 + source: + description: | + This section configures the source of the data which will be pushed onto Fabric Event Scheduler's work queue. + type: object + properties: + type: + type: string + jdbcConnection: + type: object + description: Represents a JDBC connection to a replication slot. + required: + - slotName + - jdbcUrl + - databaseAuthentication + properties: + slotName: + type: string + description: The name of an existing replication slot to connect to. + pattern: "^[0-9a-zA-Z_]+$" + maxLength: 63 + example: my_slot + pluginType: + type: string + description: Postgres WAL plugin. Defaults to wal2json initially, later migrating to pgoutput + example: wal2json + publicationName: + type: string + description: | + The name of the Publication to subscribe to when using `pgoutput` plugin type. + https://www.postgresql.org/docs/14/logical-replication-publication.html + The recommendation is to manually create the publication, tailored to your CDC needs. + If absent, FES will attempt to create this Publication (requires SUPERUSER permission). + pattern: "^[0-9a-zA-Z_]+$" + example: my_fes_publication + jdbcUrl: + type: string + description: JDBC url of the database to connect to. + example: jdbc:postgresql://host:port/database?sslmode=require + databaseAuthentication: + type: object + description: Represents a Kubernetes secret. + required: + - type + - name + - userKey + - passwordKey + properties: + type: + type: string + pattern: "^DatabaseAuthenticationSecret$" + name: + type: string + description: | + Name of the secret to extract the username and password from. + userKey: + type: string + description: | + The key in the secret which contains the username to be used for logical replication. + passwordKey: + type: string + description: | + The key in the secret which contains the password to be used for logical replication. + oneOf: + - required: + - type + - name + - userKey + - passwordKey + properties: + type: + pattern: "^DatabaseAuthenticationSecret$" + schema: + type: string + description: Name of the datbase schema to monitor. Defaults to `public` + example: public + table: + type: object + description: Which table should be monitored. Row updates to this table will be the data source. + required: + - name + properties: + name: + type: string + description: Table name. + example: mytable + idColumn: + type: string + description: The id column in the source table to use in telemetry. Defaults to `id` + example: id + subscription: + type: object + description: DynamoDb stream arn and lease table to use for a subscription + required: + - streamArn + - leaseTableName + properties: + leaseTableName: + type: string + description: | + The lease table used by the the KCL connector to keep track of shard positions in the DynamoDb Stream. + Defaults to the fes resource name if not specified + streamArn: + type: string + description: DynamoDb stream name to connect to + stsAssumeRoleArn: + type: string + description: | + ARN of a role to be assumed to access DynamoDB and DynamoDB Stream + (needed for a cross-account access) + arnResolutionMode: + type: string + description: | + EXACT (default) - requires the exact stream ARN to be specified + DYNAMIC - Allows a wildcard (*) at the end of the ARN to enable dynamic fetching of the stream ID based on the tableName. 
+ default: "EXACT" + enum: + - "EXACT" + - "DYNAMIC" + nakadiSubscription: + type: object + description: Description of the stream used to receive events from Nakadi + required: + - eventTypeName + - consumerGroup + - adminTeams + properties: + eventTypeName: + type: string + description: The event type to receive + consumerGroup: + type: string + description: What consumer group to use when creating or connecting to the stream + adminTeams: + type: array + description: Names of the teams that will be added as admin permissions on the subscription + items: + type: string + batchLimit: + type: integer + description: The maximum number of events to fetch in a single batch + example: "100" + maxUncommittedEvents: + type: integer + description: The maximum number of events that can be fetched but not yet committed + example: "1000" + readFrom: + description: By default, new subscription start reading the end, use this to change to begin + example: "end" + type: string + enum: + - "begin" + - "end" + filter: + description: JsonPath can be used to filter entities from the event stream. For example, to only emit events if an entity had its status set to active after the change OR if its status was active before the change. See example + example: + "[?(@.after.status=='active' || @.before.status=='active')]" + type: string + format: jsonpath + minReplicas: + description: Min replicas can be used to specify the number of replicas to use for the component to scale horizontally + example: "3" + type: integer + operations: + description: | + List of source operations that should be processed. + Default behaviour is to only skip DynamoDB REMOVE and Postgres DELETE operations. + example: ["INSERT", "UPDATE", "DELETE"] + type: array + items: + type: string + enum: + - "INSERT" + - "UPDATE" + - "MODIFY" + - "DELETE" + - "REMOVE" + - "C" + - "U" + - "D" + - "S" + oneOf: + - required: + - type + - jdbcConnection + - table + properties: + type: + pattern: "^PostgresLogicalReplication$" + - required: + - type + - subscription + properties: + type: + pattern: "^DynamoDbStreamsSubscription$" + - required: + - type + - nakadiSubscription + properties: + type: + pattern: "^NakadiStreamsSubscription$" + flow: + type: object + description: | + This section configures how the data from the source will be processed or transformed. + properties: + type: + type: string + callHomeUrl: + type: string + description: The value to put in the callHomeUri property. + callHomeIdColumn: + type: string + description: Name of column holding the String value to put in entityId property. + example: id + flowIdColumn: + description: Name of the flow id column in the source table if any. + type: string + spanCtxColumn: + description: Name of the span context column in the source table if any. + type: string + dataTypeColumn: + type: string + description: Name of the column holding the String value to put in the data_type property. + example: data_type + dataType: + type: string + description: Value of the data_type field in the Nakadi event. + example: change_data_capture + dataOpColumn: + type: string + description: Name of the column holding the String value to put in the data_op property. + example: data_op + metadataColumn: + type: string + description: Name of the column holding the Json payload to put in the metadata property. + example: metadata + dataColumn: + type: string + description: Name of the column holding the Json payload to put in the data property. 
+ example: data + payloadColumn: + type: string + description: Name of the column holding the Json payload for the Nakadi event. Defaults to 'payload' + example: my_business_event + redisKeyPrefix: + type: string + description: The prefix to add to the constructed key value to be added to redis. Defaults to the empty string. + example: "prefix:" + redisKeyColumns: + type: array + items: + type: string + description: The names of the columns who's values will be concatenated together seperated by redisKeySeparator and prefixed by redisKeyPrefix + example: ["id", "sub_id"] + redisValueColumn: + type: string + description: Name of the column holding the value of the key that will be stored in redis. Defaults to 'payload' + example: value_column + redisKeySeparator: + type: string + description: The separator character used to seperated redisKeyColumns value(s) when constructing a key. Defaults to '.' + example: . + lambdaFunctionArn: + type: string + description: Name of the lambda function holding the data transformation logic from the Stream. + example: "arn:aws:lambda:eu-central-1:000000000000:function:convert-account-events" + kafkaValueColumn: + type: string + description: Name of the column holding value for the kafka sink. + example: data_col + kafkaKeyColumn: + type: string + description: Name of the column holding the key for the kafka sink. + example: key_col + kafkaTopicColumn: + type: string + description: Name of the column holding the topic, overrides value in the kafka sink if present. + example: topic_col + oneOf: + - required: + - type + - callHomeUrl + - callHomeIdColumn + properties: + type: + pattern: "^PostgresWalToApiCallHomeEvent$" + - required: + - type + properties: + type: + pattern: "^PostgresWalToNakadiDataEvent$" + - required: + - type + properties: + type: + pattern: "^PostgresWalToGenericNakadiEvent$|^DynamoDbStreamToGenericNakadiEvent$" + - required: + - type + properties: + type: + pattern: "^PostgresWalFromNakadiProducerToGenericNakadiEvent$" + - required: + - type + - lambdaFunctionArn + properties: + type: + pattern: "^AwsLambdaConversion$" + - required: + - type + properties: + type: + pattern: "^PostgresWalToNakadiCdcDataEvent$" + - required: + - type + - redisKeyColumns + properties: + type: + pattern: "^PostgresWalToRedisKvEvent$" + - required: + - type + - kafkaValueColumn + properties: + type: + pattern: "^PostgresWalToKafkaEvent$" + sink: + description: A sink belonging to an event stream. + type: object + properties: + type: + type: string + queueName: + type: string + queueUrl: + type: string + maxBatchSize: + type: integer + minimum: 1 + eventType: + type: string + description: The nakadi event type. + nakadiBaseUri: + type: string + description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. + format: uri + example: https://nakadi-live.nakadi.zalan.do + partitionKeyFields: + type: array + description: | + The fields in the payload to be used by FES to partition the Events prior to publishing. + Builders should define the same fields here as in the event type definition. + If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. + The declared fields should consistently identify an entity, such that logical event ordering can be preserved. + If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. 
+ Example: ["user_id", "order.order_id"] + items: + type: string + bucket: + type: string + description: The name of the S3 bucket. + key: + type: object + description: Defines how FES should derive the s3 key for each event. + properties: + type: + type: string + keyPrefix: + type: string + description: The prefix of the generated key. + example: "fes/output/" + oneOf: + - required: + - type + properties: + type: + pattern: "^GeneratedUuid" + tableName: + type: string + kafkaTopicName: + description: The kafka topic which events will be published to + type: string + kafkaBootstrapServers: + type: string + description: | + The kafka bootstrap servers. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers + redisClusterEnabled: + type: boolean + description: | + If the redis instances being connected to are a redis cluster, then this needs to specified as true. + Non clustered redis instances operating in a main node/replica configuration do not need to specify this property. + redisConnectionUrls: + type: array + description: | + The redis connection urls used to connect to the redis instance(s) to sink to. + Example: ["rediss://instance1.myredis-host.com:6379/0", "rediss://instance2.myredis-host.com:6379/0"] + items: + type: string + redisAuthentication: + type: object + description: Represents a Kubernetes secret. + required: + - type + - name + - passwordKey + - userKey + properties: + type: + type: string + pattern: "^Redis.*AuthenticationSecret$" + name: + type: string + description: | + Name of the secret to extract the username and password from. + userKey: + type: string + description: | + The key in the secret which contains the username to be used for redis ACL authentication. + (Not required when using RedisLegacyAuthenticationSecret) + passwordKey: + type: string + description: | + The key in the secret which contains the password to be used for redis authentication (ACL or legacy) + oneOf: + - required: + - type + - name + - passwordKey + properties: + type: + pattern: "^RedisLegacyAuthenticationSecret$" + - required: + - type + - name + - userKey + - passwordKey + properties: + type: + pattern: "^RedisACLAuthenticationSecret$" + oneOf: + - required: + - type + - queueName + properties: + type: + pattern: "^SqsStandard$" + - required: + - type + - queueName + properties: + type: + pattern: "^SqsFifo$" + - required: + - type + properties: + type: + pattern: "^Nakadi$" + - required: + - type + - bucket + - key + properties: + type: + pattern: "^S3" + - required: + - type + - tableName + properties: + type: + pattern: "^DynamoDb$" + - required: + - type + - redisConnectionUrls + properties: + type: + pattern: "^Redis$" + - required: + - type + - kafkaTopicName + - kafkaBootstrapServers + properties: + type: + pattern: "^Kafka$" + recovery: + type: object + description: This section configures recovery strategy for publishing errors. + properties: + type: + type: string + sink: + type: object + description: The dead letter sink. + properties: + type: + type: string + queueName: + type: string + queueUrl: + type: string + maxBatchSize: + type: integer + minimum: 1 + eventType: + type: string + description: The nakadi event type. + nakadiBaseUri: + type: string + description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. 
+ format: uri + example: https://nakadi-live.nakadi.zalan.do + partitionKeyFields: + type: array + description: | + The fields in the payload to be used by FES to partition the Events prior to publishing. + Builders should define the same fields here as in the event type definition. + If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. + The declared fields should consistently identify an entity, such that logical event ordering can be preserved. + If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. + Example: ["user_id", "order.order_id"] + items: + type: string + bucket: + type: string + description: The name of the S3 bucket. + key: + type: object + description: Defines how FES should derive the s3 key for each event. + properties: + type: + type: string + keyPrefix: + type: string + description: The prefix of the generated key. + example: "fes/output/" + oneOf: + - required: + - type + properties: + type: + pattern: "^GeneratedUuid" + tableName: + type: string + kafkaTopicName: + description: The kafka topic which events will be published to + type: string + kafkaBootstrapServers: + type: string + description: | + The kafka bootstrap servers. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers + oneOf: + - required: + - type + - queueName + properties: + type: + pattern: "^SqsStandard$" + - required: + - type + - queueName + properties: + type: + pattern: "^SqsFifo$" + - required: + - type + properties: + type: + pattern: "^Nakadi$" + - required: + - type + - bucket + - key + properties: + type: + pattern: "^S3" + - required: + - type + - tableName + properties: + type: + pattern: "^DynamoDb$" + - required: + - type + - kafkaTopicName + - kafkaBootstrapServers + properties: + type: + pattern: "^Kafka$" + oneOf: + - required: + - type + properties: + type: + pattern: "^None" + - required: + - type + properties: + type: + pattern: "^Ignore" + - required: + - type + - sink + properties: + type: + pattern: "^DeadLetter" + status: + type: object + x-kubernetes-preserve-unknown-fields: true + - name: v1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: ApplicationID + type: string + jsonPath: .spec.applicationId + - name: Age + type: date + jsonPath: .metadata.creationTimestamp + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + required: + - eventStreams + - applicationId + properties: + applicationId: + type: string + description: The application name that these streams belong to as registered in YourTurn + example: donut + eventStreams: + type: array + items: + type: object + required: + - source + - sink + properties: + uniqueStreamId: + type: string + description: | + A unique-per-namespace ID supplied by a human to use for the deployment name. + Used to help distinguish streams from each other in telemetry output. + Ignored if multiple streams share the same source specification + maxLength: 20 + source: + description: | + This section configures the source of the data which will be pushed onto Fabric Event Scheduler's work queue. + type: object + properties: + type: + type: string + jdbcConnection: + type: object + description: Represents a JDBC connection to a replication slot. 
+ required: + - slotName + - jdbcUrl + - databaseAuthentication + properties: + slotName: + type: string + description: The name of an existing replication slot to connect to. + pattern: "^[0-9a-zA-Z_]+$" + maxLength: 63 + example: my_slot + pluginType: + type: string + description: Postgres WAL plugin. Defaults to wal2json initially, later migrating to pgoutput + example: wal2json + publicationName: + type: string + description: | + The name of the Publication to subscribe to when using `pgoutput` plugin type. + https://www.postgresql.org/docs/14/logical-replication-publication.html + The recommendation is to manually create the publication, tailored to your CDC needs. + If absent, FES will attempt to create this Publication (requires SUPERUSER permission). + pattern: "^[0-9a-zA-Z_]+$" + example: my_fes_publication + jdbcUrl: + type: string + description: JDBC url of the database to connect to. + example: jdbc:postgresql://host:port/database?sslmode=require + databaseAuthentication: + type: object + description: Represents a Kubernetes secret. + required: + - type + - name + - userKey + - passwordKey + properties: + type: + type: string + pattern: "^DatabaseAuthenticationSecret$" + name: + type: string + description: | + Name of the secret to extract the username and password from. + userKey: + type: string + description: | + The key in the secret which contains the username to be used for logical replication. + passwordKey: + type: string + description: | + The key in the secret which contains the password to be used for logical replication. + oneOf: + - required: + - type + - name + - userKey + - passwordKey + properties: + type: + pattern: "^DatabaseAuthenticationSecret$" + schema: + type: string + description: Name of the datbase schema to monitor. Defaults to `public` + example: public + table: + type: object + description: Which table should be monitored. Row updates to this table will be the data source. + required: + - name + properties: + name: + type: string + description: Table name. + example: mytable + idColumn: + type: string + description: The id column in the source table to use in telemetry. Defaults to `id` + example: id + subscription: + type: object + description: DynamoDb stream arn and lease table to use for a subscription + required: + - streamArn + - leaseTableName + properties: + leaseTableName: + type: string + description: | + The lease table used by the the KCL connector to keep track of shard positions in the DynamoDb Stream. + Defaults to the fes resource name if not specified + streamArn: + type: string + description: DynamoDb stream name to connect to + stsAssumeRoleArn: + type: string + description: | + ARN of a role to be assumed to access DynamoDB and DynamoDB Stream + (needed for a cross-account access) + arnResolutionMode: + type: string + description: | + EXACT (default) - requires the exact stream ARN to be specified + DYNAMIC - Allows a wildcard (*) at the end of the ARN to enable dynamic fetching of the stream ID based on the tableName. 
+ default: "EXACT" + enum: + - "EXACT" + - "DYNAMIC" + nakadiSubscription: + type: object + description: Nakadi event-type to subscribe to + required: + - eventTypeName + - consumerGroup + - adminTeams + properties: + eventTypeName: + type: string + description: The event type to receive + consumerGroup: + type: string + description: What consumer group to use when creating or connecting to the stream + adminTeams: + type: array + description: Names of the teams that will be added as admin permissions on the subscription + items: + type: string + batchLimit: + type: integer + description: The maximum number of events to fetch in a single batch + example: "100" + maxUncommittedEvents: + type: integer + description: The maximum number of events that can be fetched but not yet committed + example: "1000" + readFrom: + description: By default, new subscription start reading the end, use this to change to begin + example: "end" + type: string + enum: + - "begin" + - "end" + filter: + description: JsonPath can be used to filter entities from the event stream. For example, to only emit events if an entity had its status set to active after the change OR if its status was active before the change. See example + example: + "[?(@.after.status=='active' || @.before.status=='active')]" + type: string + format: jsonpath + minReplicas: + description: Min replicas can be used to specify the number of replicas to use for the component to scale horizontally + example: "3" + type: integer + operations: + description: | + List of source operations that should be processed. + Default behaviour is to only skip DynamoDB REMOVE and Postgres DELETE operations. + example: ["INSERT", "UPDATE", "DELETE"] + type: array + items: + type: string + enum: + - "INSERT" + - "UPDATE" + - "MODIFY" + - "DELETE" + - "REMOVE" + - "C" + - "U" + - "D" + - "S" + oneOf: + - required: + - type + - jdbcConnection + - table + properties: + type: + pattern: "^PostgresLogicalReplication$" + - required: + - type + - subscription + properties: + type: + pattern: "^DynamoDbStreamsSubscription$" + - required: + - type + - nakadiSubscription + properties: + type: + pattern: "^NakadiStreamsSubscription$" + flow: + type: object + description: | + This section configures how the data from the source will be processed or transformed. + properties: + type: + type: string + callHomeUrl: + type: string + description: The value to put in the callHomeUri property. + callHomeIdColumn: + type: string + description: Name of column holding the String value to put in entityId property. + example: id + flowIdColumn: + description: Name of the flow id column in the source table if any. + type: string + spanCtxColumn: + description: Name of the span context column in the source table if any. + type: string + dataTypeColumn: + type: string + description: Name of the column holding the String value to put in the data_type property. + example: data_type + dataType: + type: string + description: Value of the data_type field in the Nakadi event. + example: change_data_capture + dataOpColumn: + type: string + description: Name of the column holding the String value to put in the data_op property. + example: data_op + metadataColumn: + type: string + description: Name of the column holding the Json payload to put in the metadata property. + example: metadata + dataColumn: + type: string + description: Name of the column holding the Json payload to put in the data property. 
+ example: data + payloadColumn: + type: string + description: Name of the column holding the Json payload for the Nakadi event. Defaults to 'payload' + example: my_business_event + redisKeyPrefix: + type: string + description: The prefix to add to the constructed key value to be added to redis. Defaults to the empty string. + example: "prefix:" + redisKeyColumns: + type: array + items: + type: string + description: The names of the columns who's values will be concatenated together seperated by redisKeySeparator and prefixed by redisKeyPrefix + example: ["id", "sub_id"] + redisValueColumn: + type: string + description: Name of the column holding the value of the key that will be stored in redis. Defaults to 'payload' + example: value_column + redisKeySeparator: + type: string + description: The separator character used to seperated redisKeyColumns value(s) when constructing a key. Defaults to '.' + example: . + lambdaFunctionArn: + type: string + description: Name of the lambda function holding the data transformation logic from the Stream. + example: "arn:aws:lambda:eu-central-1:000000000000:function:convert-account-events" + kafkaValueColumn: + type: string + description: Name of the column holding value for the kafka sink. + example: data_col + kafkaKeyColumn: + type: string + description: Name of the column holding the key for the kafka sink. + example: key_col + kafkaTopicColumn: + type: string + description: Name of the column holding the topic, overrides value in the kafka sink if present. + example: topic_col + oneOf: + - required: + - type + - callHomeUrl + - callHomeIdColumn + properties: + type: + pattern: "^PostgresWalToApiCallHomeEvent$" + - required: + - type + properties: + type: + pattern: "^PostgresWalToNakadiDataEvent$" + - required: + - type + properties: + type: + pattern: "^PostgresWalToGenericNakadiEvent$|^DynamoDbStreamToGenericNakadiEvent$" + - required: + - type + properties: + type: + pattern: "^PostgresWalFromNakadiProducerToGenericNakadiEvent$" + - required: + - type + - lambdaFunctionArn + properties: + type: + pattern: "^AwsLambdaConversion$" + - required: + - type + properties: + type: + pattern: "^PostgresWalToNakadiCdcDataEvent$" + - required: + - type + - redisKeyColumns + properties: + type: + pattern: "^PostgresWalToRedisKvEvent$" + - required: + - type + - kafkaValueColumn + properties: + type: + pattern: "^PostgresWalToKafkaEvent$" + sink: + description: A sink belonging to an event stream. + type: object + properties: + type: + type: string + queueName: + type: string + queueUrl: + type: string + maxBatchSize: + type: integer + minimum: 1 + eventType: + type: string + description: The nakadi event type. + nakadiBaseUri: + type: string + description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. + format: uri + example: https://nakadi-live.nakadi.zalan.do + partitionKeyFields: + type: array + description: | + The fields in the payload to be used by FES to partition the Events prior to publishing. + Builders should define the same fields here as in the event type definition. + If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. + The declared fields should consistently identify an entity, such that logical event ordering can be preserved. + If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. 
+ Example: ["user_id", "order.order_id"] + items: + type: string + bucket: + type: string + description: The name of the S3 bucket. + key: + type: object + description: Defines how FES should derive the s3 key for each event. + properties: + type: + type: string + keyPrefix: + type: string + description: The prefix of the generated key. + example: "fes/output/" + oneOf: + - required: + - type + properties: + type: + pattern: "^GeneratedUuid" + tableName: + type: string + kafkaTopicName: + description: The kafka topic which events will be published to + type: string + kafkaBootstrapServers: + type: string + description: | + The kafka bootstrap servers. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers + redisClusterEnabled: + type: boolean + description: | + If the redis instances being connected to are a redis cluster, then this needs to specified as true. + Non clustered redis instances operating in a main node/replica configuration do not need to specify this property. + redisConnectionUrls: + type: array + description: | + The redis connection urls used to connect to the redis instance(s) to sink to. + Example: ["rediss://instance1.myredis-host.com:6379/0", "rediss://instance2.myredis-host.com:6379/0"] + items: + type: string + redisAuthentication: + type: object + description: Represents a Kubernetes secret. + required: + - type + - name + - passwordKey + - userKey + properties: + type: + type: string + pattern: "^Redis.*AuthenticationSecret$" + name: + type: string + description: | + Name of the secret to extract the username and password from. + userKey: + type: string + description: | + The key in the secret which contains the username to be used for redis ACL authentication. + (Not required when using RedisLegacyAuthenticationSecret) + passwordKey: + type: string + description: | + The key in the secret which contains the password to be used for redis authentication (ACL or legacy) + oneOf: + - required: + - type + - name + - passwordKey + properties: + type: + pattern: "^RedisLegacyAuthenticationSecret$" + - required: + - type + - name + - userKey + - passwordKey + properties: + type: + pattern: "^RedisACLAuthenticationSecret$" + oneOf: + - required: + - type + - queueName + properties: + type: + pattern: "^SqsStandard$" + - required: + - type + - queueName + properties: + type: + pattern: "^SqsFifo$" + - required: + - type + properties: + type: + pattern: "^Nakadi$" + - required: + - type + - bucket + - key + properties: + type: + pattern: "^S3" + - required: + - type + - tableName + properties: + type: + pattern: "^DynamoDb$" + - required: + - type + - redisConnectionUrls + properties: + type: + pattern: "^Redis$" + - required: + - type + - kafkaTopicName + - kafkaBootstrapServers + properties: + type: + pattern: "^Kafka$" + recovery: + type: object + description: This section configures recovery strategy for publishing errors. + properties: + type: + type: string + sink: + type: object + description: The dead letter sink. + properties: + type: + type: string + queueName: + type: string + queueUrl: + type: string + maxBatchSize: + type: integer + minimum: 1 + eventType: + type: string + description: The nakadi event type. + nakadiBaseUri: + type: string + description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. 
+ format: uri + example: https://nakadi-live.nakadi.zalan.do + partitionKeyFields: + type: array + description: | + The fields in the payload to be used by FES to partition the Events prior to publishing. + Builders should define the same fields here as in the event type definition. + If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. + The declared fields should consistently identify an entity, such that logical event ordering can be preserved. + If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. + Example: ["user_id", "order.order_id"] + items: + type: string + bucket: + type: string + description: The name of the S3 bucket. + key: + type: object + description: Defines how FES should derive the s3 key for each event. + properties: + type: + type: string + keyPrefix: + type: string + description: The prefix of the generated key. + example: "fes/output/" + oneOf: + - required: + - type + properties: + type: + pattern: "^GeneratedUuid" + tableName: + type: string + kafkaTopicName: + description: The kafka topic which events will be published to + type: string + kafkaBootstrapServers: + type: string + description: | + The kafka bootstrap servers. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers + oneOf: + - required: + - type + - queueName + properties: + type: + pattern: "^SqsStandard$" + - required: + - type + - queueName + properties: + type: + pattern: "^SqsFifo$" + - required: + - type + properties: + type: + pattern: "^Nakadi$" + - required: + - type + - bucket + - key + properties: + type: + pattern: "^S3" + - required: + - type + - tableName + properties: + type: + pattern: "^DynamoDb$" + - required: + - type + - kafkaTopicName + - kafkaBootstrapServers + properties: + type: + pattern: "^Kafka$" + oneOf: + - required: + - type + properties: + type: + pattern: "^None" + - required: + - type + properties: + type: + pattern: "^Ignore" + - required: + - type + - sink + properties: + type: + pattern: "^DeadLetter" + status: + type: object + x-kubernetes-preserve-unknown-fields: true diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 8f4f206d8..73dd326ec 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -101,8 +101,6 @@ func (c *Cluster) syncPublication(slots map[string]map[string]string, publicatio deletePublications := make(map[string]string) slotsToSync := make(map[string]map[string]string) - c.logger.Debug("is this even going here????") - defer func() { if err := c.closeDbConn(); err != nil { c.logger.Errorf("could not close database connection: %v", err) @@ -119,7 +117,6 @@ func (c *Cluster) syncPublication(slots map[string]map[string]string, publicatio return nil, fmt.Errorf("could not get current publications: %v", err) } - // gather list of required publications for _, slotName := range slotNames { tables := publications[slotName] tableNames := make([]string, len(publications[slotName])) @@ -361,7 +358,7 @@ func (c *Cluster) syncStreams() error { c.logger.Warningf("could not sync publications in database %q: %v", dbName, err) continue } - // if it's not exists in the slotsToSync, add it + // if does not exist in the slotsToSync, add it for slotName, slotSection := range slotsToSyncDb { if _, exists := slotsToSync[slotName]; !exists { slotsToSync[slotName] = slotSection @@ -444,7 +441,7 @@ func (c *Cluster) createOrUpdateStreams() error { // check if there 
is any deletion for _, stream := range streams.Items { if !util.SliceContains(appIds, stream.Spec.ApplicationId) { - c.logger.Infof("event streams with applicationId %s do not exist, create it", stream.Spec.ApplicationId) + c.logger.Infof("event streams with applicationId %s do not exist in the manifest, delete it", stream.Spec.ApplicationId) err := c.deleteStream(&stream) if err != nil { return fmt.Errorf("failed deleting event streams with applicationId %s: %v", stream.Spec.ApplicationId, err) From 73fcf09a5657385dc24c544ca7e052ac5f5a7ec4 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 8 Jul 2024 13:10:19 +0200 Subject: [PATCH 06/27] list all existing database for iteration --- e2e/tests/test_e2e.py | 21 +++++++++++++++- manifests/operator-service-account-rbac.yaml | 26 ++++++++++---------- pkg/cluster/streams.go | 20 +++++++++++++++ 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index a66a9e3f0..113c89c8f 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -240,6 +240,7 @@ def test_aa_stream_resources(self): } k8s.api.custom_objects_api.patch_namespaced_custom_object( 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") # check if publication, slot, and fes resource are created get_publication_query = """ @@ -252,10 +253,28 @@ def test_aa_stream_resources(self): "Publication is not created", 10, 5) self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, "Replication slot is not created", 10, 5) - + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, + "Could not find Fabric Event Stream resource", 10, 5) + # remove the streaming section from the manifest + patch_streaming_config_removal = { + "spec": { + "streams": [] + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") # check if publication, slot, and fes resource are removed + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, + 'Could not delete Fabric Event Stream resource', 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, + "Publication is not deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, + "Replication slot is not deleted", 10, 5) @timeout_decorator.timeout(TEST_TIMEOUT_SEC) diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index 97629ee95..08c676089 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -36,19 +36,19 @@ rules: - list - watch # all verbs allowed for event streams (Zalando-internal feature) -# - apiGroups: -# - zalando.org -# resources: -# - fabriceventstreams -# verbs: -# - create -# - delete -# - deletecollection -# - get -# - list -# - patch -# 
- update -# - watch +- apiGroups: + - zalando.org + resources: + - fabriceventstreams + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch # to create or get/update CRDs when starting up - apiGroups: - apiextensions.k8s.io diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 73dd326ec..cb0eda13c 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -318,6 +318,26 @@ func (c *Cluster) syncStreams() error { slots = requiredPatroniConfig.Slots } + // list existing publications + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + listDatabases, err := c.getDatabases() + if err != nil { + return fmt.Errorf("could not get list of databases: %v", err) + } + // get database name with empty list of slot, except template0 and template1 + for dbName, _ := range listDatabases { + if dbName != "template0" && dbName != "template1" { + databases[dbName] = []string{} + } + } + // gather list of required slots and publications, group by database for _, stream := range c.Spec.Streams { slot := map[string]string{ From 95889b43990849dcef96be73a0b1cba7171ad40b Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 8 Jul 2024 14:22:07 +0200 Subject: [PATCH 07/27] fix unittest --- pkg/cluster/streams_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 63c38311b..5045a66fe 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -466,7 +466,7 @@ func TestUpdateFabricEventStream(t *testing.T) { assert.NoError(t, err) cluster.Postgresql.Spec = pgUpdated.Spec - cluster.syncStreams() + cluster.createOrUpdateStreams() streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) if len(streamList.Items) > 0 || err != nil { From 829a3fea210e4cee29c158a8caadc54e2d62fa63 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 8 Jul 2024 15:18:28 +0200 Subject: [PATCH 08/27] remove debug message and change test name --- e2e/tests/test_e2e.py | 152 ++++++++++++++++++++--------------------- pkg/cluster/streams.go | 2 - 2 files changed, 76 insertions(+), 78 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 113c89c8f..b661e3f17 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -200,82 +200,6 @@ def test_additional_owner_roles(self): self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3, "Not all additional users found in database", 10, 5) - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_aa_stream_resources(self): - ''' - Create a Postgres cluster with streaming resources and check them. 
- ''' - k8s = self.k8s - - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") - leader = k8s.get_cluster_leader_pod() - - # create a table in one of the database of acid-minimal-cluster - create_stream_table = """ - CREATE TABLE test_table (id int, payload jsonb); - """ - self.query_database(leader.metadata.name, "foo", create_stream_table) - - # update the manifest with the streaming section - patch_streaming_config = { - "spec": { - "streams": [ - { - "applicationId": "test-app", - "batchSize": 100, - "database": "foo", - "enableRecovery": True, - "tables": { - "test_table": { - "eventType": "test-event", - "idColumn": "id", - "payloadColumn": "payload", - "recoveryEventType": "test-event-dlq" - } - } - } - ] - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # check if publication, slot, and fes resource are created - get_publication_query = """ - SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; - """ - get_slot_query = """ - SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, - "Publication is not created", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, - "Replication slot is not created", 10, 5) - self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( - "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, - "Could not find Fabric Event Stream resource", 10, 5) - - # remove the streaming section from the manifest - patch_streaming_config_removal = { - "spec": { - "streams": [] - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # check if publication, slot, and fes resource are removed - self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( - "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, - 'Could not delete Fabric Event Stream resource', 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, - "Publication is not deleted", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, - "Replication slot is not deleted", 10, 5) - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_additional_pod_capabilities(self): @@ -2067,6 +1991,82 @@ def test_standby_cluster(self): "acid.zalan.do", "v1", "default", "postgresqls", "acid-standby-cluster") time.sleep(5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_stream_resources(self): + ''' + Create and delete fabric event streaming resources. 
+ ''' + k8s = self.k8s + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + leader = k8s.get_cluster_leader_pod() + + # create a table in one of the database of acid-minimal-cluster + create_stream_table = """ + CREATE TABLE test_table (id int, payload jsonb); + """ + self.query_database(leader.metadata.name, "foo", create_stream_table) + + # update the manifest with the streaming section + patch_streaming_config = { + "spec": { + "streams": [ + { + "applicationId": "test-app", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "test_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } + } + } + ] + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # check if publication, slot, and fes resource are created + get_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; + """ + get_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, + "Publication is not created", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, + "Replication slot is not created", 10, 5) + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, + "Could not find Fabric Event Stream resource", 10, 5) + + # remove the streaming section from the manifest + patch_streaming_config_removal = { + "spec": { + "streams": [] + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # check if publication, slot, and fes resource are removed + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, + 'Could not delete Fabric Event Stream resource', 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, + "Publication is not deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, + "Replication slot is not deleted", 10, 5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): ''' diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index cb0eda13c..dd3e35240 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -371,9 +371,7 @@ func (c *Cluster) syncStreams() error { // sync publication in a database c.logger.Debug("syncing database publications") for dbName, slotNames := range databases { - c.logger.Debug(dbName, slotNames) slotsToSyncDb, err := c.syncPublication(slots, publications, dbName, slotNames) - c.logger.Debug(slotsToSyncDb, err) if err != nil { c.logger.Warningf("could not sync 
publications in database %q: %v", dbName, err) continue From 5cb67d13538212bcf03c0af7fd4f556e42d078db Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 8 Jul 2024 16:08:01 +0200 Subject: [PATCH 09/27] use minimal fes crd and 1 more secret --- e2e/tests/test_e2e.py | 2 +- manifests/fes.crd.yaml | 1289 ---------------------------------------- 2 files changed, 1 insertion(+), 1290 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index b661e3f17..b3fe8ba66 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2193,7 +2193,7 @@ def test_zz_cluster_deletion(self): self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") - self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 7, "Secrets were deleted although disabled in config") + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") except timeout_decorator.TimeoutError: diff --git a/manifests/fes.crd.yaml b/manifests/fes.crd.yaml index 58450aa79..d887d34f6 100644 --- a/manifests/fes.crd.yaml +++ b/manifests/fes.crd.yaml @@ -14,1298 +14,9 @@ spec: categories: - all scope: Namespaced - conversion: - strategy: None versions: - - name: v1alpha1 - served: true - storage: false - subresources: - status: {} - additionalPrinterColumns: - - name: ApplicationID - type: string - jsonPath: .spec.applicationId - - name: Age - type: date - jsonPath: .metadata.creationTimestamp - schema: - openAPIV3Schema: - type: object - properties: - spec: - type: object - required: - - eventStreams - - applicationId - properties: - applicationId: - type: string - description: The application name that these streams belong to as registered in YourTurn - example: donut - eventStreams: - type: array - items: - type: object - required: - - source - - sink - properties: - uniqueStreamId: - type: string - description: | - A unique-per-namespace ID supplied by a human to use for the deployment name. - Used to help distinguish streams from each other in telemetry output. - Ignored if multiple streams share the same source specification - maxLength: 20 - source: - description: | - This section configures the source of the data which will be pushed onto Fabric Event Scheduler's work queue. - type: object - properties: - type: - type: string - jdbcConnection: - type: object - description: Represents a JDBC connection to a replication slot. - required: - - slotName - - jdbcUrl - - databaseAuthentication - properties: - slotName: - type: string - description: The name of an existing replication slot to connect to. - pattern: "^[0-9a-zA-Z_]+$" - maxLength: 63 - example: my_slot - pluginType: - type: string - description: Postgres WAL plugin. Defaults to wal2json initially, later migrating to pgoutput - example: wal2json - publicationName: - type: string - description: | - The name of the Publication to subscribe to when using `pgoutput` plugin type. - https://www.postgresql.org/docs/14/logical-replication-publication.html - The recommendation is to manually create the publication, tailored to your CDC needs. 
- If absent, FES will attempt to create this Publication (requires SUPERUSER permission). - pattern: "^[0-9a-zA-Z_]+$" - example: my_fes_publication - jdbcUrl: - type: string - description: JDBC url of the database to connect to. - example: jdbc:postgresql://host:port/database?sslmode=require - databaseAuthentication: - type: object - description: Represents a Kubernetes secret. - required: - - type - - name - - userKey - - passwordKey - properties: - type: - type: string - pattern: "^DatabaseAuthenticationSecret$" - name: - type: string - description: | - Name of the secret to extract the username and password from. - userKey: - type: string - description: | - The key in the secret which contains the username to be used for logical replication. - passwordKey: - type: string - description: | - The key in the secret which contains the password to be used for logical replication. - oneOf: - - required: - - type - - name - - userKey - - passwordKey - properties: - type: - pattern: "^DatabaseAuthenticationSecret$" - schema: - type: string - description: Name of the datbase schema to monitor. Defaults to `public` - example: public - table: - type: object - description: Which table should be monitored. Row updates to this table will be the data source. - required: - - name - properties: - name: - type: string - description: Table name. - example: mytable - idColumn: - type: string - description: The id column in the source table to use in telemetry. Defaults to `id` - example: id - subscription: - type: object - description: DynamoDb stream arn and lease table to use for a subscription - required: - - streamArn - - leaseTableName - properties: - leaseTableName: - type: string - description: | - The lease table used by the the KCL connector to keep track of shard positions in the DynamoDb Stream. - Defaults to the fes resource name if not specified - streamArn: - type: string - description: DynamoDb stream name to connect to - stsAssumeRoleArn: - type: string - description: | - ARN of a role to be assumed to access DynamoDB and DynamoDB Stream - (needed for a cross-account access) - arnResolutionMode: - type: string - description: | - EXACT (default) - requires the exact stream ARN to be specified - DYNAMIC - Allows a wildcard (*) at the end of the ARN to enable dynamic fetching of the stream ID based on the tableName. - default: "EXACT" - enum: - - "EXACT" - - "DYNAMIC" - nakadiSubscription: - type: object - description: Description of the stream used to receive events from Nakadi - required: - - eventTypeName - - consumerGroup - - adminTeams - properties: - eventTypeName: - type: string - description: The event type to receive - consumerGroup: - type: string - description: What consumer group to use when creating or connecting to the stream - adminTeams: - type: array - description: Names of the teams that will be added as admin permissions on the subscription - items: - type: string - batchLimit: - type: integer - description: The maximum number of events to fetch in a single batch - example: "100" - maxUncommittedEvents: - type: integer - description: The maximum number of events that can be fetched but not yet committed - example: "1000" - readFrom: - description: By default, new subscription start reading the end, use this to change to begin - example: "end" - type: string - enum: - - "begin" - - "end" - filter: - description: JsonPath can be used to filter entities from the event stream. 
For example, to only emit events if an entity had its status set to active after the change OR if its status was active before the change. See example - example: - "[?(@.after.status=='active' || @.before.status=='active')]" - type: string - format: jsonpath - minReplicas: - description: Min replicas can be used to specify the number of replicas to use for the component to scale horizontally - example: "3" - type: integer - operations: - description: | - List of source operations that should be processed. - Default behaviour is to only skip DynamoDB REMOVE and Postgres DELETE operations. - example: ["INSERT", "UPDATE", "DELETE"] - type: array - items: - type: string - enum: - - "INSERT" - - "UPDATE" - - "MODIFY" - - "DELETE" - - "REMOVE" - - "C" - - "U" - - "D" - - "S" - oneOf: - - required: - - type - - jdbcConnection - - table - properties: - type: - pattern: "^PostgresLogicalReplication$" - - required: - - type - - subscription - properties: - type: - pattern: "^DynamoDbStreamsSubscription$" - - required: - - type - - nakadiSubscription - properties: - type: - pattern: "^NakadiStreamsSubscription$" - flow: - type: object - description: | - This section configures how the data from the source will be processed or transformed. - properties: - type: - type: string - callHomeUrl: - type: string - description: The value to put in the callHomeUri property. - callHomeIdColumn: - type: string - description: Name of column holding the String value to put in entityId property. - example: id - flowIdColumn: - description: Name of the flow id column in the source table if any. - type: string - spanCtxColumn: - description: Name of the span context column in the source table if any. - type: string - dataTypeColumn: - type: string - description: Name of the column holding the String value to put in the data_type property. - example: data_type - dataType: - type: string - description: Value of the data_type field in the Nakadi event. - example: change_data_capture - dataOpColumn: - type: string - description: Name of the column holding the String value to put in the data_op property. - example: data_op - metadataColumn: - type: string - description: Name of the column holding the Json payload to put in the metadata property. - example: metadata - dataColumn: - type: string - description: Name of the column holding the Json payload to put in the data property. - example: data - payloadColumn: - type: string - description: Name of the column holding the Json payload for the Nakadi event. Defaults to 'payload' - example: my_business_event - redisKeyPrefix: - type: string - description: The prefix to add to the constructed key value to be added to redis. Defaults to the empty string. - example: "prefix:" - redisKeyColumns: - type: array - items: - type: string - description: The names of the columns who's values will be concatenated together seperated by redisKeySeparator and prefixed by redisKeyPrefix - example: ["id", "sub_id"] - redisValueColumn: - type: string - description: Name of the column holding the value of the key that will be stored in redis. Defaults to 'payload' - example: value_column - redisKeySeparator: - type: string - description: The separator character used to seperated redisKeyColumns value(s) when constructing a key. Defaults to '.' - example: . - lambdaFunctionArn: - type: string - description: Name of the lambda function holding the data transformation logic from the Stream. 
- example: "arn:aws:lambda:eu-central-1:000000000000:function:convert-account-events" - kafkaValueColumn: - type: string - description: Name of the column holding value for the kafka sink. - example: data_col - kafkaKeyColumn: - type: string - description: Name of the column holding the key for the kafka sink. - example: key_col - kafkaTopicColumn: - type: string - description: Name of the column holding the topic, overrides value in the kafka sink if present. - example: topic_col - oneOf: - - required: - - type - - callHomeUrl - - callHomeIdColumn - properties: - type: - pattern: "^PostgresWalToApiCallHomeEvent$" - - required: - - type - properties: - type: - pattern: "^PostgresWalToNakadiDataEvent$" - - required: - - type - properties: - type: - pattern: "^PostgresWalToGenericNakadiEvent$|^DynamoDbStreamToGenericNakadiEvent$" - - required: - - type - properties: - type: - pattern: "^PostgresWalFromNakadiProducerToGenericNakadiEvent$" - - required: - - type - - lambdaFunctionArn - properties: - type: - pattern: "^AwsLambdaConversion$" - - required: - - type - properties: - type: - pattern: "^PostgresWalToNakadiCdcDataEvent$" - - required: - - type - - redisKeyColumns - properties: - type: - pattern: "^PostgresWalToRedisKvEvent$" - - required: - - type - - kafkaValueColumn - properties: - type: - pattern: "^PostgresWalToKafkaEvent$" - sink: - description: A sink belonging to an event stream. - type: object - properties: - type: - type: string - queueName: - type: string - queueUrl: - type: string - maxBatchSize: - type: integer - minimum: 1 - eventType: - type: string - description: The nakadi event type. - nakadiBaseUri: - type: string - description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. - format: uri - example: https://nakadi-live.nakadi.zalan.do - partitionKeyFields: - type: array - description: | - The fields in the payload to be used by FES to partition the Events prior to publishing. - Builders should define the same fields here as in the event type definition. - If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. - The declared fields should consistently identify an entity, such that logical event ordering can be preserved. - If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. - Example: ["user_id", "order.order_id"] - items: - type: string - bucket: - type: string - description: The name of the S3 bucket. - key: - type: object - description: Defines how FES should derive the s3 key for each event. - properties: - type: - type: string - keyPrefix: - type: string - description: The prefix of the generated key. - example: "fes/output/" - oneOf: - - required: - - type - properties: - type: - pattern: "^GeneratedUuid" - tableName: - type: string - kafkaTopicName: - description: The kafka topic which events will be published to - type: string - kafkaBootstrapServers: - type: string - description: | - The kafka bootstrap servers. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers - redisClusterEnabled: - type: boolean - description: | - If the redis instances being connected to are a redis cluster, then this needs to specified as true. - Non clustered redis instances operating in a main node/replica configuration do not need to specify this property. 
- redisConnectionUrls: - type: array - description: | - The redis connection urls used to connect to the redis instance(s) to sink to. - Example: ["rediss://instance1.myredis-host.com:6379/0", "rediss://instance2.myredis-host.com:6379/0"] - items: - type: string - redisAuthentication: - type: object - description: Represents a Kubernetes secret. - required: - - type - - name - - passwordKey - - userKey - properties: - type: - type: string - pattern: "^Redis.*AuthenticationSecret$" - name: - type: string - description: | - Name of the secret to extract the username and password from. - userKey: - type: string - description: | - The key in the secret which contains the username to be used for redis ACL authentication. - (Not required when using RedisLegacyAuthenticationSecret) - passwordKey: - type: string - description: | - The key in the secret which contains the password to be used for redis authentication (ACL or legacy) - oneOf: - - required: - - type - - name - - passwordKey - properties: - type: - pattern: "^RedisLegacyAuthenticationSecret$" - - required: - - type - - name - - userKey - - passwordKey - properties: - type: - pattern: "^RedisACLAuthenticationSecret$" - oneOf: - - required: - - type - - queueName - properties: - type: - pattern: "^SqsStandard$" - - required: - - type - - queueName - properties: - type: - pattern: "^SqsFifo$" - - required: - - type - properties: - type: - pattern: "^Nakadi$" - - required: - - type - - bucket - - key - properties: - type: - pattern: "^S3" - - required: - - type - - tableName - properties: - type: - pattern: "^DynamoDb$" - - required: - - type - - redisConnectionUrls - properties: - type: - pattern: "^Redis$" - - required: - - type - - kafkaTopicName - - kafkaBootstrapServers - properties: - type: - pattern: "^Kafka$" - recovery: - type: object - description: This section configures recovery strategy for publishing errors. - properties: - type: - type: string - sink: - type: object - description: The dead letter sink. - properties: - type: - type: string - queueName: - type: string - queueUrl: - type: string - maxBatchSize: - type: integer - minimum: 1 - eventType: - type: string - description: The nakadi event type. - nakadiBaseUri: - type: string - description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. - format: uri - example: https://nakadi-live.nakadi.zalan.do - partitionKeyFields: - type: array - description: | - The fields in the payload to be used by FES to partition the Events prior to publishing. - Builders should define the same fields here as in the event type definition. - If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. - The declared fields should consistently identify an entity, such that logical event ordering can be preserved. - If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. - Example: ["user_id", "order.order_id"] - items: - type: string - bucket: - type: string - description: The name of the S3 bucket. - key: - type: object - description: Defines how FES should derive the s3 key for each event. - properties: - type: - type: string - keyPrefix: - type: string - description: The prefix of the generated key. 
- example: "fes/output/" - oneOf: - - required: - - type - properties: - type: - pattern: "^GeneratedUuid" - tableName: - type: string - kafkaTopicName: - description: The kafka topic which events will be published to - type: string - kafkaBootstrapServers: - type: string - description: | - The kafka bootstrap servers. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers - oneOf: - - required: - - type - - queueName - properties: - type: - pattern: "^SqsStandard$" - - required: - - type - - queueName - properties: - type: - pattern: "^SqsFifo$" - - required: - - type - properties: - type: - pattern: "^Nakadi$" - - required: - - type - - bucket - - key - properties: - type: - pattern: "^S3" - - required: - - type - - tableName - properties: - type: - pattern: "^DynamoDb$" - - required: - - type - - kafkaTopicName - - kafkaBootstrapServers - properties: - type: - pattern: "^Kafka$" - oneOf: - - required: - - type - properties: - type: - pattern: "^None" - - required: - - type - properties: - type: - pattern: "^Ignore" - - required: - - type - - sink - properties: - type: - pattern: "^DeadLetter" - status: - type: object - x-kubernetes-preserve-unknown-fields: true - name: v1 - served: true storage: true - subresources: - status: {} - additionalPrinterColumns: - - name: ApplicationID - type: string - jsonPath: .spec.applicationId - - name: Age - type: date - jsonPath: .metadata.creationTimestamp schema: openAPIV3Schema: type: object - properties: - spec: - type: object - required: - - eventStreams - - applicationId - properties: - applicationId: - type: string - description: The application name that these streams belong to as registered in YourTurn - example: donut - eventStreams: - type: array - items: - type: object - required: - - source - - sink - properties: - uniqueStreamId: - type: string - description: | - A unique-per-namespace ID supplied by a human to use for the deployment name. - Used to help distinguish streams from each other in telemetry output. - Ignored if multiple streams share the same source specification - maxLength: 20 - source: - description: | - This section configures the source of the data which will be pushed onto Fabric Event Scheduler's work queue. - type: object - properties: - type: - type: string - jdbcConnection: - type: object - description: Represents a JDBC connection to a replication slot. - required: - - slotName - - jdbcUrl - - databaseAuthentication - properties: - slotName: - type: string - description: The name of an existing replication slot to connect to. - pattern: "^[0-9a-zA-Z_]+$" - maxLength: 63 - example: my_slot - pluginType: - type: string - description: Postgres WAL plugin. Defaults to wal2json initially, later migrating to pgoutput - example: wal2json - publicationName: - type: string - description: | - The name of the Publication to subscribe to when using `pgoutput` plugin type. - https://www.postgresql.org/docs/14/logical-replication-publication.html - The recommendation is to manually create the publication, tailored to your CDC needs. - If absent, FES will attempt to create this Publication (requires SUPERUSER permission). - pattern: "^[0-9a-zA-Z_]+$" - example: my_fes_publication - jdbcUrl: - type: string - description: JDBC url of the database to connect to. - example: jdbc:postgresql://host:port/database?sslmode=require - databaseAuthentication: - type: object - description: Represents a Kubernetes secret. 
- required: - - type - - name - - userKey - - passwordKey - properties: - type: - type: string - pattern: "^DatabaseAuthenticationSecret$" - name: - type: string - description: | - Name of the secret to extract the username and password from. - userKey: - type: string - description: | - The key in the secret which contains the username to be used for logical replication. - passwordKey: - type: string - description: | - The key in the secret which contains the password to be used for logical replication. - oneOf: - - required: - - type - - name - - userKey - - passwordKey - properties: - type: - pattern: "^DatabaseAuthenticationSecret$" - schema: - type: string - description: Name of the datbase schema to monitor. Defaults to `public` - example: public - table: - type: object - description: Which table should be monitored. Row updates to this table will be the data source. - required: - - name - properties: - name: - type: string - description: Table name. - example: mytable - idColumn: - type: string - description: The id column in the source table to use in telemetry. Defaults to `id` - example: id - subscription: - type: object - description: DynamoDb stream arn and lease table to use for a subscription - required: - - streamArn - - leaseTableName - properties: - leaseTableName: - type: string - description: | - The lease table used by the the KCL connector to keep track of shard positions in the DynamoDb Stream. - Defaults to the fes resource name if not specified - streamArn: - type: string - description: DynamoDb stream name to connect to - stsAssumeRoleArn: - type: string - description: | - ARN of a role to be assumed to access DynamoDB and DynamoDB Stream - (needed for a cross-account access) - arnResolutionMode: - type: string - description: | - EXACT (default) - requires the exact stream ARN to be specified - DYNAMIC - Allows a wildcard (*) at the end of the ARN to enable dynamic fetching of the stream ID based on the tableName. - default: "EXACT" - enum: - - "EXACT" - - "DYNAMIC" - nakadiSubscription: - type: object - description: Nakadi event-type to subscribe to - required: - - eventTypeName - - consumerGroup - - adminTeams - properties: - eventTypeName: - type: string - description: The event type to receive - consumerGroup: - type: string - description: What consumer group to use when creating or connecting to the stream - adminTeams: - type: array - description: Names of the teams that will be added as admin permissions on the subscription - items: - type: string - batchLimit: - type: integer - description: The maximum number of events to fetch in a single batch - example: "100" - maxUncommittedEvents: - type: integer - description: The maximum number of events that can be fetched but not yet committed - example: "1000" - readFrom: - description: By default, new subscription start reading the end, use this to change to begin - example: "end" - type: string - enum: - - "begin" - - "end" - filter: - description: JsonPath can be used to filter entities from the event stream. For example, to only emit events if an entity had its status set to active after the change OR if its status was active before the change. See example - example: - "[?(@.after.status=='active' || @.before.status=='active')]" - type: string - format: jsonpath - minReplicas: - description: Min replicas can be used to specify the number of replicas to use for the component to scale horizontally - example: "3" - type: integer - operations: - description: | - List of source operations that should be processed. 
- Default behaviour is to only skip DynamoDB REMOVE and Postgres DELETE operations. - example: ["INSERT", "UPDATE", "DELETE"] - type: array - items: - type: string - enum: - - "INSERT" - - "UPDATE" - - "MODIFY" - - "DELETE" - - "REMOVE" - - "C" - - "U" - - "D" - - "S" - oneOf: - - required: - - type - - jdbcConnection - - table - properties: - type: - pattern: "^PostgresLogicalReplication$" - - required: - - type - - subscription - properties: - type: - pattern: "^DynamoDbStreamsSubscription$" - - required: - - type - - nakadiSubscription - properties: - type: - pattern: "^NakadiStreamsSubscription$" - flow: - type: object - description: | - This section configures how the data from the source will be processed or transformed. - properties: - type: - type: string - callHomeUrl: - type: string - description: The value to put in the callHomeUri property. - callHomeIdColumn: - type: string - description: Name of column holding the String value to put in entityId property. - example: id - flowIdColumn: - description: Name of the flow id column in the source table if any. - type: string - spanCtxColumn: - description: Name of the span context column in the source table if any. - type: string - dataTypeColumn: - type: string - description: Name of the column holding the String value to put in the data_type property. - example: data_type - dataType: - type: string - description: Value of the data_type field in the Nakadi event. - example: change_data_capture - dataOpColumn: - type: string - description: Name of the column holding the String value to put in the data_op property. - example: data_op - metadataColumn: - type: string - description: Name of the column holding the Json payload to put in the metadata property. - example: metadata - dataColumn: - type: string - description: Name of the column holding the Json payload to put in the data property. - example: data - payloadColumn: - type: string - description: Name of the column holding the Json payload for the Nakadi event. Defaults to 'payload' - example: my_business_event - redisKeyPrefix: - type: string - description: The prefix to add to the constructed key value to be added to redis. Defaults to the empty string. - example: "prefix:" - redisKeyColumns: - type: array - items: - type: string - description: The names of the columns who's values will be concatenated together seperated by redisKeySeparator and prefixed by redisKeyPrefix - example: ["id", "sub_id"] - redisValueColumn: - type: string - description: Name of the column holding the value of the key that will be stored in redis. Defaults to 'payload' - example: value_column - redisKeySeparator: - type: string - description: The separator character used to seperated redisKeyColumns value(s) when constructing a key. Defaults to '.' - example: . - lambdaFunctionArn: - type: string - description: Name of the lambda function holding the data transformation logic from the Stream. - example: "arn:aws:lambda:eu-central-1:000000000000:function:convert-account-events" - kafkaValueColumn: - type: string - description: Name of the column holding value for the kafka sink. - example: data_col - kafkaKeyColumn: - type: string - description: Name of the column holding the key for the kafka sink. - example: key_col - kafkaTopicColumn: - type: string - description: Name of the column holding the topic, overrides value in the kafka sink if present. 
- example: topic_col - oneOf: - - required: - - type - - callHomeUrl - - callHomeIdColumn - properties: - type: - pattern: "^PostgresWalToApiCallHomeEvent$" - - required: - - type - properties: - type: - pattern: "^PostgresWalToNakadiDataEvent$" - - required: - - type - properties: - type: - pattern: "^PostgresWalToGenericNakadiEvent$|^DynamoDbStreamToGenericNakadiEvent$" - - required: - - type - properties: - type: - pattern: "^PostgresWalFromNakadiProducerToGenericNakadiEvent$" - - required: - - type - - lambdaFunctionArn - properties: - type: - pattern: "^AwsLambdaConversion$" - - required: - - type - properties: - type: - pattern: "^PostgresWalToNakadiCdcDataEvent$" - - required: - - type - - redisKeyColumns - properties: - type: - pattern: "^PostgresWalToRedisKvEvent$" - - required: - - type - - kafkaValueColumn - properties: - type: - pattern: "^PostgresWalToKafkaEvent$" - sink: - description: A sink belonging to an event stream. - type: object - properties: - type: - type: string - queueName: - type: string - queueUrl: - type: string - maxBatchSize: - type: integer - minimum: 1 - eventType: - type: string - description: The nakadi event type. - nakadiBaseUri: - type: string - description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. - format: uri - example: https://nakadi-live.nakadi.zalan.do - partitionKeyFields: - type: array - description: | - The fields in the payload to be used by FES to partition the Events prior to publishing. - Builders should define the same fields here as in the event type definition. - If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. - The declared fields should consistently identify an entity, such that logical event ordering can be preserved. - If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. - Example: ["user_id", "order.order_id"] - items: - type: string - bucket: - type: string - description: The name of the S3 bucket. - key: - type: object - description: Defines how FES should derive the s3 key for each event. - properties: - type: - type: string - keyPrefix: - type: string - description: The prefix of the generated key. - example: "fes/output/" - oneOf: - - required: - - type - properties: - type: - pattern: "^GeneratedUuid" - tableName: - type: string - kafkaTopicName: - description: The kafka topic which events will be published to - type: string - kafkaBootstrapServers: - type: string - description: | - The kafka bootstrap servers. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers - redisClusterEnabled: - type: boolean - description: | - If the redis instances being connected to are a redis cluster, then this needs to specified as true. - Non clustered redis instances operating in a main node/replica configuration do not need to specify this property. - redisConnectionUrls: - type: array - description: | - The redis connection urls used to connect to the redis instance(s) to sink to. - Example: ["rediss://instance1.myredis-host.com:6379/0", "rediss://instance2.myredis-host.com:6379/0"] - items: - type: string - redisAuthentication: - type: object - description: Represents a Kubernetes secret. 
- required: - - type - - name - - passwordKey - - userKey - properties: - type: - type: string - pattern: "^Redis.*AuthenticationSecret$" - name: - type: string - description: | - Name of the secret to extract the username and password from. - userKey: - type: string - description: | - The key in the secret which contains the username to be used for redis ACL authentication. - (Not required when using RedisLegacyAuthenticationSecret) - passwordKey: - type: string - description: | - The key in the secret which contains the password to be used for redis authentication (ACL or legacy) - oneOf: - - required: - - type - - name - - passwordKey - properties: - type: - pattern: "^RedisLegacyAuthenticationSecret$" - - required: - - type - - name - - userKey - - passwordKey - properties: - type: - pattern: "^RedisACLAuthenticationSecret$" - oneOf: - - required: - - type - - queueName - properties: - type: - pattern: "^SqsStandard$" - - required: - - type - - queueName - properties: - type: - pattern: "^SqsFifo$" - - required: - - type - properties: - type: - pattern: "^Nakadi$" - - required: - - type - - bucket - - key - properties: - type: - pattern: "^S3" - - required: - - type - - tableName - properties: - type: - pattern: "^DynamoDb$" - - required: - - type - - redisConnectionUrls - properties: - type: - pattern: "^Redis$" - - required: - - type - - kafkaTopicName - - kafkaBootstrapServers - properties: - type: - pattern: "^Kafka$" - recovery: - type: object - description: This section configures recovery strategy for publishing errors. - properties: - type: - type: string - sink: - type: object - description: The dead letter sink. - properties: - type: - type: string - queueName: - type: string - queueUrl: - type: string - maxBatchSize: - type: integer - minimum: 1 - eventType: - type: string - description: The nakadi event type. - nakadiBaseUri: - type: string - description: The Nakadi base URI is defaulted per cluster environment. This property can be used to override it. - format: uri - example: https://nakadi-live.nakadi.zalan.do - partitionKeyFields: - type: array - description: | - The fields in the payload to be used by FES to partition the Events prior to publishing. - Builders should define the same fields here as in the event type definition. - If FES can locate non-null values for these fields, it will consistently partition events for parallel publishing. - The declared fields should consistently identify an entity, such that logical event ordering can be preserved. - If set incorrectly or null values are found, FES will not attempt partitioning and will publish in sequential batches. - Example: ["user_id", "order.order_id"] - items: - type: string - bucket: - type: string - description: The name of the S3 bucket. - key: - type: object - description: Defines how FES should derive the s3 key for each event. - properties: - type: - type: string - keyPrefix: - type: string - description: The prefix of the generated key. - example: "fes/output/" - oneOf: - - required: - - type - properties: - type: - pattern: "^GeneratedUuid" - tableName: - type: string - kafkaTopicName: - description: The kafka topic which events will be published to - type: string - kafkaBootstrapServers: - type: string - description: | - The kafka bootstrap servers. 
See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#bootstrap-servers - oneOf: - - required: - - type - - queueName - properties: - type: - pattern: "^SqsStandard$" - - required: - - type - - queueName - properties: - type: - pattern: "^SqsFifo$" - - required: - - type - properties: - type: - pattern: "^Nakadi$" - - required: - - type - - bucket - - key - properties: - type: - pattern: "^S3" - - required: - - type - - tableName - properties: - type: - pattern: "^DynamoDb$" - - required: - - type - - kafkaTopicName - - kafkaBootstrapServers - properties: - type: - pattern: "^Kafka$" - oneOf: - - required: - - type - properties: - type: - pattern: "^None" - - required: - - type - properties: - type: - pattern: "^Ignore" - - required: - - type - - sink - properties: - type: - pattern: "^DeadLetter" - status: - type: object - x-kubernetes-preserve-unknown-fields: true From b64c39e3c4bda66f4c64c9b4dee1e8135ab1aac3 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Mon, 8 Jul 2024 17:28:30 +0200 Subject: [PATCH 10/27] modify served section in fes crd --- manifests/fes.crd.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/manifests/fes.crd.yaml b/manifests/fes.crd.yaml index d887d34f6..70a8c9555 100644 --- a/manifests/fes.crd.yaml +++ b/manifests/fes.crd.yaml @@ -16,6 +16,7 @@ spec: scope: Namespaced versions: - name: v1 + served: true storage: true schema: openAPIV3Schema: From 3d6026037a80032831209cd1be07fbfd25345031 Mon Sep 17 00:00:00 2001 From: idanovinda Date: Thu, 11 Jul 2024 14:56:01 +0200 Subject: [PATCH 11/27] apply feedback * refactor syncStreams() by making few variables as a struct * filtering publication to just get the ones created by postgres user --- pkg/apis/zalando.org/v1/fabriceventstream.go | 6 ++ pkg/cluster/database.go | 1 + pkg/cluster/streams.go | 87 +++++++++----------- 3 files changed, 46 insertions(+), 48 deletions(-) diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go index 609f3c9bc..41bb5e80c 100644 --- a/pkg/apis/zalando.org/v1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -1,6 +1,7 @@ package v1 import ( + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -89,3 +90,8 @@ type DBAuth struct { UserKey string `json:"userKey,omitempty"` PasswordKey string `json:"passwordKey,omitempty"` } + +type Slot struct { + Slot map[string]string `json:"slot"` + Publication map[string]acidv1.StreamTable `json:"publication"` +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index a89f9fbdc..9f6ab5b33 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -49,6 +49,7 @@ const ( getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' 
|| pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename) FROM pg_publication p LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname + WHERE p.pubowner = 10 GROUP BY p.pubname;` createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');` alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;` diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index dd3e35240..9ba4d8b34 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -95,11 +95,10 @@ func gatherApplicationIds(streams []acidv1.Stream) []string { return appIds } -func (c *Cluster) syncPublication(slots map[string]map[string]string, publications map[string]map[string]acidv1.StreamTable, dbName string, slotNames []string) (map[string]map[string]string, error) { +func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]zalandov1.Slot, slotsToSync *map[string]map[string]string) error { createPublications := make(map[string]string) alterPublications := make(map[string]string) - deletePublications := make(map[string]string) - slotsToSync := make(map[string]map[string]string) + deletePublications := []string{} defer func() { if err := c.closeDbConn(); err != nil { @@ -109,17 +108,17 @@ func (c *Cluster) syncPublication(slots map[string]map[string]string, publicatio // check for existing publications if err := c.initDbConnWithName(dbName); err != nil { - return nil, fmt.Errorf("could not init database connection: %v", err) + return fmt.Errorf("could not init database connection: %v", err) } currentPublications, err := c.getPublications() if err != nil { - return nil, fmt.Errorf("could not get current publications: %v", err) + return fmt.Errorf("could not get current publications: %v", err) } - for _, slotName := range slotNames { - tables := publications[slotName] - tableNames := make([]string, len(publications[slotName])) + for slotName, slotAndPublication := range databaseSlotsList { + tables := slotAndPublication.Publication + tableNames := make([]string, len(tables)) i := 0 for t := range tables { tableName, schemaName := getTableSchema(t) @@ -135,38 +134,38 @@ func (c *Cluster) syncPublication(slots map[string]map[string]string, publicatio } else if currentTables != tableList { alterPublications[slotName] = tableList } - slotsToSync[slotName] = slots[slotName] + (*slotsToSync)[slotName] = slotAndPublication.Slot } // check if there is any deletion - for slotName, tables := range currentPublications { - if _, exists := publications[slotName]; !exists { - deletePublications[slotName] = tables + for slotName, _ := range currentPublications { + if _, exists := databaseSlotsList[slotName]; !exists { + deletePublications = append(deletePublications, slotName) } } if len(createPublications)+len(alterPublications)+len(deletePublications) == 0 { - return nil, nil + return nil } for publicationName, tables := range createPublications { if err = c.executeCreatePublication(publicationName, tables); err != nil { - return nil, fmt.Errorf("creation of publication %q failed: %v", publicationName, err) + return fmt.Errorf("creation of publication %q failed: %v", publicationName, err) } } for publicationName, tables := range alterPublications { if err = c.executeAlterPublication(publicationName, tables); err != nil { - return nil, fmt.Errorf("update of publication %q failed: %v", publicationName, err) + return fmt.Errorf("update of publication %q failed: %v", publicationName, err) } } - for publicationName, _ := range deletePublications { - 
slotsToSync[publicationName] = nil + for _, publicationName := range deletePublications { + (*slotsToSync)[publicationName] = nil if err = c.executeDropPublication(publicationName); err != nil { - return nil, fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) + return fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) } } - return slotsToSync, nil + return nil } func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream { @@ -308,17 +307,15 @@ func (c *Cluster) syncStreams() error { return nil } - slots := make(map[string]map[string]string) + databaseSlots := make(map[string]map[string]zalandov1.Slot) slotsToSync := make(map[string]map[string]string) - publications := make(map[string]map[string]acidv1.StreamTable) requiredPatroniConfig := c.Spec.Patroni - databases := make(map[string][]string) - if len(requiredPatroniConfig.Slots) > 0 { - slots = requiredPatroniConfig.Slots - } + // skip this for now + // if len(requiredPatroniConfig.Slots) > 0 { + // slots = requiredPatroniConfig.Slots + // } - // list existing publications if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") } @@ -334,54 +331,48 @@ func (c *Cluster) syncStreams() error { // get database name with empty list of slot, except template0 and template1 for dbName, _ := range listDatabases { if dbName != "template0" && dbName != "template1" { - databases[dbName] = []string{} + databaseSlots[dbName] = map[string]zalandov1.Slot{} } } // gather list of required slots and publications, group by database for _, stream := range c.Spec.Streams { + if _, exists := databaseSlots[stream.Database]; !exists { + c.logger.Warningf("database %q does not exist in the cluster", stream.Database) + continue + } slot := map[string]string{ "database": stream.Database, "plugin": constants.EventStreamSourcePluginType, "type": "logical", } slotName := getSlotName(stream.Database, stream.ApplicationId) - if _, exists := slots[slotName]; !exists { - slots[slotName] = slot - publications[slotName] = stream.Tables + if _, exists := databaseSlots[stream.Database][slotName]; !exists { + databaseSlots[stream.Database][slotName] = zalandov1.Slot{ + Slot: slot, + Publication: stream.Tables, + } } else { - streamTables := publications[slotName] + slotAndPublication := databaseSlots[stream.Database][slotName] + streamTables := slotAndPublication.Publication for tableName, table := range stream.Tables { if _, exists := streamTables[tableName]; !exists { streamTables[tableName] = table } } - publications[slotName] = streamTables - } - // save the slotName in the database list - if _, exists := databases[stream.Database]; !exists { - databases[stream.Database] = []string{slotName} - } else { - if !util.SliceContains(databases[stream.Database], slotName) { - databases[stream.Database] = append(databases[stream.Database], slotName) - } + slotAndPublication.Publication = streamTables + databaseSlots[stream.Database][slotName] = slotAndPublication } } // sync publication in a database c.logger.Debug("syncing database publications") - for dbName, slotNames := range databases { - slotsToSyncDb, err := c.syncPublication(slots, publications, dbName, slotNames) + for dbName, databaseSlotsList := range databaseSlots { + err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync) if err != nil { c.logger.Warningf("could not sync publications in database %q: %v", dbName, err) continue } - // if does not exist in the slotsToSync, add it - for slotName, 
slotSection := range slotsToSyncDb { - if _, exists := slotsToSync[slotName]; !exists { - slotsToSync[slotName] = slotSection - } - } } // no slots to sync = no streams defined or publications created From 5418d89c7c902737c0ca5b58a94a07072a2cbed1 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 12 Jul 2024 17:08:43 +0200 Subject: [PATCH 12/27] fix bug of removing manifest slots in syncStreams Co-authored-by: Ida Novindasari idanovinda@gmail.com --- pkg/cluster/streams.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 9ba4d8b34..e34ecbec6 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -311,10 +311,9 @@ func (c *Cluster) syncStreams() error { slotsToSync := make(map[string]map[string]string) requiredPatroniConfig := c.Spec.Patroni - // skip this for now - // if len(requiredPatroniConfig.Slots) > 0 { - // slots = requiredPatroniConfig.Slots - // } + if len(requiredPatroniConfig.Slots) > 0 { + slotsToSync = requiredPatroniConfig.Slots + } if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") @@ -375,11 +374,6 @@ func (c *Cluster) syncStreams() error { } } - // no slots to sync = no streams defined or publications created - if len(slotsToSync) > 0 { - requiredPatroniConfig.Slots = slotsToSync - } - c.logger.Debug("syncing logical replication slots") pods, err := c.listPods() if err != nil { From 6ed0e7091c2b1380ec56c54ed8bf3bbef95d9654 Mon Sep 17 00:00:00 2001 From: inovindasari Date: Wed, 17 Jul 2024 15:02:17 +0200 Subject: [PATCH 13/27] fix overwrite because of pass by reference value --- pkg/cluster/streams.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index e34ecbec6..c76523f4a 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -312,7 +312,9 @@ func (c *Cluster) syncStreams() error { requiredPatroniConfig := c.Spec.Patroni if len(requiredPatroniConfig.Slots) > 0 { - slotsToSync = requiredPatroniConfig.Slots + for slotName, slotConfig := range requiredPatroniConfig.Slots { + slotsToSync[slotName] = slotConfig + } } if err := c.initDbConn(); err != nil { @@ -381,6 +383,7 @@ func (c *Cluster) syncStreams() error { } // sync logical replication slots in Patroni config + requiredPatroniConfig.Slots = slotsToSync configPatched, _, _, err := c.syncPatroniConfig(pods, requiredPatroniConfig, nil) if err != nil { c.logger.Warningf("Patroni config updated? 
%v - errors during config sync: %v", configPatched, err) From 2a90be39209bcfc08b0d9b7c50b7036664431ded Mon Sep 17 00:00:00 2001 From: inovindasari Date: Wed, 17 Jul 2024 15:40:57 +0200 Subject: [PATCH 14/27] improve e2etest make sure patroni slot is not deleted --- e2e/tests/test_e2e.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index b3fe8ba66..8bac260d0 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2011,6 +2011,13 @@ def test_stream_resources(self): # update the manifest with the streaming section patch_streaming_config = { "spec": { + "patroni": { + "slots": { + "manual_slot": { + "type": "physical" + } + } + }, "streams": [ { "applicationId": "test-app", @@ -2026,7 +2033,7 @@ def test_stream_resources(self): } } } - ] + ] } } k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -2047,7 +2054,7 @@ def test_stream_resources(self): self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) - + # remove the streaming section from the manifest patch_streaming_config_removal = { "spec": { @@ -2067,6 +2074,13 @@ def test_stream_resources(self): self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, "Replication slot is not deleted", 10, 5) + # check the manual_slot should not get deleted + get_manual_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, + "Slot defined in patroni config is deleted", 10, 5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): ''' From 7747151a0190ec49d8a493e8479f34e51d0f0000 Mon Sep 17 00:00:00 2001 From: inovindasari Date: Fri, 19 Jul 2024 14:02:54 +0200 Subject: [PATCH 15/27] e2e test: should not delete nonstream publication --- e2e/tests/test_e2e.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 8bac260d0..c57885adb 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2055,6 +2055,12 @@ def test_stream_resources(self): "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) + # patch non-postgres user created publication + create_nonstream_publication = """ + CREATE PUBLICATION mypublication FOR TABLE test_table; + """ + self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") + # remove the streaming section from the manifest patch_streaming_config_removal = { "spec": { @@ -2078,8 +2084,13 @@ def test_stream_resources(self): get_manual_slot_query = """ SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; """ + get_nonstream_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'mypublication'; + """ self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, "Slot defined in patroni config is deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_nonstream_publication_query)), 1, + "Publication defined not in stream section is 
deleted", 10, 5) @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): From 952c8a8179b1899e6a5b89e27aa56f855dc03434 Mon Sep 17 00:00:00 2001 From: inovindasari Date: Mon, 22 Jul 2024 09:20:21 +0200 Subject: [PATCH 16/27] grant permission to create publication for foo_user --- e2e/tests/test_e2e.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index c57885adb..83445adb0 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2055,7 +2055,13 @@ def test_stream_resources(self): "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) - # patch non-postgres user created publication + # grant create and ownership of test_tabble to foo_user + grant_permission_foo_user = """ + GRANT CREATE ON DATABASE foo TO foo_user; + ALTER TABLE test_table OWNER TO foo_user; + """ + self.query_database(leader.metadata.name, "foo", grant_permission_foo_user) + # non-postgres user creates a publication create_nonstream_publication = """ CREATE PUBLICATION mypublication FOR TABLE test_table; """ @@ -2080,7 +2086,7 @@ def test_stream_resources(self): self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, "Replication slot is not deleted", 10, 5) - # check the manual_slot should not get deleted + # check the manual_slot and mypublication should not get deleted get_manual_slot_query = """ SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; """ @@ -2089,7 +2095,7 @@ def test_stream_resources(self): """ self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, "Slot defined in patroni config is deleted", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_nonstream_publication_query)), 1, + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, "Publication defined not in stream section is deleted", 10, 5) @timeout_decorator.timeout(TEST_TIMEOUT_SEC) From 35990d043d46259dbc339704275d7b8ab6a5a482 Mon Sep 17 00:00:00 2001 From: inovindasari Date: Mon, 22 Jul 2024 11:19:18 +0200 Subject: [PATCH 17/27] add check if the publication is created successfully --- e2e/tests/test_e2e.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 83445adb0..5af70f393 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2066,6 +2066,14 @@ def test_stream_resources(self): CREATE PUBLICATION mypublication FOR TABLE test_table; """ self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") + # check if query_database_with_user foo_user work + print(self.query_database_with_user(leader.metadata.name, "foo", "SELECT * FROM pg_publication;", "foo_user")) + # check if the publication is created + get_nonstream_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'mypublication'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, + "Publication defined not in stream section failed to be created", 10, 5) # remove the streaming section from the manifest patch_streaming_config_removal = { @@ -2090,9 +2098,6 @@ def test_stream_resources(self): get_manual_slot_query 
= """ SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; """ - get_nonstream_publication_query = """ - SELECT * FROM pg_publication WHERE pubname = 'mypublication'; - """ self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, "Slot defined in patroni config is deleted", 10, 5) self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, From fb6dac9d6ca7340111f86c9d755368a9fb01c6cf Mon Sep 17 00:00:00 2001 From: inovindasari Date: Mon, 22 Jul 2024 22:43:15 +0200 Subject: [PATCH 18/27] fix e2e test: reset search path foo_user --- e2e/tests/test_e2e.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 5af70f393..9e2546d5d 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2055,10 +2055,11 @@ def test_stream_resources(self): "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) - # grant create and ownership of test_tabble to foo_user + # grant create and ownership of test_tabble to foo_user, reset search path to default grant_permission_foo_user = """ GRANT CREATE ON DATABASE foo TO foo_user; ALTER TABLE test_table OWNER TO foo_user; + ALTER ROLE foo_user RESET search_path; """ self.query_database(leader.metadata.name, "foo", grant_permission_foo_user) # non-postgres user creates a publication @@ -2066,19 +2067,11 @@ def test_stream_resources(self): CREATE PUBLICATION mypublication FOR TABLE test_table; """ self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") - # check if query_database_with_user foo_user work - print(self.query_database_with_user(leader.metadata.name, "foo", "SELECT * FROM pg_publication;", "foo_user")) - # check if the publication is created - get_nonstream_publication_query = """ - SELECT * FROM pg_publication WHERE pubname = 'mypublication'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, - "Publication defined not in stream section failed to be created", 10, 5) # remove the streaming section from the manifest patch_streaming_config_removal = { "spec": { - "streams": [] + "streams": [] } } k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -2098,6 +2091,9 @@ def test_stream_resources(self): get_manual_slot_query = """ SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; """ + get_nonstream_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'mypublication'; + """ self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, "Slot defined in patroni config is deleted", 10, 5) self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, From 15e57f7d1a208b737e9e6ada09c86b9b9e350343 Mon Sep 17 00:00:00 2001 From: inovindasari Date: Tue, 23 Jul 2024 13:01:09 +0200 Subject: [PATCH 19/27] apply feedback --- e2e/tests/k8s_api.py | 1 + e2e/tests/test_e2e.py | 12 +++++++++ manifests/operator-service-account-rbac.yaml | 26 ++++++++++---------- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 12e45f4b0..276ddfa25 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -20,6 +20,7 
@@ def __init__(self): self.config = config.load_kube_config() self.k8s_client = client.ApiClient() + self.rbac_api = client.RbacAuthorizationV1Api() self.core_v1 = client.CoreV1Api() self.apps_v1 = client.AppsV1Api() diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 9e2546d5d..306ab14e0 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -1998,6 +1998,18 @@ def test_stream_resources(self): ''' k8s = self.k8s + # patch ClusterRole to enable listing FES resources + patch_cluster_role_config = { + "rules": [ + { + "apiGroups": ["zalando.org"], + "resources": ["fabriceventstreams"], + "verbs": ["list"] + } + ] + } + k8s.api.rbac_api.patch_cluster_role("postgres-operator", patch_cluster_role_config) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") leader = k8s.get_cluster_leader_pod() diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index 08c676089..97629ee95 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -36,19 +36,19 @@ rules: - list - watch # all verbs allowed for event streams (Zalando-internal feature) -- apiGroups: - - zalando.org - resources: - - fabriceventstreams - verbs: - - create - - delete - - deletecollection - - get - - list - - patch - - update - - watch +# - apiGroups: +# - zalando.org +# resources: +# - fabriceventstreams +# verbs: +# - create +# - delete +# - deletecollection +# - get +# - list +# - patch +# - update +# - watch # to create or get/update CRDs when starting up - apiGroups: - apiextensions.k8s.io From 5d037e20eb74f5bda8c74b0425096f1d4b5a829e Mon Sep 17 00:00:00 2001 From: inovindasari Date: Tue, 23 Jul 2024 15:10:59 +0200 Subject: [PATCH 20/27] not patching cluster role --- e2e/tests/test_e2e.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 306ab14e0..021be1973 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -1997,19 +1997,6 @@ def test_stream_resources(self): Create and delete fabric event streaming resources. 
''' k8s = self.k8s - - # patch ClusterRole to enable listing FES resources - patch_cluster_role_config = { - "rules": [ - { - "apiGroups": ["zalando.org"], - "resources": ["fabriceventstreams"], - "verbs": ["list"] - } - ] - } - k8s.api.rbac_api.patch_cluster_role("postgres-operator", patch_cluster_role_config) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") leader = k8s.get_cluster_leader_pod() From 134f49c006f5ecdf9a18cceb819fc01132dc4dec Mon Sep 17 00:00:00 2001 From: inovindasari Date: Tue, 23 Jul 2024 16:11:02 +0200 Subject: [PATCH 21/27] e2e test: patch with append to not overwrite --- e2e/tests/test_e2e.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 021be1973..6cf43dd16 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2001,6 +2001,16 @@ def test_stream_resources(self): "Operator does not get in sync") leader = k8s.get_cluster_leader_pod() + # patch ClusterRole to enable listing FES resources + cluster_role = k8s.api.rbac_api.read_cluster_role("postgres-operator") + fes_cluster_role_rule = client.V1PolicyRule( + api_groups=["zalando.org"], + resources=["fabriceventstreams"], + verbs=["list"] + ) + cluster_role.rules.append(fes_cluster_role_rule) + k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role) + # create a table in one of the database of acid-minimal-cluster create_stream_table = """ CREATE TABLE test_table (id int, payload jsonb); From fc05c13ae760c77b9b9fd2b029fc88181542e066 Mon Sep 17 00:00:00 2001 From: inovindasari Date: Tue, 23 Jul 2024 16:56:40 +0200 Subject: [PATCH 22/27] allow all verbs for fes resources --- e2e/tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 6cf43dd16..d9a2d863b 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2006,7 +2006,7 @@ def test_stream_resources(self): fes_cluster_role_rule = client.V1PolicyRule( api_groups=["zalando.org"], resources=["fabriceventstreams"], - verbs=["list"] + verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"] ) cluster_role.rules.append(fes_cluster_role_rule) k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role) From d2f8179061f02a841a5201049596f51372f4c683 Mon Sep 17 00:00:00 2001 From: Ida Novindasari Date: Wed, 24 Jul 2024 14:19:19 +0200 Subject: [PATCH 23/27] Update e2e/tests/test_e2e.py Co-authored-by: Felix Kunde --- e2e/tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index d9a2d863b..dff4aa589 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2001,7 +2001,7 @@ def test_stream_resources(self): "Operator does not get in sync") leader = k8s.get_cluster_leader_pod() - # patch ClusterRole to enable listing FES resources + # patch ClusterRole with CRUD privileges on FES resources cluster_role = k8s.api.rbac_api.read_cluster_role("postgres-operator") fes_cluster_role_rule = client.V1PolicyRule( api_groups=["zalando.org"], From ceca99a3bdcd966418844ad468859a7831cc85ab Mon Sep 17 00:00:00 2001 From: Ida Novindasari Date: Wed, 24 Jul 2024 14:19:26 +0200 Subject: [PATCH 24/27] Update e2e/tests/test_e2e.py Co-authored-by: Felix Kunde --- e2e/tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index dff4aa589..65d448907 100644 --- 
a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2017,7 +2017,7 @@ def test_stream_resources(self): """ self.query_database(leader.metadata.name, "foo", create_stream_table) - # update the manifest with the streaming section + # update the manifest with the streams section patch_streaming_config = { "spec": { "patroni": { From e4df22ca34cca0f442cff0ec624f84878431f723 Mon Sep 17 00:00:00 2001 From: Ida Novindasari Date: Wed, 24 Jul 2024 14:19:33 +0200 Subject: [PATCH 25/27] Update e2e/tests/test_e2e.py Co-authored-by: Felix Kunde --- e2e/tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 65d448907..404b0f551 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2064,7 +2064,7 @@ def test_stream_resources(self): "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) - # grant create and ownership of test_tabble to foo_user, reset search path to default + # grant create and ownership of test_table to foo_user, reset search path to default grant_permission_foo_user = """ GRANT CREATE ON DATABASE foo TO foo_user; ALTER TABLE test_table OWNER TO foo_user; From 47767aa5bcd5e004622e288ceef17278897bba52 Mon Sep 17 00:00:00 2001 From: Ida Novindasari Date: Wed, 24 Jul 2024 14:20:07 +0200 Subject: [PATCH 26/27] Update e2e/tests/test_e2e.py --- e2e/tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 404b0f551..5182851b4 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2077,7 +2077,7 @@ def test_stream_resources(self): """ self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") - # remove the streaming section from the manifest + # remove the streams section from the manifest patch_streaming_config_removal = { "spec": { "streams": [] From 7d2987edc2a021f552f27b00fd2e5a498f9d5652 Mon Sep 17 00:00:00 2001 From: Ida Novindasari Date: Wed, 24 Jul 2024 15:34:33 +0200 Subject: [PATCH 27/27] Update pkg/cluster/database.go Co-authored-by: Felix Kunde --- pkg/cluster/database.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 9f6ab5b33..433e4438e 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -49,7 +49,8 @@ const ( getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename) FROM pg_publication p LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname - WHERE p.pubowner = 10 + WHERE p.pubowner = 'postgres'::regrole + AND p.pubname LIKE 'fes_%' GROUP BY p.pubname;` createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');` alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;`
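
A note on the slot bug fixed by PATCH 12 and PATCH 13: the failure mode is easiest to see in isolation. The Go sketch below is illustration only, not operator code; the names manifestSlots, aliased, and the "fes_slot" entry are invented for the example. In Go, assigning one map variable to another copies only the map header, so after PATCH 12 the variables slotsToSync and requiredPatroniConfig.Slots aliased the same storage, and every stream slot added or marked for deletion during sync silently leaked into the manifest's Patroni configuration. PATCH 13 instead copies the manifest entries one by one into a fresh map.

package main

import "fmt"

func main() {
	// Stands in for requiredPatroniConfig.Slots taken from the manifest.
	manifestSlots := map[string]map[string]string{
		"manual_slot": {"type": "physical"},
	}

	// Buggy (as in PATCH 12): plain assignment aliases the same underlying
	// map, so writes to the copy also mutate the manifest configuration.
	aliased := manifestSlots
	aliased["fes_slot"] = map[string]string{"type": "logical"}
	fmt.Println(len(manifestSlots)) // 2 - the manifest map was mutated too

	// Fixed (as in PATCH 13): copy entry by entry into a fresh map. The
	// inner maps stay shared, which is harmless here because syncStreams
	// only adds or removes top-level entries.
	manifestSlots = map[string]map[string]string{
		"manual_slot": {"type": "physical"},
	}
	slotsToSync := make(map[string]map[string]string, len(manifestSlots))
	for name, cfg := range manifestSlots {
		slotsToSync[name] = cfg
	}
	slotsToSync["fes_slot"] = map[string]string{"type": "logical"}
	fmt.Println(len(manifestSlots)) // 1 - manifest slots stay untouched
}

A shallow copy suffices here because the sync path never mutates an inner slot map, only adds or removes whole entries.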
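
A note on the query change in PATCH 27: it swaps a hard-coded owner OID for a name lookup plus a naming-convention filter. 'postgres'::regrole resolves the role named postgres at query time instead of assuming it has OID 10 (the OID of the bootstrap superuser), and the LIKE 'fes_%' predicate confines the scan to operator-created publications, which is what keeps foo_user's mypublication from the e2e test out of the drop list. The sketch below is not operator code; it merely mimics that WHERE clause over a toy catalog, and the publication name fes_test_app_slot is invented for illustration.

package main

import (
	"fmt"
	"strings"
)

// Toy stand-in for pg_publication rows; owner holds the role name that
// p.pubowner::regrole would resolve to.
type publication struct {
	name  string
	owner string
}

func main() {
	catalog := []publication{
		{name: "fes_test_app_slot", owner: "postgres"}, // hypothetical operator-managed publication
		{name: "mypublication", owner: "foo_user"},     // created manually in the e2e test
	}

	// Mirrors the patched filter:
	//   WHERE p.pubowner = 'postgres'::regrole AND p.pubname LIKE 'fes_%'
	// Only publications passing both checks are ever considered for dropping.
	for _, p := range catalog {
		if p.owner == "postgres" && strings.HasPrefix(p.name, "fes_") {
			fmt.Printf("eligible for drop: %s\n", p.name)
		}
	}
}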