Skip to content

Commit 1cecdf8

Browse files
Handle RBSEGMENT_UPDATE notification
1 parent 17bbd36 commit 1cecdf8

File tree

11 files changed

+116
-98
lines changed

11 files changed

+116
-98
lines changed

src/logger/messages/warn.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export const codesWarn: [number, string][] = codesError.concat([
3333
[c.WARN_SDK_KEY, c.LOG_PREFIX_SETTINGS + ': You already have %s. We recommend keeping only one instance of the factory at all times (Singleton pattern) and reusing it throughout your application'],
3434

3535
[c.STREAMING_PARSING_MEMBERSHIPS_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching Memberships due to an error processing %s notification: %s'],
36-
[c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing SPLIT_UPDATE notification: %s'],
36+
[c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing %s notification: %s'],
3737
[c.WARN_INVALID_FLAGSET, '%s: you passed %s, flag set must adhere to the regular expressions %s. This means a flag set must start with a letter or number, be in lowercase, alphanumeric and have a max length of 50 characters. %s was discarded.'],
3838
[c.WARN_LOWERCASE_FLAGSET, '%s: flag set %s should be all lowercase - converting string to lowercase.'],
3939
[c.WARN_FLAGSET_WITHOUT_FLAGS, '%s: you passed %s flag set that does not contain cached feature flag names. Please double check what flag sets are in use in the Split user interface.'],

src/sync/polling/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { ISplit } from '../../dtos/types';
1+
import { IRBSegment, ISplit } from '../../dtos/types';
22
import { IReadinessManager } from '../../readiness/types';
33
import { IStorageSync } from '../../storages/types';
44
import { MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../streaming/types';
55
import { ITask, ISyncTask } from '../types';
66

7-
export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { }
7+
export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit | IRBSegment, changeNumber: number }], boolean> { }
88

99
export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { }
1010

src/sync/polling/updaters/segmentChangesUpdater.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export function segmentChangesUpdaterFactory(
5151
* Returned promise will not be rejected.
5252
*
5353
* @param fetchOnlyNew - if true, only fetch the segments that not exists, i.e., which `changeNumber` is equal to -1.
54-
* This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE notifications.
54+
* This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE or RBSEGMENT_UPDATE notifications.
5555
* @param segmentName - segment name to fetch. By passing `undefined` it fetches the list of segments registered at the storage
5656
* @param noCache - true to revalidate data to fetch on a SEGMENT_UPDATE notifications.
5757
* @param till - till target for the provided segmentName, for CDN bypass.

src/sync/streaming/SSEHandler/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { errorParser, messageParser } from './NotificationParser';
22
import { notificationKeeperFactory } from './NotificationKeeper';
3-
import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE } from '../constants';
3+
import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RBSEGMENT_UPDATE } from '../constants';
44
import { IPushEventEmitter } from '../types';
55
import { ISseEventHandler } from '../SSEClient/types';
66
import { INotificationError, INotificationMessage } from './types';
@@ -84,6 +84,7 @@ export function SSEHandlerFactory(log: ILogger, pushEmitter: IPushEventEmitter,
8484
case MEMBERSHIPS_MS_UPDATE:
8585
case MEMBERSHIPS_LS_UPDATE:
8686
case SPLIT_KILL:
87+
case RBSEGMENT_UPDATE:
8788
pushEmitter.emit(parsedData.type, parsedData);
8889
break;
8990

src/sync/streaming/SSEHandler/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ControlType } from '../constants';
2-
import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE } from '../types';
2+
import { SEGMENT_UPDATE, SPLIT_UPDATE, SPLIT_KILL, CONTROL, OCCUPANCY, MEMBERSHIPS_LS_UPDATE, MEMBERSHIPS_MS_UPDATE, RBSEGMENT_UPDATE } from '../types';
33

44
export enum Compression {
55
None = 0,
@@ -42,7 +42,7 @@ export interface ISegmentUpdateData {
4242
}
4343

4444
export interface ISplitUpdateData {
45-
type: SPLIT_UPDATE,
45+
type: SPLIT_UPDATE | RBSEGMENT_UPDATE,
4646
changeNumber: number,
4747
pcn?: number,
4848
d?: string,
Lines changed: 93 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { ISplit } from '../../../dtos/types';
1+
import { IRBSegment, ISplit } from '../../../dtos/types';
2+
import { STREAMING_PARSING_SPLIT_UPDATE } from '../../../logger/constants';
23
import { ILogger } from '../../../logger/types';
34
import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants';
45
import { ISplitsEventEmitter } from '../../../readiness/types';
@@ -7,94 +8,120 @@ import { ITelemetryTracker } from '../../../trackers/types';
78
import { Backoff } from '../../../utils/Backoff';
89
import { SPLITS } from '../../../utils/constants';
910
import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types';
11+
import { RBSEGMENT_UPDATE } from '../constants';
12+
import { parseFFUpdatePayload } from '../parseUtils';
1013
import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types';
1114
import { FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT, FETCH_BACKOFF_MAX_RETRIES } from './constants';
1215
import { IUpdateWorker } from './types';
1316

1417
/**
1518
* SplitsUpdateWorker factory
1619
*/
17-
export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData, payload?: ISplit]> & { killSplit(event: ISplitKillData): void } {
20+
export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData]> & { killSplit(event: ISplitKillData): void } {
1821

19-
let maxChangeNumber = 0;
20-
let handleNewEvent = false;
21-
let isHandlingEvent: boolean;
22-
let cdnBypass: boolean;
23-
let payload: ISplit | undefined;
24-
const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT);
22+
function SplitsUpdateWorker() {
23+
let maxChangeNumber = 0;
24+
let handleNewEvent = false;
25+
let isHandlingEvent: boolean;
26+
let cdnBypass: boolean;
27+
let payload: ISplit | IRBSegment | undefined;
28+
const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT);
2529

26-
function __handleSplitUpdateCall() {
27-
isHandlingEvent = true;
28-
if (maxChangeNumber > splitsCache.getChangeNumber()) {
29-
handleNewEvent = false;
30-
const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined;
31-
// fetch splits revalidating data if cached
32-
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => {
33-
if (!isHandlingEvent) return; // halt if `stop` has been called
34-
if (handleNewEvent) {
35-
__handleSplitUpdateCall();
36-
} else {
37-
if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS);
38-
// fetch new registered segments for server-side API. Not retrying on error
39-
if (segmentsSyncTask) segmentsSyncTask.execute(true);
30+
function __handleSplitUpdateCall() {
31+
isHandlingEvent = true;
32+
if (maxChangeNumber > splitsCache.getChangeNumber()) {
33+
handleNewEvent = false;
34+
const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined;
35+
// fetch splits revalidating data if cached
36+
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => {
37+
if (!isHandlingEvent) return; // halt if `stop` has been called
38+
if (handleNewEvent) {
39+
__handleSplitUpdateCall();
40+
} else {
41+
if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS);
42+
// fetch new registered segments for server-side API. Not retrying on error
43+
if (segmentsSyncTask) segmentsSyncTask.execute(true);
4044

41-
const attempts = backoff.attempts + 1;
45+
const attempts = backoff.attempts + 1;
4246

43-
if (maxChangeNumber <= splitsCache.getChangeNumber()) {
44-
log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`);
45-
isHandlingEvent = false;
46-
return;
47-
}
47+
if (maxChangeNumber <= splitsCache.getChangeNumber()) {
48+
log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`);
49+
isHandlingEvent = false;
50+
return;
51+
}
4852

49-
if (attempts < FETCH_BACKOFF_MAX_RETRIES) {
50-
backoff.scheduleCall();
51-
return;
52-
}
53+
if (attempts < FETCH_BACKOFF_MAX_RETRIES) {
54+
backoff.scheduleCall();
55+
return;
56+
}
5357

54-
if (cdnBypass) {
55-
log.debug(`No changes fetched after ${attempts} attempts with CDN bypassed.`);
56-
isHandlingEvent = false;
57-
} else {
58-
backoff.reset();
59-
cdnBypass = true;
60-
__handleSplitUpdateCall();
58+
if (cdnBypass) {
59+
log.debug(`No changes fetched after ${attempts} attempts with CDN bypassed.`);
60+
isHandlingEvent = false;
61+
} else {
62+
backoff.reset();
63+
cdnBypass = true;
64+
__handleSplitUpdateCall();
65+
}
6166
}
62-
}
63-
});
64-
} else {
65-
isHandlingEvent = false;
67+
});
68+
} else {
69+
isHandlingEvent = false;
70+
}
6671
}
67-
}
6872

