Skip to content

Commit 4534a4c

Browse files
authored
fix syncing of stream CRDs (#2152)
* fix syncing of stream CRDs and improve corresponding unit tests
1 parent c1657ec commit 4534a4c

File tree

2 files changed

+48
-27
lines changed

2 files changed

+48
-27
lines changed

pkg/cluster/streams.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,16 @@ func (c *Cluster) createOrUpdateStreams() error {
374374
}
375375

376376
for _, appId := range appIds {
377+
streamExists := false
378+
377379
// update stream when it exists and EventStreams array differs
378380
for _, stream := range streams.Items {
379381
if appId == stream.Spec.ApplicationId {
382+
streamExists = true
380383
desiredStreams := c.generateFabricEventStream(appId)
381384
if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
382385
c.logger.Debugf("updating event streams: %s", reason)
383-
desiredStreams.ObjectMeta.ResourceVersion = stream.ObjectMeta.ResourceVersion
386+
desiredStreams.ObjectMeta = stream.ObjectMeta
384387
err = c.updateStreams(desiredStreams)
385388
if err != nil {
386389
return fmt.Errorf("failed updating event stream %s: %v", stream.Name, err)
@@ -390,12 +393,15 @@ func (c *Cluster) createOrUpdateStreams() error {
390393
continue
391394
}
392395
}
393-
c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
394-
streamCRD, err := c.createStreams(appId)
395-
if err != nil {
396-
return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err)
396+
397+
if !streamExists {
398+
c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
399+
streamCRD, err := c.createStreams(appId)
400+
if err != nil {
401+
return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err)
402+
}
403+
c.logger.Infof("event streams %q have been successfully created", streamCRD.Name)
397404
}
398-
c.logger.Infof("event streams %q have been successfully created", streamCRD.Name)
399405
}
400406

401407
return nil

pkg/cluster/streams_test.go

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package cluster
22

33
import (
4-
"encoding/json"
54
"fmt"
65
"reflect"
76
"strings"
@@ -167,6 +166,15 @@ var (
167166
}
168167
)
169168

169+
func TestGatherApplicationIds(t *testing.T) {
170+
testAppIds := []string{appId}
171+
appIds := gatherApplicationIds(pg.Spec.Streams)
172+
173+
if !util.IsEqualIgnoreOrder(testAppIds, appIds) {
174+
t.Errorf("gathered applicationIds do not match, expected %#v, got %#v", testAppIds, appIds)
175+
}
176+
}
177+
170178
func TestGenerateFabricEventStream(t *testing.T) {
171179
client, _ := newFakeK8sStreamClient()
172180

@@ -206,12 +214,18 @@ func TestGenerateFabricEventStream(t *testing.T) {
206214
t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result)
207215
}
208216

209-
// compare stream returned from API with expected stream
210217
listOptions := metav1.ListOptions{
211218
LabelSelector: cluster.labelsSet(true).String(),
212219
}
213220
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
214221
assert.NoError(t, err)
222+
223+
// check if there is only one stream
224+
if len(streams.Items) > 1 {
225+
t.Errorf("too many stream CRDs found: got %d, but expected only one", len(streams.Items))
226+
}
227+
228+
// compare stream returned from API with expected stream
215229
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, fes.Spec.EventStreams); !match {
216230
t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streams.Items[0])
217231
}
@@ -220,9 +234,15 @@ func TestGenerateFabricEventStream(t *testing.T) {
220234
err = cluster.createOrUpdateStreams()
221235
assert.NoError(t, err)
222236

223-
// compare stream resturned from API with generated stream
224237
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
225238
assert.NoError(t, err)
239+
240+
// check if there is still only one stream
241+
if len(streams.Items) > 1 {
242+
t.Errorf("too many stream CRDs found after sync: got %d, but expected only one", len(streams.Items))
243+
}
244+
245+
// compare stream resturned from API with generated stream
226246
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
227247
t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streams.Items[0])
228248
}
@@ -335,31 +355,26 @@ func TestUpdateFabricEventStream(t *testing.T) {
335355
_, err = cluster.createStatefulSet()
336356
assert.NoError(t, err)
337357

358+
// now create the stream
338359
err = cluster.createOrUpdateStreams()
339360
assert.NoError(t, err)
340361

341-
var pgSpec acidv1.PostgresSpec
342-
pgSpec.Streams = []acidv1.Stream{
343-
{
344-
ApplicationId: appId,
345-
Database: dbName,
346-
Tables: map[string]acidv1.StreamTable{
347-
"data.bar": acidv1.StreamTable{
348-
EventType: "stream-type-c",
349-
IdColumn: k8sutil.StringToPointer("b_id"),
350-
PayloadColumn: k8sutil.StringToPointer("b_payload"),
351-
},
352-
},
353-
BatchSize: k8sutil.UInt32ToPointer(uint32(250)),
354-
},
362+
// change specs of streams and patch CRD
363+
for i, stream := range pg.Spec.Streams {
364+
if stream.ApplicationId == appId {
365+
streamTable := stream.Tables["data.bar"]
366+
streamTable.EventType = "stream-type-c"
367+
stream.Tables["data.bar"] = streamTable
368+
stream.BatchSize = k8sutil.UInt32ToPointer(uint32(250))
369+
pg.Spec.Streams[i] = stream
370+
}
355371
}
356-
patch, err := json.Marshal(struct {
357-
PostgresSpec interface{} `json:"spec"`
358-
}{&pgSpec})
372+
373+
patchData, err := specPatch(pg.Spec)
359374
assert.NoError(t, err)
360375

361376
pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch(
362-
context.TODO(), cluster.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "spec")
377+
context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
363378
assert.NoError(t, err)
364379

365380
cluster.Postgresql.Spec = pgPatched.Spec

0 commit comments

Comments
 (0)