Skip to content

Commit 874f205

Browse files
authored
Merge pull request #455 from splitio/flagsets-sync-split
Added sync.split logic, updated storage.inmemory.split and sync.synch…
2 parents 016102b + 8bccfc9 commit 874f205

File tree

6 files changed

+270
-101
lines changed

6 files changed

+270
-101
lines changed

splitio/storage/inmemmory.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
class InMemorySplitStorage(SplitStorage):
1818
"""InMemory implementation of a split storage."""
1919

20-
def __init__(self):
20+
def __init__(self, flag_sets=[]):
2121
"""Constructor."""
2222
self._lock = threading.RLock()
2323
self._splits = {}
2424
self._change_number = -1
2525
self._traffic_types = Counter()
2626
self._sets_feature_flag_map = {}
27+
self.config_flag_sets_used = len(flag_sets)
28+
for flag_set in flag_sets:
29+
self._sets_feature_flag_map[flag_set] = set()
2730

2831
def get(self, split_name):
2932
"""
@@ -73,13 +76,15 @@ def _put(self, split):
7376
"""
7477
with self._lock:
7578
if split.name in self._splits:
76-
self._remove_flag_sets(self._splits[split.name])
79+
self._remove_from_flag_sets(self._splits[split.name])
7780
self._decrease_traffic_type_count(self._splits[split.name].traffic_type_name)
7881
self._splits[split.name] = split
7982
self._increase_traffic_type_count(split.traffic_type_name)
8083
if split.sets is not None:
8184
for flag_set in split.sets:
8285
if flag_set not in self._sets_feature_flag_map.keys():
86+
if self.config_flag_sets_used > 0:
87+
continue
8388
self._sets_feature_flag_map[flag_set] = set()
8489
self._sets_feature_flag_map[flag_set].add(split.name)
8590

@@ -101,10 +106,10 @@ def _remove(self, split_name):
101106

102107
self._splits.pop(split_name)
103108
self._decrease_traffic_type_count(split.traffic_type_name)
104-
self._remove_flag_sets(split)
109+
self._remove_from_flag_sets(split)
105110
return True
106111

107-
def _remove_flag_sets(self, feature_flag):
112+
def _remove_from_flag_sets(self, feature_flag):
108113
"""
109114
Remove flag sets associated to a split
110115
@@ -114,7 +119,7 @@ def _remove_flag_sets(self, feature_flag):
114119
if feature_flag.sets is not None:
115120
for flag_set in feature_flag.sets:
116121
self._sets_feature_flag_map[flag_set].remove(feature_flag.name)
117-
if len(self._sets_feature_flag_map[flag_set]) == 0:
122+
if len(self._sets_feature_flag_map[flag_set]) == 0 and self.config_flag_sets_used == 0:
118123
del self._sets_feature_flag_map[flag_set]
119124

120125
def get_feature_flags_by_set(self, set):
@@ -232,6 +237,20 @@ def _decrease_traffic_type_count(self, traffic_type_name):
232237
self._traffic_types.subtract([traffic_type_name])
233238
self._traffic_types += Counter()
234239

240+
def is_flag_set_exist(self, flag_set):
241+
"""
242+
Return whether a flag set exists in at least one feature flag in cache.
243+
244+
:param flag_set: Flag set to validate.
245+
:type flag_set: str
246+
247+
:return: True if the flag_set exist. False otherwise.
248+
:rtype: bool
249+
"""
250+
if flag_set in self._sets_feature_flag_map.keys():
251+
return True
252+
return False
253+
235254

236255
class InMemorySegmentStorage(SegmentStorage):
237256
"""In-memory implementation of a segment storage."""

splitio/sync/split.py

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,38 @@ def _fetch_until(self, fetch_options, till=None):
8080
_LOGGER.debug('Exception information: ', exc_info=True)
8181
raise exc
8282

83+
to_add = []
84+
to_delete = []
8385
for feature_flag in feature_flag_changes.get('splits', []):
84-
if feature_flag['status'] == splits.Status.ACTIVE.value:
86+
if (self._feature_flag_storage.config_flag_sets_used == 0 and feature_flag['status'] == splits.Status.ACTIVE.value) or \
87+
(feature_flag['status'] == splits.Status.ACTIVE.value and self._check_flag_sets(feature_flag)):
8588
parsed = splits.from_raw(feature_flag)
86-
self._feature_flag_storage.put(parsed)
89+
to_add.append(parsed)
8790
segment_list.update(set(parsed.get_segment_names()))
8891
else:
89-
self._feature_flag_storage.remove(feature_flag['name'])
90-
self._feature_flag_storage.set_change_number(feature_flag_changes['till'])
92+
if self._feature_flag_storage.get(feature_flag['name']) is not None:
93+
to_delete.append(feature_flag['name'])
94+
95+
self._feature_flag_storage.update(to_add, to_delete, feature_flag_changes['till'])
9196
if feature_flag_changes['till'] == feature_flag_changes['since']:
9297
return feature_flag_changes['till'], segment_list
9398

99+
def _check_flag_sets(self, feature_flag):
100+
"""
101+
Check all flag sets in a feature flag, return True if any of sets exist in storage
102+
103+
:param feature_flag: Flag set to validate.
104+
:type feature_flag: json
105+
106+
:return: True if any of its flag_set exist. False otherwise.
107+
:rtype: bool
108+
"""
109+
for flag_set in feature_flag['sets']:
110+
if self._feature_flag_storage.is_flag_set_exist(flag_set):
111+
return True
112+
return False
113+
114+
94115
def _attempt_feature_flag_sync(self, fetch_options, till=None):
95116
"""
96117
Hit endpoint, update storage and return True if sync is complete.
@@ -347,11 +368,10 @@ def _synchronize_legacy(self):
347368
fetched = self._read_feature_flags_from_legacy_file(self._filename)
348369
to_delete = [name for name in self._feature_flag_storage.get_split_names()
349370
if name not in fetched.keys()]
350-
for feature_flag in fetched.values():
351-
self._feature_flag_storage.put(feature_flag)
371+
to_add = []
372+
[to_add.append(feature_flag) for feature_flag in fetched.values()]
352373