69-
/**
70-
* Invoked by NotificationProcessor on SPLIT_UPDATE event
71-
*
72-
* @param changeNumber - change number of the SPLIT_UPDATE notification
73-
*/
74-
function put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit) {
75-
const currentChangeNumber = splitsCache.getChangeNumber();
73+
return {
74+
/**
75+
* Invoked by NotificationProcessor on SPLIT_UPDATE or RBSEGMENT_UPDATE event
76+
*
77+
* @param changeNumber - change number of the notification
78+
*/
79+
put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit | IRBSegment) {
80+
const currentChangeNumber = splitsCache.getChangeNumber();
7681

77-
if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return;
82+
if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return;
7883

79-
maxChangeNumber = changeNumber;
80-
handleNewEvent = true;
81-
cdnBypass = false;
82-
payload = undefined;
84+
maxChangeNumber = changeNumber;
85+
handleNewEvent = true;
86+
cdnBypass = false;
87+
payload = undefined;
8388

84-
if (_payload && currentChangeNumber === pcn) {
85-
payload = _payload;
86-
}
89+
if (_payload && currentChangeNumber === pcn) {
90+
payload = _payload;
91+
}
8792

88-
if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall();
89-
backoff.reset();
93+
if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall();
94+
backoff.reset();
95+
},
96+
stop() {
97+
isHandlingEvent = false;
98+
backoff.reset();
99+
}
100+
};
90101
}
91102

