Skip to content

Commit 988003b

Browse files
Merge pull request #390 from splitio/rb_segments_storage_consumer_mode
[Rule-based segments] Add storage interface and implementations for Redis and Pluggable storages
2 parents 6e7d790 + 261b86e commit 988003b

File tree

8 files changed

+277
-2
lines changed

8 files changed

+277
-2
lines changed

src/dtos/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,17 @@ export interface ISplitCondition {
194194
conditionType: 'ROLLOUT' | 'WHITELIST'
195195
}
196196

197+
export interface IRBSegment {
198+
name: string,
199+
changeNumber: number,
200+
status: 'ACTIVE' | 'ARCHIVED',
201+
excluded: {
202+
keys: string[],
203+
segments: string[]
204+
},
205+
conditions: ISplitCondition[],
206+
}
207+
197208
export interface ISplit {
198209
name: string,
199210
changeNumber: number,

src/storages/KeyBuilder.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ export class KeyBuilder {
3737
return `${this.prefix}.split.`;
3838
}
3939

40+
buildRBSegmentKey(splitName: string) {
41+
return `${this.prefix}.rbsegment.${splitName}`;
42+
}
43+
44+
buildRBSegmentsTillKey() {
45+
return `${this.prefix}.rbsegments.till`;
46+
}
47+
48+
buildRBSegmentKeyPrefix() {
49+
return `${this.prefix}.rbsegment.`;
50+
}
51+
4052
buildSegmentNameKey(segmentName: string) {
4153
return `${this.prefix}.segment.${segmentName}`;
4254
}

src/storages/KeyBuilderSS.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ export class KeyBuilderSS extends KeyBuilder {
5353
return `${this.buildSplitKeyPrefix()}*`;
5454
}
5555

56+
searchPatternForRBSegmentKeys() {
57+
return `${this.buildRBSegmentKeyPrefix()}*`;
58+
}
59+
5660
/* Telemetry keys */
5761

5862
buildLatencyKey(method: Method, bucket: number) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { RBSegmentsCacheInRedis } from '../inRedis/RBSegmentsCacheInRedis';
2+
import { RBSegmentsCachePluggable } from '../pluggable/RBSegmentsCachePluggable';
3+
import { KeyBuilderSS } from '../KeyBuilderSS';
4+
import { rbSegment, rbSegmentWithInSegmentMatcher } from '../__tests__/testUtils';
5+
import { loggerMock } from '../../logger/__tests__/sdkLogger.mock';
6+
import { metadata } from './KeyBuilder.spec';
7+
import { RedisAdapter } from '../inRedis/RedisAdapter';
8+
import { wrapperMockFactory } from '../pluggable/__tests__/wrapper.mock';
9+
10+
const keys = new KeyBuilderSS('RBSEGMENT', metadata);
11+
12+
const redisClient = new RedisAdapter(loggerMock);
13+
const cacheInRedis = new RBSegmentsCacheInRedis(loggerMock, keys, redisClient);
14+
15+
const storageWrapper = wrapperMockFactory();
16+
const cachePluggable = new RBSegmentsCachePluggable(loggerMock, keys, storageWrapper);
17+
18+
describe.each([{ cache: cacheInRedis, wrapper: redisClient }, { cache: cachePluggable, wrapper: storageWrapper }])('Rule-based segments cache async (Redis & Pluggable)', ({ cache, wrapper }) => {
19+
20+
afterAll(async () => {
21+
await wrapper.del(keys.buildRBSegmentsTillKey());
22+
await wrapper.disconnect();
23+
});
24+
25+
test('update should add and remove segments correctly', async () => {
26+
// Add segments
27+
expect(await cache.update([rbSegment, rbSegmentWithInSegmentMatcher], [], 1)).toBe(true);
28+
expect(await cache.get(rbSegment.name)).toEqual(rbSegment);
29+
expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toEqual(rbSegmentWithInSegmentMatcher);
30+
expect(await cache.getChangeNumber()).toBe(1);
31+
32+
// Remove a segment
33+
expect(await cache.update([], [rbSegment], 2)).toBe(true);
34+
expect(await cache.get(rbSegment.name)).toBeNull();
35+
expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toEqual(rbSegmentWithInSegmentMatcher);
36+
expect(await cache.getChangeNumber()).toBe(2);
37+
38+
// Remove remaining segment
39+
expect(await cache.update([], [rbSegmentWithInSegmentMatcher], 3)).toBe(true);
40+
expect(await cache.get(rbSegment.name)).toBeNull();
41+
expect(await cache.get(rbSegmentWithInSegmentMatcher.name)).toBeNull();
42+
expect(await cache.getChangeNumber()).toBe(3);
43+
44+
// No changes
45+
expect(await cache.update([], [rbSegmentWithInSegmentMatcher], 4)).toBe(false);
46+
expect(await cache.getChangeNumber()).toBe(4);
47+
});
48+
49+
test('contains should check for segment existence correctly', async () => {
50+
await cache.update([rbSegment, rbSegmentWithInSegmentMatcher], [], 1);
51+
52+
expect(await cache.contains(new Set([rbSegment.name]))).toBe(true);
53+
expect(await cache.contains(new Set([rbSegment.name, rbSegmentWithInSegmentMatcher.name]))).toBe(true);
54+
expect(await cache.contains(new Set(['nonexistent']))).toBe(false);
55+
expect(await cache.contains(new Set([rbSegment.name, 'nonexistent']))).toBe(false);
56+
57+
await cache.update([], [rbSegment, rbSegmentWithInSegmentMatcher], 2);
58+
});
59+
});

src/storages/__tests__/testUtils.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ISplit } from '../../dtos/types';
1+
import { IRBSegment, ISplit } from '../../dtos/types';
22
import { IStorageSync, IStorageAsync, IImpressionsCacheSync, IEventsCacheSync } from '../types';
33

