Skip to content

Commit a5e0d3b

Browse files
Polishing
1 parent fada384 commit a5e0d3b

File tree

3 files changed

+34
-39
lines changed

3 files changed

+34
-39
lines changed

src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';
1313
import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker';
1414
import { splitNotifications } from '../../../streaming/__tests__/dataMocks';
1515
import { RBSegmentsCacheInMemory } from '../../../../storages/inMemory/RBSegmentsCacheInMemory';
16+
import { RB_SEGMENT_UPDATE, SPLIT_UPDATE } from '../../../streaming/constants';
1617

1718
const ARCHIVED_FF = 'ARCHIVED';
1819

@@ -202,7 +203,7 @@ describe('splitChangesUpdater', () => {
202203
const payload = notification.decoded as Pick<ISplit, 'name' | 'changeNumber' | 'killed' | 'defaultTreatment' | 'trafficTypeName' | 'conditions' | 'status' | 'seed' | 'trafficAllocation' | 'trafficAllocationSeed' | 'configurations'>;
203204
const changeNumber = payload.changeNumber;
204205

205-
await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true);
206+
await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber, type: SPLIT_UPDATE })).resolves.toBe(true);
206207

207208
// fetch and RBSegments.update not being called
208209
expect(fetchSplitChanges).toBeCalledTimes(0);
@@ -226,7 +227,7 @@ describe('splitChangesUpdater', () => {
226227
const payload = { name: 'rbsegment', status: 'ACTIVE', changeNumber: 1684329854385, conditions: [] } as unknown as IRBSegment;
227228
const changeNumber = payload.changeNumber;
228229

229-
await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true);
230+
await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber, type: RB_SEGMENT_UPDATE })).resolves.toBe(true);
230231

