Skip to content

Commit cc9d543

Browse files
authored
Merge pull request #499 from splitio/async-flagsets-sync
updated sync.split, sync.synchronizer and tasks.util.asynctask classes
2 parents a75cd87 + aae892f commit cc9d543

File tree

10 files changed

+775
-180
lines changed

10 files changed

+775
-180
lines changed

splitio/api/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ def status_code(self):
1414
"""Return HTTP status code."""
1515
return self._status_code
1616

17+
class APIUriException(APIException):
18+
"""Exception to raise when an API call fails due to 414 http error."""
19+
20+
def __init__(self, custom_message, status_code=None):
21+
"""Constructor."""
22+
APIException.__init__(self, custom_message)
23+
1724
def headers_from_metadata(sdk_metadata, client_key=None):
1825
"""
1926
Generate a dict with headers required by data-recording API endpoints.

splitio/push/workers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,13 @@ def _run(self):
232232
_LOGGER.error('Exception raised in updating feature flag')
233233
_LOGGER.debug('Exception information: ', exc_info=True)
234234
pass
235-
self._handler(event.change_number)
235+
sync_result = self._handler(event.change_number)
236+
if not sync_result.success and sync_result.error_code is not None and sync_result.error_code == 414:
237+
_LOGGER.error("URI too long exception caught, sync failed")
238+
239+
if not sync_result.success:
240+
_LOGGER.error("feature flags sync failed")
241+
236242
except Exception as e: # pylint: disable=broad-except
237243
_LOGGER.error('Exception raised in feature flag synchronization')
238244
_LOGGER.debug('Exception information: ', exc_info=True)

splitio/sync/manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ def __init__(self, synchronizer): # pylint:disable=too-many-arguments
285285
:param synchronizer: synchronizers for performing start/stop logic
286286
:type synchronizer: splitio.sync.synchronizer.Synchronizer
287287
"""
288-
super().__init__(synchronizer)
288+
RedisManagerBase.__init__(self, synchronizer)
289289

290290
def stop(self, blocking):
291291
"""
@@ -308,7 +308,7 @@ def __init__(self, synchronizer): # pylint:disable=too-many-arguments
308308
:param synchronizer: synchronizers for performing start/stop logic
309309
:type synchronizer: splitio.sync.synchronizer.Synchronizer
310310
"""
311-
super().__init__(synchronizer)
311+
RedisManagerBase.__init__(self, synchronizer)
312312

313313
async def stop(self, blocking):
314314
"""

splitio/sync/split.py

Lines changed: 66 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
import hashlib
99
from enum import Enum
1010

11-
from splitio.api import APIException
11+
from splitio.api import APIException, APIUriException
1212
from splitio.api.commons import FetchOptions
13+
from splitio.client.input_validator import validate_flag_sets
1314
from splitio.models import splits
1415
from splitio.util.backoff import Backoff
1516
from splitio.util.time import get_current_epoch_time_ms
17+
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
1618
from splitio.sync import util
1719
from splitio.optional.loaders import asyncio, aiofiles
1820

@@ -28,7 +30,7 @@
2830
_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
2931

3032

31-
class SplitSynchronizer(object):
33+
class SplitSynchronizerBase(object):
3234
"""Feature Flag changes synchronizer."""
3335

3436
def __init__(self, feature_flag_api, feature_flag_storage):
@@ -52,6 +54,31 @@ def feature_flag_storage(self):
5254
"""Return Feature_flag storage object"""
5355
return self._feature_flag_storage
5456

57+
def _get_config_sets(self):
58+
"""
59+
Get all filter flag sets cnverrted to string, if no filter flagsets exist return None
60+
:return: string with flagsets
61+
:rtype: str
62+
"""
63+
if self._feature_flag_storage.flag_set_filter.flag_sets == set({}):
64+
return None
65+
return ','.join(self._feature_flag_storage.flag_set_filter.sorted_flag_sets)
66+
67+
class SplitSynchronizer(SplitSynchronizerBase):
68+
"""Feature Flag changes synchronizer."""
69+
70+
def __init__(self, feature_flag_api, feature_flag_storage):
71+
"""
72+
Class constructor.
73+
74+
:param feature_flag_api: Feature Flag API Client.
75+
:type feature_flag_api: splitio.api.splits.SplitsAPI
76+
77+
:param feature_flag_storage: Feature Flag Storage.
78+
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
79+
"""
80+
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
81+
5582
def _fetch_until(self, fetch_options, till=None):
5683
"""
5784
Hit endpoint, update storage and return when since==till.
@@ -77,18 +104,17 @@ def _fetch_until(self, fetch_options, till=None):
77104
try:
78105
feature_flag_changes = self._api.fetch_splits(change_number, fetch_options)
79106
except APIException as exc:
107+
if exc._status_code is not None and exc._status_code == 414:
108+
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
109+
_LOGGER.debug('Exception information: ', exc_info=True)
110+
raise APIUriException("URI is too long due to FlagSets count", exc._status_code)
111+
80112
_LOGGER.error('Exception raised while fetching feature flags')
81113
_LOGGER.debug('Exception information: ', exc_info=True)
82114
raise exc
83115

