Skip to content

Commit ca33a3c

Browse files
authored
fix: Add ChangeSet.Collections to help reduce relay memory usage (#332)
The SDK changes to convert changesets into `[]ldstoretypes.Collection` for a lot of it's internal workings. Coincidentally, the relay proxy also needs to do this same conversion when supporting FDv1 formats. To help with this, we add a `.Collections` method which memoizes the result for later use, reducing our memory allocation.
1 parent 68599fa commit ca33a3c

File tree

7 files changed

+316
-69
lines changed

7 files changed

+316
-69
lines changed

internal/datasourcev2/streaming_data_source_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ func TestStreamingDataSourceHandlesUpToDateAndSubsequentChanges(t *testing.T) {
563563
status = <-resultChan
564564
assert.Equal(t, interfaces.DataSourceStateValid, status.State)
565565
assert.NotNil(t, status.ChangeSet)
566-
dd.SetBasis(status.ChangeSet.Changes(), status.ChangeSet.Selector(), true)
566+
dd.Apply(*status.ChangeSet, true)
567567

568568
flag, err = dd.DataStore.Get(datakinds.Features, "test-flag")
569569
assert.NoError(t, err)
@@ -646,7 +646,7 @@ func TestStreamingDataSourceHandlesResettingFromError(t *testing.T) {
646646
status = <-resultChan
647647
assert.Equal(t, interfaces.DataSourceStateValid, status.State)
648648
assert.NotNil(t, status.ChangeSet)
649-
dd.SetBasis(status.ChangeSet.Changes(), status.ChangeSet.Selector(), true)
649+
dd.Apply(*status.ChangeSet, true)
650650

651651
flag, err = dd.DataStore.Get(datakinds.Features, "test-flag")
652652
assert.NoError(t, err)
@@ -734,7 +734,7 @@ func TestStreamingDataSourceIgnoresGoodbye(t *testing.T) {
734734
status = <-resultChan
735735
assert.Equal(t, interfaces.DataSourceStateValid, status.State)
736736
assert.NotNil(t, status.ChangeSet)
737-
dd.SetBasis(status.ChangeSet.Changes(), status.ChangeSet.Selector(), true)
737+
dd.Apply(*status.ChangeSet, true)
738738

739739
flag, err = dd.DataStore.Get(datakinds.Features, "test-flag")
740740
assert.NoError(t, err)

internal/datasystem/store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func (s *Store) Close() error {
173173
// Apply applies a changeset to the store. The changeset must be a valid set of changes that can be applied
174174
// to the store. If the changeset is not valid, an error will be logged and the changeset will not be applied.
175175
func (s *Store) Apply(changeSet subsystems.ChangeSet, persist bool) {
176-
collections, err := subsystems.ToStorableItems(changeSet.Changes())
176+
collections, err := changeSet.Collections()
177177
if err != nil {
178178
s.loggers.Errorf("store: couldn't set basis due to malformed data: %v", err)
179179
return
@@ -244,7 +244,7 @@ func (s *Store) shouldPersist() bool {
244244
return s.persist && s.persistentStore.writable()
245245
}
246246

247-
// applyDelta applies a delta update to the store. applyDelta should not be called until SetBasis has been called.
247+
// applyDelta applies a delta update to the store. applyDelta should not be called until setBasis has been called.
248248
// To request data persistence, set persist to true.
249249
func (s *Store) applyDelta(collections []ldstoretypes.Collection, selector subsystems.Selector, persist bool) {
250250
s.memoryStore.ApplyDelta(collections)
@@ -298,7 +298,7 @@ func (s *Store) GetDataStoreStatusProvider() interfaces.DataStoreStatusProvider
298298
}
299299

300300
// Commit persists the data in the memory store to the persistent store, if configured. The persistent store
301-
// must also be in write mode, and the last call to SetBasis or ApplyDelta must have had persist set to true.
301+
// must also be in write mode, and the last call to setBasis or applyDelta must have had persist set to true.
302302
func (s *Store) Commit() error {
303303
s.mu.RLock()
304304
defer s.mu.RUnlock()

internal/sharedtest/mocks/mock_data_destination.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,21 @@ func (d *MockDataDestination) Selector() subsystems.Selector {
5050
return d.lastKnownSelector
5151
}
5252

53-
// SetBasis in this test implementation, delegates to d.DataStore.CapturedUpdates.
54-
func (d *MockDataDestination) SetBasis(events []subsystems.Change, selector subsystems.Selector, _ bool) {
55-
// For now, the selector is ignored. When the data sources start making use of it, it should be
56-
// stored so that assertions can be made.
53+
// Apply persists the given ChangeSet to the DataStore.
54+
func (d *MockDataDestination) Apply(changeSet subsystems.ChangeSet, persist bool) {
55+
switch changeSet.IntentCode() {
56+
case subsystems.IntentTransferFull:
57+
d.setBasis(changeSet, persist)
58+
case subsystems.IntentTransferChanges:
59+
d.applyDelta(changeSet, persist)
60+
}
61+
}
5762

58-
collections, err := subsystems.ToStorableItems(events)
63+
// setBasis in this test implementation, delegates to d.DataStore.CapturedUpdates.
64+
func (d *MockDataDestination) setBasis(changeSet subsystems.ChangeSet, _ bool) {
65+
collections, err := changeSet.Collections()
5966
if err != nil {
60-
panic("MockDataDestination.SetBasis received malformed data: " + err.Error())
67+
panic("MockDataDestination.setBasis received malformed data: " + err.Error())
6168
}
6269

6370
for _, coll := range collections {
@@ -66,19 +73,16 @@ func (d *MockDataDestination) SetBasis(events []subsystems.Change, selector subs
6673

6774
if err := d.DataStore.Init(toposort.Sort(collections)); err == nil {
6875
d.lock.Lock()
69-
d.lastKnownSelector = selector
76+
d.lastKnownSelector = changeSet.Selector()
7077
d.lock.Unlock()
7178
}
7279
}
7380

74-
// ApplyDelta in this test implementation, delegates to d.DataStore.CapturedUpdates.
75-
func (d *MockDataDestination) ApplyDelta(events []subsystems.Change, selector subsystems.Selector, _ bool) {
76-
// For now, the selector is ignored. When the data sources start making use of it, it should be
77-
// stored so that assertions can be made.
78-
79-
collections, err := subsystems.ToStorableItems(events)
81+
// applyDelta in this test implementation, delegates to d.DataStore.CapturedUpdates.
82+
func (d *MockDataDestination) applyDelta(changeSet subsystems.ChangeSet, _ bool) {
83+
collections, err := changeSet.Collections()
8084
if err != nil {
81-
panic("MockDataDestination.ApplyDelta received malformed data: " + err.Error())
85+
panic("MockDataDestination.applyDelta received malformed data: " + err.Error())
8286
}
8387

8488
for _, coll := range collections {
@@ -94,7 +98,7 @@ func (d *MockDataDestination) ApplyDelta(events []subsystems.Change, selector su
9498
}
9599

96100
d.lock.Lock()
97-
d.lastKnownSelector = selector
101+
d.lastKnownSelector = changeSet.Selector()
98102
d.lock.Unlock()
99103
}
100104

ldfiledatav2/file_data_source_impl.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,11 @@ func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context)
138138
changeSetChan := fs.changeSetBroadcaster.AddListener()
139139
statusChan := fs.statusBroadcaster.AddListener()
140140

141+
changeset := subsystems.NewChangeSetBuilder().NoChanges()
142+
141143
var err error
142144
basis := &subsystems.Basis{
143-
ChangeSet: subsystems.ChangeSet{},
145+
ChangeSet: *changeset,
144146
Persist: false,
145147
}
146148

subsystems/changeset.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ package subsystems
33
import (
44
"encoding/json"
55
"errors"
6+
"sync"
7+
8+
"github.com/launchdarkly/go-jsonstream/v3/jreader"
9+
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
610
)
711

812
// ChangeType specifies if an object is being upserted or deleted.
@@ -48,6 +52,9 @@ type ChangeSet struct {
4852
intentCode IntentCode
4953
changes []Change
5054
selector Selector
55+
56+
mu *sync.Mutex
57+
collection []ldstoretypes.Collection
5158
}
5259

5360
// IntentCode represents the intent of the changeset.
@@ -66,6 +73,70 @@ func (c *ChangeSet) Selector() Selector {
6673
return c.selector
6774
}
6875

76+
// Collections converts the changeset into a list of collections suitable for
77+
// insertion into a data store, relaying to FDv1 clients, etc.
78+
func (c *ChangeSet) Collections() ([]ldstoretypes.Collection, error) {
79+
c.mu.Lock()
80+
defer c.mu.Unlock()
81+
82+
if c.collection != nil {
83+
return c.collection, nil
84+
}
85+
86+
if c.changes == nil {
87+
c.collection = []ldstoretypes.Collection{}
88+
return c.collection, nil
89+
}
90+
91+
collection, err := toStorableItems(c.changes)
92+
if err != nil {
93+
return nil, err
94+
}
95+
96+
c.collection = collection
97+
return c.collection, nil
98+
}
99+
100+
// toStorableItems converts a list of FDv2 events to a list of collections suitable for insertion
101+
// into a data store.
102+
func toStorableItems(deltas []Change) ([]ldstoretypes.Collection, error) {
103+
collections := make(kindMap)
104+
for _, event := range deltas {
105+
kind, ok := event.Kind.ToFDV1()
106+
if !ok {
107+
// If we don't recognize this kind, it's not an error and should be ignored for forwards
108+
// compatibility.
109+
continue
110+
}
111+
112+
switch event.Action {
113+
case ChangeTypePut:
114+
// A put requires deserializing the item. We delegate to the optimized streaming JSON
115+
// parser.
116+
reader := jreader.NewReader(event.Object)
117+
item, err := kind.DeserializeFromJSONReader(&reader)
118+
if err != nil {
119+
return nil, err
120+
}
121+
collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{
122+
Key: event.Key,
123+
Item: item,
124+
})
125+
case ChangeTypeDelete:
126+
// A deletion is represented by a tombstone, which is an ItemDescriptor with a version and nil item.
127+
collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{
128+
Key: event.Key,
129+
Item: ldstoretypes.ItemDescriptor{Version: event.Version, Item: nil},
130+
})
131+
default:
132+
// An unknown action isn't an error, and should be ignored for forwards compatibility.
133+
continue
134+
}
135+
}
136+
137+
return collections.flatten(), nil
138+
}
139+
69140
// ChangeSetBuilder is a helper for constructing a ChangeSet.
70141
//
71142
// This type is not stable, and not subject to any backwards
@@ -96,6 +167,7 @@ func (c *ChangeSetBuilder) NoChanges() *ChangeSet {
96167
intentCode: IntentNone,
97168
selector: NoSelector(),
98169
changes: nil,
170+
mu: &sync.Mutex{},
99171
}
100172
}
101173

@@ -106,6 +178,7 @@ func (c *ChangeSetBuilder) Empty(selector Selector) *ChangeSet {
106178
intentCode: IntentTransferFull,
107179
selector: selector,
108180
changes: nil,
181+
mu: &sync.Mutex{},
109182
}
110183
}
111184

@@ -152,6 +225,7 @@ func (c *ChangeSetBuilder) Finish(selector Selector) (*ChangeSet, error) {
152225
intentCode: c.intent.Payload.Code,
153226
selector: selector,
154227
changes: c.changes,
228+
mu: &sync.Mutex{},
155229
}
156230
c.changes = nil
157231

0 commit comments

Comments
 (0)