103+
const ff = SplitsUpdateWorker();
104+
const rbs = SplitsUpdateWorker();
105+
92106
return {
93-
put,
107+
put(parsedData) {
108+
if (parsedData.d && parsedData.c !== undefined) {
109+
try {
110+
const payload = parseFFUpdatePayload(parsedData.c, parsedData.d);
111+
if (payload) {
112+
(parsedData.type === RBSEGMENT_UPDATE ? rbs : ff).put(parsedData, payload);
113+
return;
114+
}
115+
} catch (e) {
116+
log.warn(STREAMING_PARSING_SPLIT_UPDATE, [parsedData.type, e]);
117+
}
118+
}
119+
(parsedData.type === RBSEGMENT_UPDATE ? rbs : ff).put(parsedData);
120+
},
94121
/**
95122
* Invoked by NotificationProcessor on SPLIT_KILL event
96123
*
97-
* @param changeNumber - change number of the SPLIT_UPDATE notification
124+
* @param changeNumber - change number of the notification
98125
* @param splitName - name of split to kill
99126
* @param defaultTreatment - default treatment value
100127
*/
@@ -104,12 +131,12 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync,
104131
splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true);
105132
}
106133
// queues the SplitChanges fetch (only if changeNumber is newer)
107-
put({ changeNumber } as ISplitUpdateData);
134+
ff.put({ changeNumber } as ISplitUpdateData);
108135
},
109136

110137
stop() {
111-
isHandlingEvent = false;
112-
backoff.reset();
138+
ff.stop();
139+
rbs.stop();
113140
}
114141
};
115142
}

src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ describe('SplitsUpdateWorker', () => {
219219
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
220220
const payload = notification.decoded;
221221
const changeNumber = payload.changeNumber;
222-
splitUpdateWorker.put({ changeNumber, pcn }, payload); // queued
222+
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); // queued
223223
expect(splitsSyncTask.execute).toBeCalledTimes(1);
224224
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { changeNumber, payload }]);
225225
});
@@ -237,7 +237,7 @@ describe('SplitsUpdateWorker', () => {
237237

238238
let splitsSyncTask = splitsSyncTaskMock(cache);
239239
let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
240-
splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded);
240+
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
241241
expect(splitsSyncTask.execute).toBeCalledTimes(1);
242242
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
243243
splitsSyncTask.execute.mockClear();
@@ -250,7 +250,7 @@ describe('SplitsUpdateWorker', () => {
250250

251251
splitsSyncTask = splitsSyncTaskMock(cache);
252252
splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
253-
splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded);
253+
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
254254
expect(splitsSyncTask.execute).toBeCalledTimes(1);
255255
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
256256
splitsSyncTask.execute.mockClear();
@@ -263,7 +263,7 @@ describe('SplitsUpdateWorker', () => {
263263

264264
splitsSyncTask = splitsSyncTaskMock(cache);
265265
splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker);
266-
splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded);
266+
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
267267
expect(splitsSyncTask.execute).toBeCalledTimes(1);
268268
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { payload: notification.decoded, changeNumber }]);
269269