84-
for feature_flag in feature_flag_changes.get('splits', []):
85-
if feature_flag['status'] == splits.Status.ACTIVE.value:
86-
parsed = splits.from_raw(feature_flag)
87-
self._feature_flag_storage.put(parsed)
88-
segment_list.update(set(parsed.get_segment_names()))
89-
else:
90-
self._feature_flag_storage.remove(feature_flag['name'])
91-
self._feature_flag_storage.set_change_number(feature_flag_changes['till'])
116+
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
117+
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
92118
if feature_flag_changes['till'] == feature_flag_changes['since']:
93119
return feature_flag_changes['till'], segment_list
94120

@@ -127,15 +153,15 @@ def synchronize_splits(self, till=None):
127153
:type till: int
128154
"""
129155
final_segment_list = set()
130-
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
156+
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
131157
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
132158
till)
133159
final_segment_list.update(segment_list)
134160
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
135161
if successful_sync: # succedeed sync
136162
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
137163
return final_segment_list
138-
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
164+
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
139165
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
140166
final_segment_list.update(segment_list)
141167
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
@@ -160,8 +186,7 @@ def kill_split(self, feature_flag_name, default_treatment, change_number):
160186
"""
161187
self._feature_flag_storage.kill_locally(feature_flag_name, default_treatment, change_number)
162188

163-
164-
class SplitSynchronizerAsync(object):
189+
class SplitSynchronizerAsync(SplitSynchronizerBase):
165190
"""Feature Flag changes synchronizer async."""
166191

167192
def __init__(self, feature_flag_api, feature_flag_storage):
@@ -174,16 +199,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
174199
:param feature_flag_storage: Feature Flag Storage.
175200
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
176201
"""
177-
self._api = feature_flag_api
178-
self._feature_flag_storage = feature_flag_storage
179-
self._backoff = Backoff(
180-
_ON_DEMAND_FETCH_BACKOFF_BASE,
181-
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
182-
183-
@property
184-
def feature_flag_storage(self):
185-
"""Return Feature_flag storage object"""
186-
return self._feature_flag_storage
202+
SplitSynchronizerBase.__init__(self, feature_flag_api, feature_flag_storage)
187203

188204
async def _fetch_until(self, fetch_options, till=None):
189205
"""
@@ -210,18 +226,17 @@ async def _fetch_until(self, fetch_options, till=None):
210226
try:
211227
feature_flag_changes = await self._api.fetch_splits(change_number, fetch_options)
212228
except APIException as exc:
229+
if exc._status_code is not None and exc._status_code == 414:
230+
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
231+
_LOGGER.debug('Exception information: ', exc_info=True)
232+
raise APIUriException("URI is too long due to FlagSets count", exc._status_code)
233+
213234
_LOGGER.error('Exception raised while fetching feature flags')
214235
_LOGGER.debug('Exception information: ', exc_info=True)
215236
raise exc
216237

217-
for feature_flag in feature_flag_changes.get('splits', []):
218-
if feature_flag['status'] == splits.Status.ACTIVE.value:
219-
parsed = splits.from_raw(feature_flag)
220-
await self._feature_flag_storage.put(parsed)
221-
segment_list.update(set(parsed.get_segment_names()))
222-
else:
223-
await self._feature_flag_storage.remove(feature_flag['name'])
224-
await self._feature_flag_storage.set_change_number(feature_flag_changes['till'])
238+
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
239+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
225240
if feature_flag_changes['till'] == feature_flag_changes['since']:
226241
return feature_flag_changes['till'], segment_list
227242