353-
for feature_flag in to_delete:
354-
self._feature_flag_storage.remove(feature_flag)
374+
self._feature_flag_storage.update(to_add, to_delete, 0)
355375

356376
return []
357377

@@ -371,16 +391,18 @@ def _synchronize_json(self):
371391
self._current_json_sha = fecthed_sha
372392
if self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL:
373393
return []
394+
to_add = []
395+
to_delete = []
374396
for feature_flag in fetched:
375397
if feature_flag['status'] == splits.Status.ACTIVE.value:
376398
parsed = splits.from_raw(feature_flag)
377-
self._feature_flag_storage.put(parsed)
399+
to_add.append(parsed)
378400
_LOGGER.debug("feature flag %s is updated", parsed.name)
379401
segment_list.update(set(parsed.get_segment_names()))
380402
else:
381-
self._feature_flag_storage.remove(feature_flag['name'])
403+
to_delete.append(feature_flag['name'])
382404

383-
self._feature_flag_storage.set_change_number(till)
405+
self._feature_flag_storage.update(to_add, to_delete, till)
384406
return segment_list
385407
except Exception as exc:
386408
raise ValueError("Error reading feature flags from json.") from exc

splitio/sync/synchronizer.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ def __init__(self, split_synchronizers, split_tasks):
251251
self._periodic_data_recording_tasks.append(self._split_tasks.unique_keys_task)
252252
if self._split_tasks.clear_filter_task:
253253
self._periodic_data_recording_tasks.append(self._split_tasks.clear_filter_task)
254+
self._break_sync_all = False
254255

