Skip to content

Commit 271b940

Browse files
committed
polishing
1 parent 5c6ccf0 commit 271b940

File tree

4 files changed

+175
-145
lines changed

4 files changed

+175
-145
lines changed

splitio/push/workers.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from splitio.models.telemetry import UpdateFromSSE
1313
from splitio.push.parser import UpdateType
1414
from splitio.optional.loaders import asyncio
15+
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
16+
from splitio.util import log_helper
1517

1618

1719
_LOGGER = logging.getLogger(__name__)
@@ -80,8 +82,8 @@ def _run(self):
8082
try:
8183
self._handler(event.segment_name, event.change_number)
8284
except Exception:
83-
_LOGGER.error('Exception raised in segment synchronization')
84-
_LOGGER.debug('Exception information: ', exc_info=True)
85+
self._LOGGER.error('Exception raised in segment synchronization')
86+
self._LOGGER.debug('Exception information: ', exc_info=True)
8587

8688
def start(self):
8789
"""Start worker."""
@@ -156,7 +158,7 @@ async def stop(self):
156158
"""Stop worker."""
157159
_LOGGER.debug('Stopping Segment Worker')
158160
if not self.is_running():
159-
_LOGGER.debug('Worker is not running. Ignoring.')
161+
self._LOGGER.debug('Worker is not running. Ignoring.')
160162
return
161163
self._running = False
162164
await self._segment_queue.put(self._centinel)
@@ -218,25 +220,27 @@ def _run(self):
218220
try:
219221
if self._check_instant_ff_update(event):
220222
try:
221-
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
222-
if new_split.status == Status.ACTIVE:
223-
self._feature_flag_storage.put(new_split)
224-
_LOGGER.debug('Feature flag %s is updated', new_split.name)
225-
for segment_name in new_split.get_segment_names():
226-
if self._segment_storage.get(segment_name) is None:
227-
_LOGGER.debug('Fetching new segment %s', segment_name)
228-
self._segment_handler(segment_name, event.change_number)
229-
else:
230-
self._feature_flag_storage.remove(new_split.name)
231-
self._feature_flag_storage.set_change_number(event.change_number)
223+
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
224+
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
225+
for segment_name in segment_list:
226+
if self._segment_storage.get(segment_name) is None:
227+
self._LOGGER.debug('Fetching new segment %s', segment_name)
228+
self._segment_handler(segment_name, event.change_number)
229+
232230
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
233231
continue
234232
except Exception as e:
235233
_LOGGER.error('Exception raised in updating feature flag')
236234
_LOGGER.debug(str(e))
237235
_LOGGER.debug('Exception information: ', exc_info=True)
238236
pass
239-
self._handler(event.change_number)
237+
sync_result = self._handler(event.change_number)
238+
if not sync_result.success and sync_result.error_code == 414:
239+
_LOGGER.error("URI too long exception caught, sync failed")
240+
241+
if not sync_result.success:
242+
_LOGGER.error("feature flags sync failed")
243+
240244
except Exception as e: # pylint: disable=broad-except
241245
_LOGGER.error('Exception raised in feature flag synchronization')
242246
_LOGGER.debug(str(e))
@@ -318,17 +322,13 @@ async def _run(self):
318322
try:
319323
if await self._check_instant_ff_update(event):
320324
try:
321-
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
322-
if new_split.status == Status.ACTIVE:
323-
await self._feature_flag_storage.put(new_split)
324-
_LOGGER.debug('Feature flag %s is updated', new_split.name)
325-
for segment_name in new_split.get_segment_names():
326-
if await self._segment_storage.get(segment_name) is None:
327-
_LOGGER.debug('Fetching new segment %s', segment_name)
328-
await self._segment_handler(segment_name, event.change_number)
329-
else:
330-
await self._feature_flag_storage.remove(new_split.name)
331-
await self._feature_flag_storage.set_change_number(event.change_number)
325+
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
326+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number)
327+
for segment_name in segment_list:
328+
if await self._segment_storage.get(segment_name) is None:
329+
self._LOGGER.debug('Fetching new segment %s', segment_name)
330+
await self._segment_handler(segment_name, event.change_number)
331+
332332
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
333333
continue
334334
except Exception as e:

splitio/sync/split.py

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@
2222
_LEGACY_DEFINITION_LINE_RE = re.compile(r'^(?<![^#])(?P<feature>[\w_-]+)\s+(?P<treatment>[\w_-]+)$')
2323

2424

25-
_LOGGER = logging.getLogger(__name__)
26-
27-
2825
_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds
2926
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 30 # don't sleep for more than 30 seconds
3027
_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
@@ -67,6 +64,8 @@ def _get_config_sets(self):
6764
class SplitSynchronizer(SplitSynchronizerBase):
6865
"""Feature Flag changes synchronizer."""
6966