src/sync/streaming/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export const MEMBERSHIPS_LS_UPDATE = 'MEMBERSHIPS_LS_UPDATE';
3030
export const SEGMENT_UPDATE = 'SEGMENT_UPDATE';
3131
export const SPLIT_KILL = 'SPLIT_KILL';
3232
export const SPLIT_UPDATE = 'SPLIT_UPDATE';
33+
export const RBSEGMENT_UPDATE = 'RBSEGMENT_UPDATE';
3334

3435
// Control-type push notifications, handled by NotificationKeeper
3536
export const CONTROL = 'CONTROL';

src/sync/streaming/parseUtils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { algorithms } from '../../utils/decompress';
22
import { decodeFromBase64 } from '../../utils/base64';
33
import { hash } from '../../utils/murmur3/murmur3';
44
import { Compression, IMembershipMSUpdateData, KeyList } from './SSEHandler/types';
5-
import { ISplit } from '../../dtos/types';
5+
import { IRBSegment, ISplit } from '../../dtos/types';
66

77
const GZIP = 1;
88
const ZLIB = 2;
@@ -82,7 +82,7 @@ export function isInBitmap(bitmap: Uint8Array, hash64hex: string) {
8282
/**
8383
* Parse feature flags notifications for instant feature flag updates
8484
*/
85-
export function parseFFUpdatePayload(compression: Compression, data: string): ISplit | undefined {
85+
export function parseFFUpdatePayload(compression: Compression, data: string): ISplit | IRBSegment | undefined {
8686
return compression > 0 ?
8787
parseKeyList(data, compression, false) :
8888
JSON.parse(decodeFromBase64(data));

src/sync/streaming/pushManager.ts

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ import { authenticateFactory, hashUserKey } from './AuthClient';
1111
import { forOwn } from '../../utils/lang';
1212
import { SSEClient } from './SSEClient';
1313
import { getMatching } from '../../utils/key';
14-
import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants';
15-
import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MEMBERSHIPS_UPDATE, STREAMING_PARSING_SPLIT_UPDATE } from '../../logger/constants';
14+
import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RBSEGMENT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants';
15+
import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MEMBERSHIPS_UPDATE } from '../../logger/constants';
1616
import { IMembershipMSUpdateData, IMembershipLSUpdateData, KeyList, UpdateStrategy } from './SSEHandler/types';
17-
import { getDelay, isInBitmap, parseBitmap, parseFFUpdatePayload, parseKeyList } from './parseUtils';
17+
import { getDelay, isInBitmap, parseBitmap, parseKeyList } from './parseUtils';
1818
import { Hash64, hash64 } from '../../utils/murmur3/murmur3_64';
1919
import { IAuthTokenPushEnabled } from './AuthClient/types';
2020
import { TOKEN_REFRESH, AUTH_REJECTION } from '../../utils/constants';
@@ -219,20 +219,8 @@ export function pushManagerFactory(
219219
/** Functions related to synchronization (Queues and Workers in the spec) */
220220

221221
pushEmitter.on(SPLIT_KILL, splitsUpdateWorker.killSplit);
222-
pushEmitter.on(SPLIT_UPDATE, (parsedData) => {
223-
if (parsedData.d && parsedData.c !== undefined) {
224-
try {
225-
const payload = parseFFUpdatePayload(parsedData.c, parsedData.d);
226-
if (payload) {
227-
splitsUpdateWorker.put(parsedData, payload);
228-
return;
229-
}
230-
} catch (e) {
231-
log.warn(STREAMING_PARSING_SPLIT_UPDATE, [e]);
232-
}
233-
}
234-
splitsUpdateWorker.put(parsedData);
235-
});
222+
pushEmitter.on(SPLIT_UPDATE, splitsUpdateWorker.put);
223+
pushEmitter.on(RBSEGMENT_UPDATE, splitsUpdateWorker.put);
236224

237225
function handleMySegmentsUpdate(parsedData: IMembershipMSUpdateData | IMembershipLSUpdateData) {
238226
switch (parsedData.u) {

0 commit comments

Comments
 (0)