Skip to content

Commit e4f00b7

Browse files
committed
extend and improve hasSlotsInSync unit test
1 parent 25ccc87 commit e4f00b7

File tree

2 files changed

+143
-8
lines changed

2 files changed

+143
-8
lines changed

pkg/cluster/streams.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func (c *Cluster) syncStreams() error {
393393
// there will be a separate event stream resource for each ID
394394
appIds := getDistinctApplicationIds(c.Spec.Streams)
395395
for _, appId := range appIds {
396-
if hasSlotsInSync(appId, databaseSlots, slotsToSync) {
396+
if c.hasSlotsInSync(appId, databaseSlots, slotsToSync) {
397397
if err = c.syncStream(appId); err != nil {
398398
c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err)
399399
}
@@ -410,13 +410,15 @@ func (c *Cluster) syncStreams() error {
410410
return nil
411411
}
412412

413-
func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool {
413+
func (c *Cluster) hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool {
414414
allSlotsInSync := true
415415
for dbName, slots := range databaseSlots {
416416
for slotName := range slots {
417417
if slotName == getSlotName(dbName, appId) {
418418
if _, exists := slotsToSync[slotName]; !exists {
419419
allSlotsInSync = false
420+
c.logger.Warnf("replication slot %q for applicationId %s not found in database", slotName, appId)
421+
continue
420422
}
421423
}
422424
}

pkg/cluster/streams_test.go

Lines changed: 139 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,44 @@ func TestGatherApplicationIds(t *testing.T) {
193193
}
194194

195195
func TestHasSlotsInSync(t *testing.T) {
196+
client, _ := newFakeK8sStreamClient()
197+
var cluster = New(
198+
Config{
199+
OpConfig: config.Config{
200+
Auth: config.Auth{
201+
SecretNameTemplate: "{username}.{cluster}.credentials.{tprkind}.{tprgroup}",
202+
},
203+
PodManagementPolicy: "ordered_ready",
204+
Resources: config.Resources{
205+
ClusterLabels: map[string]string{"application": "spilo"},
206+
ClusterNameLabel: "cluster-name",
207+
DefaultCPURequest: "300m",
208+
DefaultCPULimit: "300m",
209+
DefaultMemoryRequest: "300Mi",
210+
DefaultMemoryLimit: "300Mi",
211+
PodRoleLabel: "spilo-role",
212+
},
213+
},
214+
}, client, pg, logger, eventRecorder)
215+
216+
cluster.Name = clusterName
217+
cluster.Namespace = namespace
218+
219+
appId2 := fmt.Sprintf("%s-2", appId)
220+
dbNotExists := "dbnotexists"
221+
slotNotExists := fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbNotExists, strings.Replace(appId, "-", "_", -1))
222+
slotNotExistsAppId2 := fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbNotExists, strings.Replace(appId2, "-", "_", -1))
196223

197224
tests := []struct {
198225
subTest string
226+
applicationId string
199227
expectedSlots map[string]map[string]zalandov1.Slot
200228
actualSlots map[string]map[string]string
201229
slotsInSync bool
202230
}{
203231
{
204-
subTest: "slots are in sync",
232+
subTest: fmt.Sprintf("slots in sync for applicationId %s", appId),
233+
applicationId: appId,
205234
expectedSlots: map[string]map[string]zalandov1.Slot{
206235
dbName: {
207236
slotName: zalandov1.Slot{
@@ -227,7 +256,70 @@ func TestHasSlotsInSync(t *testing.T) {
227256
},
228257
slotsInSync: true,
229258
}, {
230-
subTest: "slots are not in sync",
259+
subTest: fmt.Sprintf("slots empty for applicationId %s", appId),
260+
applicationId: appId,
261+
expectedSlots: map[string]map[string]zalandov1.Slot{
262+
dbNotExists: {
263+
slotNotExists: zalandov1.Slot{
264+
Slot: map[string]string{
265+
"databases": dbName,
266+
"plugin": constants.EventStreamSourcePluginType,
267+
"type": "logical",
268+
},
269+
Publication: map[string]acidv1.StreamTable{
270+
"test1": acidv1.StreamTable{
271+
EventType: "stream-type-a",
272+
},
273+
},
274+
},
275+
},
276+
},
277+
actualSlots: map[string]map[string]string{},
278+
slotsInSync: false,
279+
}, {
280+
subTest: fmt.Sprintf("one slot not in sync for applicationId %s", appId),
281+
applicationId: appId,
282+
expectedSlots: map[string]map[string]zalandov1.Slot{
283+
dbName: {
284+
slotName: zalandov1.Slot{
285+
Slot: map[string]string{
286+
"databases": dbName,
287+
"plugin": constants.EventStreamSourcePluginType,
288+
"type": "logical",
289+
},
290+
Publication: map[string]acidv1.StreamTable{
291+
"test1": acidv1.StreamTable{
292+
EventType: "stream-type-a",
293+
},
294+
},
295+
},
296+
},
297+
dbNotExists: {
298+
slotNotExists: zalandov1.Slot{
299+
Slot: map[string]string{
300+
"databases": "dbnotexists",
301+
"plugin": constants.EventStreamSourcePluginType,
302+
"type": "logical",
303+
},
304+
Publication: map[string]acidv1.StreamTable{
305+
"test2": acidv1.StreamTable{
306+
EventType: "stream-type-b",
307+
},
308+
},
309+
},
310+
},
311+
},
312+
actualSlots: map[string]map[string]string{
313+
slotName: map[string]string{
314+
"databases": dbName,
315+
"plugin": constants.EventStreamSourcePluginType,
316+
"type": "logical",
317+
},
318+
},
319+
slotsInSync: false,
320+
}, {
321+
subTest: fmt.Sprintf("slots in sync for applicationId %s, but not for for %s - checking %s", appId, appId2, appId),
322+
applicationId: appId,
231323
expectedSlots: map[string]map[string]zalandov1.Slot{
232324
dbName: {
233325
slotName: zalandov1.Slot{
@@ -243,8 +335,49 @@ func TestHasSlotsInSync(t *testing.T) {
243335
},
244336
},
245337
},
246-
"dbnotexists": {
338+
dbNotExists: {
339+
slotNotExistsAppId2: zalandov1.Slot{
340+
Slot: map[string]string{
341+
"databases": "dbnotexists",
342+
"plugin": constants.EventStreamSourcePluginType,
343+
"type": "logical",
344+
},
345+
Publication: map[string]acidv1.StreamTable{
346+
"test2": acidv1.StreamTable{
347+
EventType: "stream-type-b",
348+
},
349+
},
350+
},
351+
},
352+
},
353+
actualSlots: map[string]map[string]string{
354+
slotName: map[string]string{
355+
"databases": dbName,
356+
"plugin": constants.EventStreamSourcePluginType,
357+
"type": "logical",
358+
},
359+
},
360+
slotsInSync: true,
361+
}, {
362+
subTest: fmt.Sprintf("slots in sync for applicationId %s, but not for for %s - checking %s", appId, appId2, appId2),
363+
applicationId: appId2,
364+
expectedSlots: map[string]map[string]zalandov1.Slot{
365+
dbName: {
247366
slotName: zalandov1.Slot{
367+
Slot: map[string]string{
368+
"databases": dbName,
369+
"plugin": constants.EventStreamSourcePluginType,
370+
"type": "logical",
371+
},
372+
Publication: map[string]acidv1.StreamTable{
373+
"test1": acidv1.StreamTable{
374+
EventType: "stream-type-a",
375+
},
376+
},
377+
},
378+
},
379+
dbNotExists: {
380+
slotNotExistsAppId2: zalandov1.Slot{
248381
Slot: map[string]string{
249382
"databases": "dbnotexists",
250383
"plugin": constants.EventStreamSourcePluginType,
@@ -270,9 +403,9 @@ func TestHasSlotsInSync(t *testing.T) {
270403
}
271404

272405
for _, tt := range tests {
273-
result := hasSlotsInSync(appId, tt.expectedSlots, tt.actualSlots)
274-
if !result {
275-
t.Errorf("slots are not in sync, expected %#v, got %#v", tt.expectedSlots, tt.actualSlots)
406+
result := cluster.hasSlotsInSync(tt.applicationId, tt.expectedSlots, tt.actualSlots)
407+
if result != tt.slotsInSync {
408+
t.Errorf("%s: unexpected result for slot test of applicationId: %v, expected slots %#v, actual slots %#v", tt.subTest, tt.applicationId, tt.expectedSlots, tt.actualSlots)
276409
}
277410
}
278411
}

0 commit comments

Comments
 (0)