231232
// fetch and Splits.update not being called
232233
expect(fetchSplitChanges).toBeCalledTimes(0);
@@ -256,7 +257,7 @@ describe('splitChangesUpdater', () => {
256257
let calls = 0;
257258
// emit always if not configured sets
258259
for (const setMock of setMocks) {
259-
await expect(splitChangesUpdater(undefined, undefined, { payload: { ...payload, sets: setMock.sets, status: 'ACTIVE' }, changeNumber: index })).resolves.toBe(true);
260+
await expect(splitChangesUpdater(undefined, undefined, { payload: { ...payload, sets: setMock.sets, status: 'ACTIVE' }, changeNumber: index, type: SPLIT_UPDATE })).resolves.toBe(true);
260261
expect(splitsEmitSpy.mock.calls[index][0]).toBe('state::splits-arrived');
261262
index++;
262263
}
@@ -268,7 +269,7 @@ describe('splitChangesUpdater', () => {
268269
splitsEmitSpy.mockReset();
269270
index = 0;
270271
for (const setMock of setMocks) {
271-
await expect(splitChangesUpdater(undefined, undefined, { payload: { ...payload, sets: setMock.sets, status: 'ACTIVE' }, changeNumber: index })).resolves.toBe(true);
272+
await expect(splitChangesUpdater(undefined, undefined, { payload: { ...payload, sets: setMock.sets, status: 'ACTIVE' }, changeNumber: index, type: SPLIT_UPDATE })).resolves.toBe(true);
272273
if (setMock.shouldEmit) calls++;
273274
expect(splitsEmitSpy.mock.calls.length).toBe(calls);
274275
index++;

src/sync/polling/updaters/splitChangesUpdater.ts

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import { SYNC_SPLITS_FETCH, SYNC_SPLITS_UPDATE, SYNC_RBS_UPDATE, SYNC_SPLITS_FET
99
import { startsWith } from '../../../utils/lang';
1010
import { IN_RULE_BASED_SEGMENT, IN_SEGMENT } from '../../../utils/constants';
1111
import { setToArray } from '../../../utils/lang/sets';
12+
import { SPLIT_UPDATE } from '../../streaming/constants';
1213

13-
type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit | IRBSegment, changeNumber: number }) => Promise<boolean>
14+
export type InstantUpdate = { payload: ISplit | IRBSegment, changeNumber: number, type: string };
15+
type SplitChangesUpdater = (noCache?: boolean, till?: number, instantUpdate?: InstantUpdate) => Promise<boolean>
1416

1517
// Checks that all registered segments have been fetched (changeNumber !== -1 for every segment).
1618
// Returns a promise that could be rejected.
@@ -27,8 +29,9 @@ function checkAllSegmentsExist(segments: ISegmentsCacheBase): Promise<boolean> {
2729
* Collect segments from a raw split definition.
2830
* Exported for testing purposes.
2931
*/
30-
export function parseSegments({ conditions }: ISplit | IRBSegment, matcherType: typeof IN_SEGMENT | typeof IN_RULE_BASED_SEGMENT = IN_SEGMENT): Set<string> {
31-
let segments = new Set<string>();
32+
export function parseSegments(ruleEntity: ISplit | IRBSegment, matcherType: typeof IN_SEGMENT | typeof IN_RULE_BASED_SEGMENT = IN_SEGMENT): Set<string> {
33+
const { conditions, excluded } = ruleEntity as IRBSegment;
34+
const segments = new Set<string>(excluded && excluded.segments);
3235

3336
for (let i = 0; i < conditions.length; i++) {
3437
const matchers = conditions[i].matcherGroup.matchers;
@@ -67,26 +70,22 @@ function matchFilters(featureFlag: ISplit, filters: ISplitFiltersValidation) {
6770
return matchNames || matchPrefix;
6871
}
6972

70-
function isFF(ruleBasedEntity: IRBSegment | ISplit): ruleBasedEntity is ISplit {
71-
return (ruleBasedEntity as ISplit).defaultTreatment !== undefined;
72-
}
73-
7473
/**
7574
* Given the list of splits from /splitChanges endpoint, it returns the mutations,
7675
* i.e., an object with added splits, removed splits and used segments.
7776
* Exported for testing purposes.
7877
*/
7978
export function computeMutation<T extends ISplit | IRBSegment>(rules: Array<T>, segments: Set<string>, filters?: ISplitFiltersValidation): ISplitMutations<T> {
8079

81-
return rules.reduce((accum, ruleBasedEntity) => {
82-
if (ruleBasedEntity.status === 'ACTIVE' && (!filters || matchFilters(ruleBasedEntity as ISplit, filters))) {
83-
accum.added.push(ruleBasedEntity);
80+
return rules.reduce((accum, ruleEntity) => {
81+
if (ruleEntity.status === 'ACTIVE' && (!filters || matchFilters(ruleEntity as ISplit, filters))) {
82+
accum.added.push(ruleEntity);
8483

85-
parseSegments(ruleBasedEntity).forEach((segmentName: string) => {
84+
parseSegments(ruleEntity).forEach((segmentName: string) => {
8685
segments.add(segmentName);
8786
});
8887
} else {
89-
accum.removed.push(ruleBasedEntity);
88+
accum.removed.push(ruleEntity);
9089
}
9190

9291
return accum;
@@ -116,7 +115,7 @@ export function splitChangesUpdaterFactory(
116115
requestTimeoutBeforeReady: number = 0,
117116
retriesOnFailureBeforeReady: number = 0,
118117
isClientSide?: boolean
119-
): ISplitChangesUpdater {
118+
): SplitChangesUpdater {
120119
const { splits, rbSegments, segments } = storage;
121120

122121
let startingUp = true;
@@ -134,7 +133,7 @@ export function splitChangesUpdaterFactory(
134133
* @param noCache - true to revalidate data to fetch
135134
* @param till - query param to bypass CDN requests
136135
*/
137-
return function splitChangesUpdater(noCache?: boolean, till?: number, updateNotification?: { payload: ISplit | IRBSegment, changeNumber: number }) {
136+
return function splitChangesUpdater(noCache?: boolean, till?: number, instantUpdate?: InstantUpdate) {
138137

139138
/**
140139
* @param since - current changeNumber at splitsCache
@@ -144,15 +143,15 @@ export function splitChangesUpdaterFactory(
144143
const [since, rbSince] = sinces;
145144
log.debug(SYNC_SPLITS_FETCH, sinces);
146145
const fetcherPromise = Promise.resolve(
147-
updateNotification ?
148-
isFF(updateNotification.payload) ?
146+
instantUpdate ?
147+
instantUpdate.type === SPLIT_UPDATE ?
149148
// IFFU edge case: a change to a flag that adds an IN_RULE_BASED_SEGMENT matcher that is not present yet
150-
Promise.resolve(rbSegments.contains(parseSegments(updateNotification.payload, IN_RULE_BASED_SEGMENT))).then((contains) => {
149+
Promise.resolve(rbSegments.contains(parseSegments(instantUpdate.payload, IN_RULE_BASED_SEGMENT))).then((contains) => {
151150
return contains ?
152-
{ ff: { d: [updateNotification.payload as ISplit], t: updateNotification.changeNumber } } :
151+
{ ff: { d: [instantUpdate.payload as ISplit], t: instantUpdate.changeNumber } } :
153152
splitChangesFetcher(since, noCache, till, rbSince, _promiseDecorator);
154153
}) :
155-
{ rbs: { d: [updateNotification.payload as IRBSegment], t: updateNotification.changeNumber } } :
154+
{ rbs: { d: [instantUpdate.payload as IRBSegment], t: instantUpdate.changeNumber } } :
156155
splitChangesFetcher(since, noCache, till, rbSince, _promiseDecorator)
157156
)
158157
.then((splitChanges: ISplitChangesResponse) => {
@@ -180,14 +179,8 @@ export function splitChangesUpdaterFactory(
180179
]).then(([ffChanged, rbsChanged]) => {
181180
if (splitsEventEmitter) {
182181
// To emit SDK_SPLITS_ARRIVED for server-side SDK, we must check that all registered segments have been fetched
183-
return Promise.resolve(!splitsEventEmitter.splitsArrived ||
184-
(
185-
(!splitChanges.ff || since !== splitChanges.ff.t) &&
186-
(!splitChanges.rbs || rbSince !== splitChanges.rbs.t) &&
187-
(ffChanged || rbsChanged) &&
188-
(isClientSide || checkAllSegmentsExist(segments))
189-
)
190-
)
182+
return Promise.resolve(!splitsEventEmitter.splitsArrived || ((ffChanged || rbsChanged) && (isClientSide || checkAllSegmentsExist(segments))))
183+
.catch(() => false /** noop. just to handle a possible `checkAllSegmentsExist` rejection, before emitting SDK event */)
191184
.then(emitSplitsArrivedEvent => {
192185
// emit SDK events
193186
if (emitSplitsArrivedEvent) splitsEventEmitter.emit(SDK_SPLITS_ARRIVED);

src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { ITelemetryTracker } from '../../../trackers/types';
88
import { Backoff } from '../../../utils/Backoff';
99
import { SPLITS } from '../../../utils/constants';
1010
import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types';
11+
import { InstantUpdate } from '../../polling/updaters/splitChangesUpdater';
1112
import { RB_SEGMENT_UPDATE } from '../constants';
1213
import { parseFFUpdatePayload } from '../parseUtils';
1314
import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types';
@@ -24,26 +25,26 @@ export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSy
2425
let handleNewEvent = false;
2526
let isHandlingEvent: boolean;
2627
let cdnBypass: boolean;
27-
let payload: ISplit | IRBSegment | undefined;
28+
let instantUpdate: InstantUpdate | undefined;
2829
const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT);
2930

3031
function __handleSplitUpdateCall() {
3132
isHandlingEvent = true;
3233
if (maxChangeNumber > cache.getChangeNumber()) {
3334
handleNewEvent = false;
34-
const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined;
3535
// fetch splits revalidating data if cached
36-
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => {
36+
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, instantUpdate).then(() => {
3737
if (!isHandlingEvent) return; // halt if `stop` has been called
3838
if (handleNewEvent) {
3939
__handleSplitUpdateCall();
4040
} else {
41-
if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS);
41+
if (instantUpdate) telemetryTracker.trackUpdatesFromSSE(SPLITS);
4242
// fetch new registered segments for server-side API. Not retrying on error
4343
if (segmentsSyncTask) segmentsSyncTask.execute(true);
4444

4545
const attempts = backoff.attempts + 1;
4646

47+
// @TODO and with RBS and FF
4748
if (maxChangeNumber <= cache.getChangeNumber()) {
4849
log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`);
4950
isHandlingEvent = false;
@@ -76,18 +77,18 @@ export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSy
7677
*
7778
* @param changeNumber - change number of the notification
7879
*/
79-
put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit | IRBSegment) {
80+
put({ changeNumber, pcn, type }: ISplitUpdateData, payload?: ISplit | IRBSegment) {
8081
const currentChangeNumber = cache.getChangeNumber();
8182

8283
if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return;
8384

8485
maxChangeNumber = changeNumber;
8586
handleNewEvent = true;
8687
cdnBypass = false;
87-
payload = undefined;
88+
instantUpdate = undefined;
8889

89-
if (_payload && currentChangeNumber === pcn) {
90-
payload = _payload;
90+
if (payload && currentChangeNumber === pcn) {
91+
instantUpdate = { payload, changeNumber, type };
9192
}
9293

9394
if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall();

0 commit comments

Comments
 (0)