44
// Assert that instances created by storage factories have the expected interface
@@ -45,3 +45,9 @@ export const featureFlagTwo: ISplit = { name: 'ff_two', sets: ['t','w','o'] };
4545
export const featureFlagThree: ISplit = { name: 'ff_three', sets: ['t','h','r','e'] };
4646
//@ts-ignore
4747
export const featureFlagWithoutFS: ISplit = { name: 'ff_four' };
48+
49+
// Rule-based segments
50+
//@ts-ignore
51+
export const rbSegment: IRBSegment = { name: 'rb_segment', conditions: [{ matcherGroup: { matchers: [{ matcherType: 'EQUAL_TO', unaryNumericMatcherData: { value: 10 } }] } }] };
52+
//@ts-ignore
53+
export const rbSegmentWithInSegmentMatcher: IRBSegment = { name: 'rb_segment_with_in_segment_matcher', conditions: [{ matcherGroup: { matchers: [{ matcherType: 'IN_SEGMENT', userDefinedSegmentMatcherData: { segmentName: 'employees' } }] } }] };
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { isNaNNumber } from '../../utils/lang';
2+
import { IRBSegmentsCacheAsync } from '../types';
3+
import { ILogger } from '../../logger/types';
4+
import { IRBSegment } from '../../dtos/types';
5+
import { LOG_PREFIX } from './constants';
6+
import { setToArray } from '../../utils/lang/sets';
7+
import { RedisAdapter } from './RedisAdapter';
8+
import { KeyBuilderSS } from '../KeyBuilderSS';
9+
10+
export class RBSegmentsCacheInRedis implements IRBSegmentsCacheAsync {
11+
12+
private readonly log: ILogger;
13+
private readonly keys: KeyBuilderSS;
14+
private readonly redis: RedisAdapter;
15+
16+
constructor(log: ILogger, keys: KeyBuilderSS, redis: RedisAdapter) {
17+
this.log = log;
18+
this.keys = keys;
19+
this.redis = redis;
20+
}
21+
22+
get(name: string): Promise<IRBSegment | null> {
23+
return this.redis.get(this.keys.buildRBSegmentKey(name))
24+
.then(maybeRBSegment => maybeRBSegment && JSON.parse(maybeRBSegment));
25+
}
26+
27+
private getNames(): Promise<string[]> {
28+
return this.redis.keys(this.keys.searchPatternForRBSegmentKeys()).then(
29+
(listOfKeys) => listOfKeys.map(this.keys.extractKey)
30+
);
31+
}
32+
33+
contains(names: Set<string>): Promise<boolean> {
34+
const namesArray = setToArray(names);
35+
return this.getNames().then(namesInStorage => {
36+
return namesArray.every(name => namesInStorage.includes(name));
37+
});
38+
}
39+
40+
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise<boolean> {
41+
return Promise.all([
42+
this.setChangeNumber(changeNumber),
43+
Promise.all(toAdd.map(toAdd => {
44+
const key = this.keys.buildRBSegmentKey(toAdd.name);
45+
const stringifiedNewRBSegment = JSON.stringify(toAdd);
46+
return this.redis.set(key, stringifiedNewRBSegment).then(() => true);
47+
})),
48+
Promise.all(toRemove.map(toRemove => {
49+
const key = this.keys.buildRBSegmentKey(toRemove.name);
50+
return this.redis.del(key).then(status => status === 1);
51+
}))
52+
]).then(([, added, removed]) => {
53+
return added.some(result => result) || removed.some(result => result);
54+
});
55+
}
56+
57+
setChangeNumber(changeNumber: number) {
58+
return this.redis.set(this.keys.buildRBSegmentsTillKey(), changeNumber + '').then(
59+
status => status === 'OK'
60+
);
61+
}
62+
63+
getChangeNumber(): Promise<number> {
64+
return this.redis.get(this.keys.buildRBSegmentsTillKey()).then((value: string | null) => {
65+
const i = parseInt(value as string, 10);
66+
67+
return isNaNNumber(i) ? -1 : i;
68+
}).catch((e) => {
69+
this.log.error(LOG_PREFIX + 'Could not retrieve changeNumber from storage. Error: ' + e);
70+
return -1;
71+
});
72+
}
73+
74+
// @TODO implement if required by DataLoader or producer mode
75+
clear() {
76+
return Promise.resolve();
77+
}
78+
79+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { isNaNNumber } from '../../utils/lang';
2+
import { KeyBuilder } from '../KeyBuilder';
3+
import { IPluggableStorageWrapper, IRBSegmentsCacheAsync } from '../types';
4+
import { ILogger } from '../../logger/types';
5+
import { IRBSegment } from '../../dtos/types';
6+
import { LOG_PREFIX } from './constants';
7+
import { setToArray } from '../../utils/lang/sets';
8+
9+
export class RBSegmentsCachePluggable implements IRBSegmentsCacheAsync {
10+
11+
private readonly log: ILogger;
12+
private readonly keys: KeyBuilder;
13+
private readonly wrapper: IPluggableStorageWrapper;
14+
15+
constructor(log: ILogger, keys: KeyBuilder, wrapper: IPluggableStorageWrapper) {
16+
this.log = log;
17+
this.keys = keys;
18+
this.wrapper = wrapper;
19+
}
20+
21+
get(name: string): Promise<IRBSegment | null> {
22+
return this.wrapper.get(this.keys.buildRBSegmentKey(name))
23+
.then(maybeRBSegment => maybeRBSegment && JSON.parse(maybeRBSegment));
24+
}
25+
26+
private getNames(): Promise<string[]> {
27+
return this.wrapper.getKeysByPrefix(this.keys.buildRBSegmentKeyPrefix()).then(
28+
(listOfKeys) => listOfKeys.map(this.keys.extractKey)
29+
);
30+
}
31+
32+
contains(names: Set<string>): Promise<boolean> {
33+
const namesArray = setToArray(names);
34+
return this.getNames().then(namesInStorage => {
35+
return namesArray.every(name => namesInStorage.includes(name));
36+
});
37+
}
38+
39+
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise<boolean> {
40+
return Promise.all([
41+
this.setChangeNumber(changeNumber),
42+
Promise.all(toAdd.map(toAdd => {
43+
const key = this.keys.buildRBSegmentKey(toAdd.name);
44+
const stringifiedNewRBSegment = JSON.stringify(toAdd);
45+
return this.wrapper.set(key, stringifiedNewRBSegment).then(() => true);
46+
})),
47+
Promise.all(toRemove.map(toRemove => {
48+
const key = this.keys.buildRBSegmentKey(toRemove.name);
49+
return this.wrapper.del(key);
50+
}))
51+
]).then(([, added, removed]) => {
52+
return added.some(result => result) || removed.some(result => result);
53+
});
54+
}
55+
56+
setChangeNumber(changeNumber: number) {
57+
return this.wrapper.set(this.keys.buildRBSegmentsTillKey(), changeNumber + '');
58+
}
59+
60+
getChangeNumber(): Promise<number> {
61+
return this.wrapper.get(this.keys.buildRBSegmentsTillKey()).then((value) => {
62+
const i = parseInt(value as string, 10);
63+
64+
return isNaNNumber(i) ? -1 : i;
65+
}).catch((e) => {
66+
this.log.error(LOG_PREFIX + 'Could not retrieve changeNumber from storage. Error: ' + e);
67+
return -1;
68+
});
69+
}
70+
71+
// @TODO implement if required by DataLoader or producer mode
72+
clear() {
73+
return Promise.resolve();
74+
}
75+
76+
}

src/storages/types.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import SplitIO from '../../types/splitio';
2-
import { MaybeThenable, ISplit, IMySegmentsResponse } from '../dtos/types';
2+
import { MaybeThenable, ISplit, IRBSegment, IMySegmentsResponse } from '../dtos/types';
33
import { MySegmentsData } from '../sync/polling/types';
44
import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../sync/submitters/types';
55
import { ISettings } from '../types';
@@ -225,6 +225,34 @@ export interface ISplitsCacheAsync extends ISplitsCacheBase {
225225
getNamesByFlagSets(flagSets: string[]): Promise<Set<string>[]>
226226
}
227227

228+
/** Rule-Based Segments cache */
229+
230+
export interface IRBSegmentsCacheBase {
231+
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): MaybeThenable<boolean>,
232+
get(name: string): MaybeThenable<IRBSegment | null>,
233+
getChangeNumber(): MaybeThenable<number>,
234+
clear(): MaybeThenable<boolean | void>,
235+
contains(names: Set<string>): MaybeThenable<boolean>,
236+
}
237+
238+
export interface IRBSegmentsCacheSync extends IRBSegmentsCacheBase {
239+
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): boolean,
240+
get(name: string): IRBSegment | null,
241+
getChangeNumber(): number,
242+
clear(): void,
243+
contains(names: Set<string>): boolean,
244+
// Used only for smart pausing in client-side standalone. Returns true if the storage contains a RBSegment using segments or large segments matchers
245+
usesSegments(): boolean,
246+
}
247+
248+
export interface IRBSegmentsCacheAsync extends IRBSegmentsCacheBase {
249+
update(toAdd: IRBSegment[], toRemove: IRBSegment[], changeNumber: number): Promise<boolean>,
250+
get(name: string): Promise<IRBSegment | null>,
251+
getChangeNumber(): Promise<number>,
252+
clear(): Promise<boolean | void>,
253+
contains(names: Set<string>): Promise<boolean>,
254+
}
255+
228256
/** Segments cache */
229257

230258
export interface ISegmentsCacheBase {

0 commit comments

Comments
 (0)