@@ -260,15 +275,15 @@ async def synchronize_splits(self, till=None):
260275
:type till: int
261276
"""
262277
final_segment_list = set()
263-
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
278+
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
264279
successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
265280
till)
266281
final_segment_list.update(segment_list)
267282
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
268283
if successful_sync: # succedeed sync
269284
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
270285
return final_segment_list
271-
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
286+
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
272287
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
273288
final_segment_list.update(segment_list)
274289
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
@@ -430,6 +445,9 @@ def _sanitize_feature_flag_elements(self, parsed_feature_flags):
430445
('algo', 2, 2, 2, None, None)]:
431446
feature_flag = util._sanitize_object_element(feature_flag, 'split', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=element[4], not_in_list=element[5])
432447
feature_flag = self._sanitize_condition(feature_flag)
448+
if 'sets' not in feature_flag:
449+
feature_flag['sets'] = []
450+
feature_flag['sets'] = validate_flag_sets(feature_flag['sets'], 'Localhost Validator')
433451
sanitized_feature_flags.append(feature_flag)
434452
return sanitized_feature_flags
435453

@@ -587,7 +605,7 @@ def synchronize_splits(self, till=None): # pylint:disable=unused-argument
587605
try:
588606
return self._synchronize_json() if self._localhost_mode == LocalhostMode.JSON else self._synchronize_legacy()
589607
except Exception as exc:
590-
_LOGGER.error(str(exc))
608+
_LOGGER.debug('Exception: ', exc_info=True)
591609
raise APIException("Error fetching feature flags information") from exc
592610

593611
def _synchronize_legacy(self):
@@ -604,12 +622,8 @@ def _synchronize_legacy(self):
604622
fetched = self._read_feature_flags_from_legacy_file(self._filename)
605623
to_delete = [name for name in self._feature_flag_storage.get_split_names()
606624
if name not in fetched.keys()]
607-
for feature_flag in fetched.values():
608-
self._feature_flag_storage.put(feature_flag)
609-
610-
for feature_flag in to_delete:
611-
self._feature_flag_storage.remove(feature_flag)
612-
625+
to_add = [feature_flag for feature_flag in fetched.values()]
626+
self._feature_flag_storage.update(to_add, to_delete, 0)
613627
return []
614628

615629
def _synchronize_json(self):
@@ -628,18 +642,12 @@ def _synchronize_json(self):
628642
self._current_json_sha = fecthed_sha
629643
if self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL:
630644
return []
631-
for feature_flag in fetched:
632-
if feature_flag['status'] == splits.Status.ACTIVE.value:
633-
parsed = splits.from_raw(feature_flag)
634-
self._feature_flag_storage.put(parsed)
635-
_LOGGER.debug("feature flag %s is updated", parsed.name)
636-
segment_list.update(set(parsed.get_segment_names()))
637-
else:
638-
self._feature_flag_storage.remove(feature_flag['name'])
639645

640-
self._feature_flag_storage.set_change_number(till)
646+
fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in fetched]
647+
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, till)
641648
return segment_list
642649
except Exception as exc:
650+
_LOGGER.debug('Exception: ', exc_info=True)
643651
raise ValueError("Error reading feature flags from json.") from exc
644652

645653
def _read_feature_flags_from_json_file(self, filename):
@@ -658,7 +666,7 @@ def _read_feature_flags_from_json_file(self, filename):
658666
santitized = self._sanitize_feature_flag(parsed)
659667
return santitized['splits'], santitized['till']
660668
except Exception as exc:
661-
_LOGGER.error(str(exc))
669+
_LOGGER.debug('Exception: ', exc_info=True)
662670
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc
663671

664672

@@ -741,7 +749,7 @@ async def synchronize_splits(self, till=None): # pylint:disable=unused-argument
741749
try:
742750
return await self._synchronize_json() if self._localhost_mode == LocalhostMode.JSON else await self._synchronize_legacy()
743751
except Exception as exc:
744-
_LOGGER.error(str(exc))
752+
_LOGGER.debug('Exception: ', exc_info=True)
745753
raise APIException("Error fetching feature flags information") from exc
746754

747755
async def _synchronize_legacy(self):
@@ -758,11 +766,8 @@ async def _synchronize_legacy(self):
758766
fetched = await self._read_feature_flags_from_legacy_file(self._filename)
759767
to_delete = [name for name in await self._feature_flag_storage.get_split_names()
760768
if name not in fetched.keys()]
761-
for feature_flag in fetched.values():
762-
await self._feature_flag_storage.put(feature_flag)
763-
764-
for feature_flag in to_delete:
765-
await self._feature_flag_storage.remove(feature_flag)
769+
to_add = [feature_flag for feature_flag in fetched.values()]
770+
await self._feature_flag_storage.update(to_add, to_delete, 0)
766771

767772
return []
768773

@@ -782,18 +787,11 @@ async def _synchronize_json(self):
782787
self._current_json_sha = fecthed_sha
783788
if await self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL:
784789
return []
785-
for feature_flag in fetched:
786-
if feature_flag['status'] == splits.Status.ACTIVE.value:
787-
parsed = splits.from_raw(feature_flag)
788-
await self._feature_flag_storage.put(parsed)
789-
_LOGGER.debug("feature flag %s is updated", parsed.name)
790-
segment_list.update(set(parsed.get_segment_names()))
791-
else:
792-
await self._feature_flag_storage.remove(feature_flag['name'])
793-
794-
await self._feature_flag_storage.set_change_number(till)
790+
fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in fetched]
791+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, till)
795792
return segment_list
796793
except Exception as exc:
794+
_LOGGER.debug('Exception: ', exc_info=True)
797795
raise ValueError("Error reading feature flags from json.") from exc
798796

799797
async def _read_feature_flags_from_json_file(self, filename):
@@ -812,5 +810,5 @@ async def _read_feature_flags_from_json_file(self, filename):
812810
santitized = self._sanitize_feature_flag(parsed)
813811
return santitized['splits'], santitized['till']
814812
except Exception as exc:
815-
_LOGGER.error(str(exc))
813+
_LOGGER.debug('Exception: ', exc_info=True)
816814
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc

0 commit comments

Comments
 (0)