67+
_LOGGER = logging.getLogger(__name__)
68+
7069
def __init__(self, feature_flag_api, feature_flag_storage):
7170
"""
7271
Class constructor.
@@ -105,12 +104,12 @@ def _fetch_until(self, fetch_options, till=None):
105104
feature_flag_changes = self._api.fetch_splits(change_number, fetch_options)
106105
except APIException as exc:
107106
if exc._status_code is not None and exc._status_code == 414:
108-
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
109-
_LOGGER.debug('Exception information: ', exc_info=True)
110-
raise APIUriException("URI is too long due to FlagSets count")
107+
self._LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
108+
self._LOGGER.debug('Exception information: ', exc_info=True)
109+
raise APIUriException("URI is too long due to FlagSets count", exc._status_code)
111110

112-
_LOGGER.error('Exception raised while fetching feature flags')
113-
_LOGGER.debug('Exception information: ', exc_info=True)
111+
self._LOGGER.error('Exception raised while fetching feature flags')
112+
self._LOGGER.debug('Exception information: ', exc_info=True)
114113
raise exc
115114

116115
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
@@ -159,18 +158,18 @@ def synchronize_splits(self, till=None):
159158
final_segment_list.update(segment_list)
160159
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
161160
if successful_sync: # succedeed sync
162-
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
161+
self._LOGGER.debug('Refresh completed in %d attempts.', attempts)
163162
return final_segment_list
164163
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
165164
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
166165
final_segment_list.update(segment_list)
167166
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
168167
if without_cdn_successful_sync:
169-
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
168+
self._LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
170169
without_cdn_attempts)
171170
return final_segment_list
172171
else:
173-
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
172+
self._LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
174173
without_cdn_attempts)
175174

176175
def kill_split(self, feature_flag_name, default_treatment, change_number):
@@ -189,6 +188,8 @@ def kill_split(self, feature_flag_name, default_treatment, change_number):
189188
class SplitSynchronizerAsync(SplitSynchronizerBase):
190189
"""Feature Flag changes synchronizer async."""
191190

191+
_LOGGER = logging.getLogger('asyncio')
192+
192193
def __init__(self, feature_flag_api, feature_flag_storage):
193194
"""
194195
Class constructor.
@@ -227,12 +228,12 @@ async def _fetch_until(self, fetch_options, till=None):
227228
feature_flag_changes = await self._api.fetch_splits(change_number, fetch_options)
228229
except APIException as exc:
229230
if exc._status_code is not None and exc._status_code == 414:
230-
_LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
231-
_LOGGER.debug('Exception information: ', exc_info=True)
232-
raise APIUriException("URI is too long due to FlagSets count")
231+
self._LOGGER.error('Exception caught: the amount of flag sets provided are big causing uri length error.')
232+
self._LOGGER.debug('Exception information: ', exc_info=True)
233+
raise APIUriException("URI is too long due to FlagSets count", exc._status_code)
233234

234-
_LOGGER.error('Exception raised while fetching feature flags')
235-
_LOGGER.debug('Exception information: ', exc_info=True)
235+
self._LOGGER.error('Exception raised while fetching feature flags')
236+
self._LOGGER.debug('Exception information: ', exc_info=True)
236237
raise exc
237238

238239
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
@@ -281,18 +282,18 @@ async def synchronize_splits(self, till=None):
281282
final_segment_list.update(segment_list)
282283
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
283284
if successful_sync: # succedeed sync
284-
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
285+
self._LOGGER.debug('Refresh completed in %d attempts.', attempts)
285286
return final_segment_list
286287
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
287288
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
288289
final_segment_list.update(segment_list)
289290
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
290291
if without_cdn_successful_sync:
291-
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
292+
self._LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
292293
without_cdn_attempts)
293294
return final_segment_list
294295
else:
295-
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
296+
self._LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
296297
without_cdn_attempts)
297298

