Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -2041,6 +2041,20 @@ def test_stream_resources(self):
"recoveryEventType": "test-event-dlq"
}
}
},
{
"applicationId": "test-app2",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_non_exist_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
}
}
}
]
}
Expand All @@ -2064,6 +2078,18 @@ def test_stream_resources(self):
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
"Could not find Fabric Event Stream resource", 10, 5)

# check if the non-existing table in the stream section does not create a publication and slot
get_publication_query_not_exist_table = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
"""
get_slot_query_not_exist_table = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
"Publication is created for non-existing tables", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
"Replication slot is created for non-existing tables", 10, 5)

# grant create and ownership of test_table to foo_user, reset search path to default
grant_permission_foo_user = """
GRANT CREATE ON DATABASE foo TO foo_user;
Expand Down
29 changes: 20 additions & 9 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
} else if currentTables != tableList {
alterPublications[slotName] = tableList
}
(*slotsToSync)[slotName] = slotAndPublication.Slot
}

// check if there is any deletion
Expand All @@ -148,24 +147,30 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
return nil
}

var errorMessage error = nil
for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
return fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
return fmt.Errorf("update of publication %q failed: %v", publicationName, err)
errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err)
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for _, publicationName := range deletePublications {
(*slotsToSync)[publicationName] = nil
if err = c.executeDropPublication(publicationName); err != nil {
return fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
continue
}
(*slotsToSync)[publicationName] = nil
}

return nil
return errorMessage
}

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
Expand Down Expand Up @@ -390,15 +395,15 @@ func (c *Cluster) syncStreams() error {
}

// finally sync stream CRDs
err = c.createOrUpdateStreams()
err = c.createOrUpdateStreams(slotsToSync)
if err != nil {
return err
}

return nil
}

func (c *Cluster) createOrUpdateStreams() error {
func (c *Cluster) createOrUpdateStreams(createdSlots map[string]map[string]string) error {

// fetch different application IDs from streams section
// there will be a separate event stream resource for each ID
Expand All @@ -413,7 +418,7 @@ func (c *Cluster) createOrUpdateStreams() error {
return fmt.Errorf("could not list of FabricEventStreams: %v", err)
}

for _, appId := range appIds {
for idx, appId := range appIds {
streamExists := false

// update stream when it exists and EventStreams array differs
Expand All @@ -435,6 +440,12 @@ func (c *Cluster) createOrUpdateStreams() error {
}

if !streamExists {
// check if there is any slot with the applicationId
slotName := getSlotName(c.Spec.Streams[idx].Database, appId)
if _, exists := createdSlots[slotName]; !exists {
c.logger.Warningf("no slot %s with applicationId %s exists, skipping event stream creation", slotName, appId)
continue
}
c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
streamCRD, err := c.createStreams(appId)
if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ var (
fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1))

fakeCreatedSlots map[string]map[string]string = map[string]map[string]string{
slotName: {},
}

pg = acidv1.Postgresql{
TypeMeta: metav1.TypeMeta{
Kind: "Postgresql",
Expand Down Expand Up @@ -222,7 +226,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

// create the streams
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

// compare generated stream with expected stream
Expand All @@ -248,7 +252,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
}

// sync streams once again
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
Expand Down Expand Up @@ -397,7 +401,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

// now create the stream
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

// change specs of streams and patch CRD
Expand All @@ -419,7 +423,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

// compare stream returned from API with expected stream
Expand Down Expand Up @@ -448,7 +452,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.createOrUpdateStreams()
err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err)

result = cluster.generateFabricEventStream(appId)
Expand All @@ -466,7 +470,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)

cluster.Postgresql.Spec = pgUpdated.Spec
cluster.createOrUpdateStreams()
cluster.createOrUpdateStreams(fakeCreatedSlots)

streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
if len(streamList.Items) > 0 || err != nil {
Expand Down