Skip to content

Commit f2b18df

Browse files
Refactor SplitsUpdateWorker to force sync based on splits and rbSegments change number
1 parent a5e0d3b commit f2b18df

File tree

2 files changed

+46
-43
lines changed

2 files changed

+46
-43
lines changed

src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ import { IUpdateWorker } from './types';
2020
*/
2121
export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData]> & { killSplit(event: ISplitKillData): void } {
2222

23+
const ff = SplitsUpdateWorker(storage.splits);
24+
const rbs = SplitsUpdateWorker(storage.rbSegments);
25+
2326
function SplitsUpdateWorker(cache: ISplitsCacheSync | IRBSegmentsCacheSync) {
24-
let maxChangeNumber = 0;
27+
let maxChangeNumber = -1;
2528
let handleNewEvent = false;
2629
let isHandlingEvent: boolean;
2730
let cdnBypass: boolean;
@@ -44,8 +47,7 @@ export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSy
4447

4548
const attempts = backoff.attempts + 1;
4649

47-
// @TODO and with RBS and FF
48-
if (maxChangeNumber <= cache.getChangeNumber()) {
50+
if (ff.isSync() && rbs.isSync()) {
4951
log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`);
5052
isHandlingEvent = false;
5153
return;
@@ -97,19 +99,20 @@ export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSy
9799
stop() {
98100
isHandlingEvent = false;
99101
backoff.reset();
102+
},
103+
isSync() {
104+
return maxChangeNumber <= cache.getChangeNumber();
100105
}
101106
};
102107
}
103108

104-
const ff = SplitsUpdateWorker(storage.splits);
105-
const rbs = SplitsUpdateWorker(storage.rbSegments);
106-
107109
return {
108110
put(parsedData) {
109111
if (parsedData.d && parsedData.c !== undefined) {
110112
try {
111113
const payload = parseFFUpdatePayload(parsedData.c, parsedData.d);
112114
if (payload) {
115+
console.log('payload ', JSON.stringify(payload));
113116
(parsedData.type === RB_SEGMENT_UPDATE ? rbs : ff).put(parsedData, payload);
114117
return;
115118
}

src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// @ts-nocheck
22
import { SDK_SPLITS_ARRIVED } from '../../../../readiness/constants';
33
import { SplitsCacheInMemory } from '../../../../storages/inMemory/SplitsCacheInMemory';
4+
import { RBSegmentsCacheInMemory } from '../../../../storages/inMemory/RBSegmentsCacheInMemory';
45
import { SplitsUpdateWorker } from '../SplitsUpdateWorker';
56
import { FETCH_BACKOFF_MAX_RETRIES } from '../constants';
67
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';
@@ -53,19 +54,26 @@ const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker
5354

5455
describe('SplitsUpdateWorker', () => {
5556

57+
const storage = {
58+
splits: new SplitsCacheInMemory(),
59+
rbSegments: new RBSegmentsCacheInMemory()
60+
};
61+
5662
afterEach(() => { // restore
5763
Backoff.__TEST__BASE_MILLIS = undefined;
5864
Backoff.__TEST__MAX_MILLIS = undefined;
65+
66+
storage.splits.clear();
67+
storage.rbSegments.clear();
5968
});
6069

6170
test('put', async () => {
6271

6372
// setup
64-
const cache = new SplitsCacheInMemory();
65-
const splitsSyncTask = splitsSyncTaskMock(cache);
73+
const splitsSyncTask = splitsSyncTaskMock(storage.splits);
6674

6775
Backoff.__TEST__BASE_MILLIS = 1; // retry immediately
68-
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
76+
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
6977

7078
// assert calling `splitsSyncTask.execute` if `isExecuting` is false
7179
expect(splitsSyncTask.isExecuting()).toBe(false);
@@ -102,9 +110,8 @@ describe('SplitsUpdateWorker', () => {
102110
test('put, backoff', async () => {
103111
// setup
104112
Backoff.__TEST__BASE_MILLIS = 50;
105-
const cache = new SplitsCacheInMemory();
106-
const splitsSyncTask = splitsSyncTaskMock(cache, [90, 90, 90]);
107-
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
113+
const splitsSyncTask = splitsSyncTaskMock(storage.splits, [90, 90, 90]);
114+
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
108115

109116
// while fetch fails, should retry with backoff
110117
splitUpdateWorker.put({ changeNumber: 100 });
@@ -121,9 +128,8 @@ describe('SplitsUpdateWorker', () => {
121128
// setup
122129
Backoff.__TEST__BASE_MILLIS = 10; // 10 millis instead of 10 sec
123130
Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min
124-
const cache = new SplitsCacheInMemory();
125-
const splitsSyncTask = splitsSyncTaskMock(cache, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid
126-
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
131+
const splitsSyncTask = splitsSyncTaskMock(storage.splits, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid
132+
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
127133

128134
splitUpdateWorker.put({ changeNumber: 100 }); // queued
129135

@@ -146,9 +152,8 @@ describe('SplitsUpdateWorker', () => {
146152
// setup
147153
Backoff.__TEST__BASE_MILLIS = 10; // 10 millis instead of 10 sec
148154
Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min
149-
const cache = new SplitsCacheInMemory();
150-
const splitsSyncTask = splitsSyncTaskMock(cache, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid
151-
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
155+
const splitsSyncTask = splitsSyncTaskMock(storage.splits, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid
156+
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
152157

153158
splitUpdateWorker.put({ changeNumber: 100 }); // queued
154159

@@ -168,18 +173,17 @@ describe('SplitsUpdateWorker', () => {
168173

169174
test('killSplit', async () => {
170175
// setup
171-
const cache = new SplitsCacheInMemory();
172-
cache.addSplit({ name: 'something' });
173-
cache.addSplit({ name: 'something else' });
176+
storage.splits.addSplit({ name: 'something' });
177+
storage.splits.addSplit({ name: 'something else' });
174178

175-
const splitsSyncTask = splitsSyncTaskMock(cache);
176-
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, splitsEventEmitterMock, telemetryTracker);
179+
const splitsSyncTask = splitsSyncTaskMock(storage.splits);
180+
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, splitsEventEmitterMock, telemetryTracker);
177181

178182
// assert killing split locally, emitting SDK_SPLITS_ARRIVED event, and synchronizing splits if changeNumber is new
179183
splitUpdateWorker.killSplit({ changeNumber: 100, splitName: 'something', defaultTreatment: 'off' }); // splitsCache.killLocally is synchronous
180184
expect(splitsSyncTask.execute).toBeCalledTimes(1); // synchronizes splits if `isExecuting` is false
181185
expect(splitsEventEmitterMock.emit.mock.calls).toEqual([[SDK_SPLITS_ARRIVED, true]]); // emits `SDK_SPLITS_ARRIVED` with `isSplitKill` flag in true, if split kill resolves with update
182-
assertKilledSplit(cache, 100, 'something', 'off');
186+
assertKilledSplit(storage.splits, 100, 'something', 'off');
183187

184188
// assert not killing split locally, not emitting SDK_SPLITS_ARRIVED event, and not synchronizes splits, if changeNumber is old
185189
splitsSyncTask.__resolveSplitsUpdaterCall(100);
@@ -192,15 +196,14 @@ describe('SplitsUpdateWorker', () => {
192196
expect(splitsSyncTask.execute).toBeCalledTimes(0); // doesn't synchronize splits if killLocally resolved without update
193197
expect(splitsEventEmitterMock.emit).toBeCalledTimes(0); // doesn't emit `SDK_SPLITS_ARRIVED` if killLocally resolved without update
194198

195-
assertKilledSplit(cache, 100, 'something', 'off'); // calling `killLocally` with an old changeNumber made no effect
199+
assertKilledSplit(storage.splits, 100, 'something', 'off'); // calling `killLocally` with an old changeNumber made no effect
196200
});
197201

198202
test('stop', async () => {
199203
// setup
200-
const cache = new SplitsCacheInMemory();
201-
const splitsSyncTask = splitsSyncTaskMock(cache, [95]);
204+
const splitsSyncTask = splitsSyncTaskMock(storage.splits, [95]);
202205
Backoff.__TEST__BASE_MILLIS = 1;
203-
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
206+
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
204207

205208
splitUpdateWorker.put({ changeNumber: 100 });
206209

@@ -212,11 +215,10 @@ describe('SplitsUpdateWorker', () => {
212215

213216
test('put, avoid fetching if payload sent', async () => {
214217

215-
const cache = new SplitsCacheInMemory();
216218
splitNotifications.forEach(notification => {
217-
const pcn = cache.getChangeNumber();
218-
const splitsSyncTask = splitsSyncTaskMock(cache);
219-
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
219+
const pcn = storage.splits.getChangeNumber();
220+
const splitsSyncTask = splitsSyncTaskMock(storage.splits);
221+
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
220222
const payload = notification.decoded;
221223
const changeNumber = payload.changeNumber;
222224
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); // queued
@@ -226,17 +228,15 @@ describe('SplitsUpdateWorker', () => {
226228
});
227229

228230
test('put, ccn and pcn validation for IFF', () => {
229-
const cache = new SplitsCacheInMemory();
230-
231231
// ccn = 103 & pcn = 104: Something was missed -> fetch split changes
232232
let ccn = 103;
233233
let pcn = 104;
234234
let changeNumber = 105;
235-
cache.setChangeNumber(ccn);
235+
storage.splits.setChangeNumber(ccn);
236236
const notification = splitNotifications[0];
237237

238-
let splitsSyncTask = splitsSyncTaskMock(cache);
239-
let splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
238+
let splitsSyncTask = splitsSyncTaskMock(storage.splits);
239+
let splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
240240
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
241241
expect(splitsSyncTask.execute).toBeCalledTimes(1);
242242
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
@@ -246,10 +246,10 @@ describe('SplitsUpdateWorker', () => {
246246
ccn = 110;
247247
pcn = 0;
248248
changeNumber = 111;
249-
cache.setChangeNumber(ccn);
249+
storage.splits.setChangeNumber(ccn);
250250

251-
splitsSyncTask = splitsSyncTaskMock(cache);
252-
splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
251+
splitsSyncTask = splitsSyncTaskMock(storage.splits);
252+
splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
253253
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
254254
expect(splitsSyncTask.execute).toBeCalledTimes(1);
255255
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
@@ -259,10 +259,10 @@ describe('SplitsUpdateWorker', () => {
259259
ccn = 120;
260260
pcn = 120;
261261
changeNumber = 121;
262-
cache.setChangeNumber(ccn);
262+
storage.splits.setChangeNumber(ccn);
263263

264-
splitsSyncTask = splitsSyncTaskMock(cache);
265-
splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
264+
splitsSyncTask = splitsSyncTaskMock(storage.splits);
265+
splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
266266
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
267267
expect(splitsSyncTask.execute).toBeCalledTimes(1);
268268
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { payload: notification.decoded, changeNumber }]);

0 commit comments

Comments
 (0)