298299
async def kill_split(self, feature_flag_name, default_treatment, change_number):
@@ -432,7 +433,7 @@ def _sanitize_feature_flag_elements(self, parsed_feature_flags):
432433
sanitized_feature_flags = []
433434
for feature_flag in parsed_feature_flags:
434435
if 'name' not in feature_flag or feature_flag['name'].strip() == '':
435-
_LOGGER.warning("A feature flag in json file does not have (Name) or property is empty, skipping.")
436+
self._LOGGER.warning("A feature flag in json file does not have (Name) or property is empty, skipping.")
436437
continue
437438
for element in [('trafficTypeName', 'user', None, None, None, None),
438439
('trafficAllocation', 100, 0, 100, None, None),
@@ -475,7 +476,7 @@ def _sanitize_condition(self, feature_flag):
475476
break
476477

477478
if not found_all_keys_matcher:
478-
_LOGGER.debug("Missing default rule condition for feature flag: %s, adding default rule with 100%% off treatment", feature_flag['name'])
479+
self._LOGGER.debug("Missing default rule condition for feature flag: %s, adding default rule with 100%% off treatment", feature_flag['name'])
479480
feature_flag['conditions'].append(
480481
{
481482
"conditionType": "ROLLOUT",
@@ -529,6 +530,8 @@ def _convert_yaml_to_feature_flag(cls, parsed):
529530
class LocalSplitSynchronizer(LocalSplitSynchronizerBase):
530531
"""Localhost mode feature_flag synchronizer."""
531532

533+
_LOGGER = logging.getLogger(__name__)
534+
532535
def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode.LEGACY):
533536
"""
534537
Class constructor.
@@ -565,7 +568,7 @@ def _read_feature_flags_from_legacy_file(cls, filename):
565568

566569
definition_match = _LEGACY_DEFINITION_LINE_RE.match(line)
567570
if not definition_match:
568-
_LOGGER.warning(
571+
self._LOGGER.warning(
569572
'Invalid line on localhost environment feature flag '
570573
'definition. Line = %s',
571574
line
@@ -601,11 +604,11 @@ def _read_feature_flags_from_yaml_file(cls, filename):
601604

602605
def synchronize_splits(self, till=None): # pylint:disable=unused-argument
603606
"""Update feature flags in storage."""
604-
_LOGGER.info('Synchronizing feature flags now.')
607+
self._LOGGER.info('Synchronizing feature flags now.')
605608
try:
606609
return self._synchronize_json() if self._localhost_mode == LocalhostMode.JSON else self._synchronize_legacy()
607610
except Exception as exc:
608-
_LOGGER.debug('Exception: ', exc_info=True)
611+
self._LOGGER.debug('Exception: ', exc_info=True)
609612
raise APIException("Error fetching feature flags information") from exc
610613

611614
def _synchronize_legacy(self):
@@ -647,7 +650,7 @@ def _synchronize_json(self):
647650
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, till)
648651
return segment_list
649652
except Exception as exc:
650-
_LOGGER.debug('Exception: ', exc_info=True)
653+
self._LOGGER.debug('Exception: ', exc_info=True)
651654
raise ValueError("Error reading feature flags from json.") from exc
652655

653656
def _read_feature_flags_from_json_file(self, filename):
@@ -666,13 +669,15 @@ def _read_feature_flags_from_json_file(self, filename):
666669
santitized = self._sanitize_feature_flag(parsed)
667670
return santitized['splits'], santitized['till']
668671
except Exception as exc:
669-
_LOGGER.debug('Exception: ', exc_info=True)
672+
self._LOGGER.debug('Exception: ', exc_info=True)
670673
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc
671674

672675

673676
class LocalSplitSynchronizerAsync(LocalSplitSynchronizerBase):
674677
"""Localhost mode async feature_flag synchronizer."""
675678

679+
_LOGGER = logging.getLogger('asyncio')
680+
676681
def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode.LEGACY):
677682
"""
678683
Class constructor.
@@ -709,7 +714,7 @@ async def _read_feature_flags_from_legacy_file(cls, filename):
709714

710715
definition_match = _LEGACY_DEFINITION_LINE_RE.match(line)
711716
if not definition_match:
712-
_LOGGER.warning(
717+
self._LOGGER.warning(
713718
'Invalid line on localhost environment feature flag '
714719
'definition. Line = %s',
715720
line
@@ -745,11 +750,11 @@ async def _read_feature_flags_from_yaml_file(cls, filename):
745750

746751
async def synchronize_splits(self, till=None): # pylint:disable=unused-argument
747752
"""Update feature flags in storage."""
748-
_LOGGER.info('Synchronizing feature flags now.')
753+
self._LOGGER.info('Synchronizing feature flags now.')
749754
try:
750755
return await self._synchronize_json() if self._localhost_mode == LocalhostMode.JSON else await self._synchronize_legacy()
751756
except Exception as exc:
752-
_LOGGER.debug('Exception: ', exc_info=True)
757+
self._LOGGER.debug('Exception: ', exc_info=True)
753758
raise APIException("Error fetching feature flags information") from exc
754759

755760
async def _synchronize_legacy(self):
@@ -791,7 +796,7 @@ async def _synchronize_json(self):
791796
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, till)
792797
return segment_list
793798
except Exception as exc:
794-
_LOGGER.debug('Exception: ', exc_info=True)
799+
self._LOGGER.debug('Exception: ', exc_info=True)
795800
raise ValueError("Error reading feature flags from json.") from exc
796801

797802
async def _read_feature_flags_from_json_file(self, filename):
@@ -810,5 +815,5 @@ async def _read_feature_flags_from_json_file(self, filename):
810815
santitized = self._sanitize_feature_flag(parsed)
811816
return santitized['splits'], santitized['till']
812817
except Exception as exc:
813-
_LOGGER.debug('Exception: ', exc_info=True)
818+
self._LOGGER.debug('Exception: ', exc_info=True)
814819
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc

0 commit comments

Comments
 (0)