255256
@property
256257
def split_sync(self):
@@ -289,6 +290,7 @@ def synchronize_splits(self, till, sync_segments=True):
289290
:returns: whether the synchronization was successful or not.
290291
:rtype: bool
291292
"""
293+
self._break_sync_all = False
292294
_LOGGER.debug('Starting feature flags synchronization')
293295
try:
294296
new_segments = []
@@ -304,7 +306,9 @@ def synchronize_splits(self, till, sync_segments=True):
304306
else:
305307
_LOGGER.debug('Segment sync scheduled.')
306308
return True
307-
except APIException:
309+
except APIException as exc:
310+
if exc._status_code is not None and exc._status_code == 414:
311+
self._break_sync_all = True
308312
_LOGGER.error('Failed syncing feature flags')
309313
_LOGGER.debug('Error: ', exc_info=True)
310314
return False
@@ -334,7 +338,7 @@ def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
334338
_LOGGER.debug('Error: ', exc_info=True)
335339
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
336340
retry_attempts += 1
337-
if retry_attempts > max_retry_attempts:
341+
if retry_attempts > max_retry_attempts or self._break_sync_all:
338342
break
339343
how_long = self._backoff.get()
340344
time.sleep(how_long)

tests/storage/test_inmemory_storage.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,36 +213,88 @@ def test_kill_locally(self):
213213
storage.kill_locally('some_split', 'default_treatment', 3)
214214
assert storage.get('some_split').change_number == 3
215215

216-
def test_flag_sets(self):
217-
storage = InMemorySplitStorage()
218-
assert storage._sets_feature_flag_map == {}
216+
def test_flag_sets_with_config_sets(self):
217+
storage = InMemorySplitStorage(['set10', 'set02', 'set05'])
218+
assert storage.config_flag_sets_used == 3
219+
assert storage._sets_feature_flag_map == {'set10': set(), 'set02': set(), 'set05': set()}
219220

220221
split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
221222
'ACTIVE', 1, sets=['set10', 'set02'])
222223
split2 = Split('split2', 123456789, False, 'some', 'traffic_type',
223224
'ACTIVE', 1, sets=['set05', 'set02'])
225+
split3 = Split('split3', 123456789, False, 'some', 'traffic_type',
226+
'ACTIVE', 1, sets=['set04', 'set05'])
224227
storage.update([split1], [], 1)
225228
assert storage.get_feature_flags_by_set('set10') == ['split1']
226229
assert storage.get_feature_flags_by_set('set02') == ['split1']
230+
assert storage.is_flag_set_exist('set10')
231+
assert storage.is_flag_set_exist('set02')
232+
assert not storage.is_flag_set_exist('set03')
227233

228234
storage.update([split2], [], 1)
229235
assert storage.get_feature_flags_by_set('set05') == ['split2']
230236
assert sorted(storage.get_feature_flags_by_set('set02')) == ['split1', 'split2']
237+
assert storage.is_flag_set_exist('set05')
231238

232239
storage.update([], [split2.name], 1)
233-
assert 'set5' not in storage._sets_feature_flag_map
240+
assert storage.is_flag_set_exist('set05')
234241
assert storage.get_feature_flags_by_set('set02') == ['split1']
235242
assert storage.get_feature_flags_by_set('set05') == []
236243

237244
split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
238245
'ACTIVE', 1, sets=['set02'])
239246
storage.update([split1], [], 1)
240-
assert 'set10' not in storage._sets_feature_flag_map
247+
assert storage.is_flag_set_exist('set10')
241248
assert storage.get_feature_flags_by_set('set02') == ['split1']
242249

243250
storage.update([], [split1.name], 1)
251+
assert storage.get_feature_flags_by_set('set02') == []
252+
assert storage._sets_feature_flag_map == {'set10': set(), 'set02': set(), 'set05': set()}
253+
254+
storage.update([split3], [], 1)
255+
assert storage.get_feature_flags_by_set('set05') == ['split3']
256+
assert not storage.is_flag_set_exist('set04')
257+
258+
def test_flag_sets_withut_config_sets(self):
259+
storage = InMemorySplitStorage()
244260
assert storage._sets_feature_flag_map == {}
261+
assert storage.config_flag_sets_used == 0
262+
263+
split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
264+
'ACTIVE', 1, sets=['set10', 'set02'])
265+
split2 = Split('split2', 123456789, False, 'some', 'traffic_type',
266+
'ACTIVE', 1, sets=['set05', 'set02'])
267+
split3 = Split('split3', 123456789, False, 'some', 'traffic_type',
268+
'ACTIVE', 1, sets=['set04', 'set05'])
269+
storage.update([split1], [], 1)
270+
assert storage.get_feature_flags_by_set('set10') == ['split1']
271+
assert storage.get_feature_flags_by_set('set02') == ['split1']
272+
assert storage.is_flag_set_exist('set10')
273+
assert storage.is_flag_set_exist('set02')
274+
assert not storage.is_flag_set_exist('set03')
275+
276+
storage.update([split2], [], 1)
277+
assert storage.get_feature_flags_by_set('set05') == ['split2']
278+
assert sorted(storage.get_feature_flags_by_set('set02')) == ['split1', 'split2']
279+
assert storage.is_flag_set_exist('set05')
280+
281+
storage.update([], [split2.name], 1)
282+
assert not storage.is_flag_set_exist('set05')
283+
assert storage.get_feature_flags_by_set('set02') == ['split1']
284+
285+
split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
286+
'ACTIVE', 1, sets=['set02'])
287+
storage.update([split1], [], 1)
288+
assert not storage.is_flag_set_exist('set10')
289+
assert storage.get_feature_flags_by_set('set02') == ['split1']
290+
291+
storage.update([], [split1.name], 1)
245292
assert storage.get_feature_flags_by_set('set02') == []
293+
assert storage._sets_feature_flag_map == {}
294+
295+
storage.update([split3], [], 1)
296+
assert storage.get_feature_flags_by_set('set05') == ['split3']
297+
assert storage.get_feature_flags_by_set('set04') == ['split3']
246298

247299

248300
class InMemorySegmentStorageTests(object):

0 commit comments

Comments
 (0)