Skip to content

Commit 56c4893

Browse files
authored
Merge pull request #536 from splitio/async-pr-2
polishing
2 parents f0e9873 + b201e30 commit 56c4893

File tree

8 files changed

+124
-91
lines changed

8 files changed

+124
-91
lines changed

splitio/push/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
class AuthException(Exception):
2+
"""Exception to raise when an API call fails."""
3+
4+
def __init__(self, custom_message, status_code=None):
5+
"""Constructor."""
6+
Exception.__init__(self, custom_message)
7+
8+
class SplitStorageException(Exception):
9+
"""Exception to raise when an API call fails."""
10+
11+
def __init__(self, custom_message, status_code=None):
12+
"""Constructor."""
13+
Exception.__init__(self, custom_message)

splitio/push/manager.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from splitio.optional.loaders import asyncio, anext
66
from splitio.api import APIException
77
from splitio.util.time import get_current_epoch_time_ms
8+
from splitio.push import AuthException
89
from splitio.push.splitsse import SplitSSEClient, SplitSSEClientAsync
910
from splitio.push.sse import SSE_EVENT_ERROR
1011
from splitio.push.parser import parse_incoming_event, EventParsingException, EventType, \
@@ -315,7 +316,6 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
315316
kwargs = {} if sse_url is None else {'base_url': sse_url}
316317
self._sse_client = SplitSSEClientAsync(sdk_metadata, client_key, **kwargs)
317318
self._running = False
318-
self._done = asyncio.Event()
319319
self._telemetry_runtime_producer = telemetry_runtime_producer
320320
self._token_task = None
321321

@@ -366,6 +366,7 @@ async def _event_handler(self, event):
366366
:param event: Incoming event
367367
:type event: splitio.push.sse.SSEEvent
368368
"""
369+
parsed = None
369370
try:
370371
parsed = parse_incoming_event(event)
371372
handle = self._event_handlers[parsed.event_type]
@@ -377,8 +378,8 @@ async def _event_handler(self, event):
377378
try:
378379
await handle(parsed)
379380
except Exception: # pylint:disable=broad-except
380-
_LOGGER.error('something went wrong when processing message of type %s',
381-
parsed.event_type)
381+
event_type = "unknown" if parsed is None else parsed.event_type
382+
_LOGGER.error('something went wrong when processing message of type %s', event_type)
382383
_LOGGER.debug(str(parsed), exc_info=True)
383384

384385
async def _token_refresh(self, current_token):
@@ -396,15 +397,15 @@ async def _get_auth_token(self):
396397
"""Get new auth token"""
397398
try:
398399
token = await self._auth_api.authenticate()
399-
except APIException:
400+
except APIException as e:
400401
_LOGGER.error('error performing sse auth request.')
401402
_LOGGER.debug('stack trace: ', exc_info=True)
402403
await self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
403-
raise
404+
raise AuthException(e)
404405

405406
if token is not None and not token.push_enabled:
406407
await self._feedback_loop.put(Status.PUSH_NONRETRYABLE_ERROR)
407-
raise Exception("Push is not enabled")
408+
raise AuthException("Push is not enabled")
408409

409410
await self._telemetry_runtime_producer.record_token_refreshes()
410411
await self._telemetry_runtime_producer.record_streaming_event((StreamingEventTypes.TOKEN_REFRESH, 1000 * token.exp, get_current_epoch_time_ms()))
@@ -416,23 +417,11 @@ async def _trigger_connection_flow(self):
416417
self._status_tracker.reset()
417418

418419
try:
419-
try:
420-
token = await self._get_auth_token()
421-
except Exception as e:
422-
_LOGGER.error("error getting auth token: " + str(e))
423-
_LOGGER.debug("trace: ", exc_info=True)
424-
return
425-
420+
token = await self._get_auth_token()
426421
events_source = self._sse_client.start(token)
427-
self._done.clear()
428422
self._running = True
429423

430-
try:
431-
first_event = await anext(events_source)
432-
except StopAsyncIteration: # will enter here if there was an error
433-
await self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
434-
return
435-
424+
first_event = await anext(events_source)
436425
if first_event.data is not None:
437426
await self._event_handler(first_event)
438427

@@ -444,13 +433,17 @@ async def _trigger_connection_flow(self):
444433
async for event in events_source:
445434
await self._event_handler(event)
446435
await self._handle_connection_end() # TODO(mredolatti): this is not tested
436+
except AuthException as e:
437+
_LOGGER.error("error getting auth token: " + str(e))
438+
_LOGGER.debug("trace: ", exc_info=True)
439+
except StopAsyncIteration: # will enter here if there was an error
440+
await self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
447441
finally:
448442
if self._token_task is not None:
449443
self._token_task.cancel()
450444
self._token_task = None
451445
self._running = False
452446
await self._processor.update_workers_status(False)
453-
self._done.set()
454447

455448
async def _handle_message(self, event):
456449
"""

splitio/push/splitsse.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ class _Status(Enum):
2222
ERRORED = 2
2323
CONNECTED = 3
2424

25+
def __init__(self, base_url):
26+
"""
27+
Construct a split sse client.
28+
29+
:param base_url: scheme + :// + host
30+
:type base_url: str
31+
"""
32+
self._base_url = base_url
33+
2534
@staticmethod
2635
def _format_channels(channels):
2736
"""
@@ -90,11 +99,11 @@ def __init__(self, event_callback, sdk_metadata, first_event_callback=None,
9099
:param client_key: client key.
91100
:type client_key: str
92101
"""
102+
SplitSSEClientBase.__init__(self, base_url)
93103
self._client = SSEClient(self._raw_event_handler)
94104
self._callback = event_callback
95105
self._on_connected = first_event_callback
96106
self._on_disconnected = connection_closed_callback
97-
self._base_url = base_url
98107
self._status = SplitSSEClient._Status.IDLE
99108
self._sse_first_event = None
100109
self._sse_connection_closed = None
@@ -178,7 +187,7 @@ def __init__(self, sdk_metadata, client_key=None, base_url='https://streaming.sp
178187
:param base_url: scheme + :// + host
179188
:type base_url: str
180189
"""
181-
self._base_url = base_url
190+
SplitSSEClientBase.__init__(self, base_url)
182191
self.status = SplitSSEClient._Status.IDLE
183192
self._metadata = headers_from_metadata(sdk_metadata, client_key)
184193
self._client = SSEClientAsync(self.KEEPALIVE_TIMEOUT)

splitio/push/workers.py

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from splitio.models.splits import from_raw
1212
from splitio.models.telemetry import UpdateFromSSE
13+
from splitio.push import SplitStorageException
1314
from splitio.push.parser import UpdateType
1415
from splitio.optional.loaders import asyncio
1516
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
@@ -202,9 +203,28 @@ def is_running(self):
202203
"""Return whether the working is running."""
203204
return self._running
204205

206+
def _apply_iff_if_needed(self, event):
207+
if not self._check_instant_ff_update(event):
208+
return False
209+
210+
try:
211+
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
212+
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
213+
for segment_name in segment_list:
214+
if self._segment_storage.get(segment_name) is None:
215+
_LOGGER.debug('Fetching new segment %s', segment_name)
216+
self._segment_handler(segment_name, event.change_number)
217+
218+
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
219+
return True
220+
221+
except Exception as e:
222+
raise SplitStorageException(e)
223+
205224
def _check_instant_ff_update(self, event):
206225
if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number():
207226
return True
227+
208228
return False
209229

210230
def _run(self):
@@ -217,28 +237,19 @@ def _run(self):
217237
continue
218238
_LOGGER.debug('Processing feature flag update %d', event.change_number)
219239
try:
220-
if self._check_instant_ff_update(event):
221-
try:
222-
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
223-
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
224-
for segment_name in segment_list:
225-
if self._segment_storage.get(segment_name) is None:
226-
_LOGGER.debug('Fetching new segment %s', segment_name)
227-
self._segment_handler(segment_name, event.change_number)
228-
229-
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
230-
continue
231-
except Exception as e:
232-
_LOGGER.error('Exception raised in updating feature flag')
233-
_LOGGER.debug('Exception information: ', exc_info=True)
234-
pass
240+
if self._apply_iff_if_needed(event):
241+
continue
242+
235243
sync_result = self._handler(event.change_number)
236244
if not sync_result.success and sync_result.error_code is not None and sync_result.error_code == 414:
237245
_LOGGER.error("URI too long exception caught, sync failed")
238246

239247
if not sync_result.success:
240248
_LOGGER.error("feature flags sync failed")
241249

250+
except SplitStorageException as e: # pylint: disable=broad-except
251+
_LOGGER.error('Exception Updating Feature Flag')
252+
_LOGGER.debug('Exception information: ', exc_info=True)
242253
except Exception as e: # pylint: disable=broad-except
243254
_LOGGER.error('Exception raised in feature flag synchronization')
244255
_LOGGER.debug('Exception information: ', exc_info=True)
@@ -297,6 +308,24 @@ def is_running(self):
297308
"""Return whether the working is running."""
298309
return self._running
299310

311+
async def _apply_iff_if_needed(self, event):
312+
if not await self._check_instant_ff_update(event):
313+
return False
314+
try:
315+
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
316+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number)
317+
for segment_name in segment_list:
318+
if await self._segment_storage.get(segment_name) is None:
319+
_LOGGER.debug('Fetching new segment %s', segment_name)
320+
await self._segment_handler(segment_name, event.change_number)
321+
322+
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
323+
return True
324+
325+
except Exception as e:
326+
raise SplitStorageException(e)
327+
328+
300329
async def _check_instant_ff_update(self, event):
301330
if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == await self._feature_flag_storage.get_change_number():
302331
return True
@@ -312,22 +341,12 @@ async def _run(self):
312341
continue
313342
_LOGGER.debug('Processing split_update %d', event.change_number)
314343
try:
315-
if await self._check_instant_ff_update(event):
316-
try:
317-
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
318-
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number)
319-
for segment_name in segment_list:
320-
if await self._segment_storage.get(segment_name) is None:
321-
_LOGGER.debug('Fetching new segment %s', segment_name)
322-
await self._segment_handler(segment_name, event.change_number)
323-
324-
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
325-
continue
326-
except Exception as e:
327-
_LOGGER.error('Exception raised in updating feature flag')
328-
_LOGGER.debug('Exception information: ', exc_info=True)
329-
pass
344+
if await self._apply_iff_if_needed(event):
345+
continue
330346
await self._handler(event.change_number)
347+
except SplitStorageException as e: # pylint: disable=broad-except
348+
_LOGGER.error('Exception Updating Feature Flag')
349+
_LOGGER.debug('Exception information: ', exc_info=True)
331350
except Exception as e: # pylint: disable=broad-except
332351
_LOGGER.error('Exception raised in split synchronization')
333352
_LOGGER.debug('Exception information: ', exc_info=True)

splitio/storage/redis.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,6 @@ def kill_locally(self, feature_flag_name, default_treatment, change_number):
157157
class RedisSplitStorage(RedisSplitStorageBase):
158158
"""Redis-based storage for feature flags."""
159159

160-
_FEATURE_FLAG_KEY = 'SPLITIO.split.{feature_flag_name}'
161-
_FEATURE_FLAG_TILL_KEY = 'SPLITIO.splits.till'
162-
_TRAFFIC_TYPE_KEY = 'SPLITIO.trafficType.{traffic_type_name}'
163-
_FLAG_SET_KEY = 'SPLITIO.flagSet.{flag_set}'
164-
165160
def __init__(self, redis_client, enable_caching=False, max_age=DEFAULT_MAX_AGE, config_flag_sets=[]):
166161
"""
167162
Class constructor.
@@ -213,7 +208,8 @@ def get_feature_flags_by_sets(self, flag_sets):
213208

214209
keys = [self._get_flag_set_key(flag_set) for flag_set in sets_to_fetch]
215210
pipe = self._pipe()
216-
[pipe.smembers(key) for key in keys]
211+
for key in keys:
212+
pipe.smembers(key)
217213
result_sets = pipe.execute()
218214
_LOGGER.debug("Fetchting Feature flags by set [%s] from redis" % (keys))
219215
_LOGGER.debug(result_sets)
@@ -342,7 +338,9 @@ def __init__(self, redis_client, enable_caching=False, max_age=DEFAULT_MAX_AGE,
342338
self.flag_set_filter = FlagSetsFilter(config_flag_sets)
343339
self._pipe = self.redis.pipeline
344340
if enable_caching:
345-
self._cache = LocalMemoryCacheAsync(None, None, max_age)
341+
self._feature_flag_cache = LocalMemoryCacheAsync(None, None, max_age)
342+
self._traffic_type_cache = LocalMemoryCacheAsync(None, None, max_age)
343+
346344

347345
async def get(self, feature_flag_name): # pylint: disable=method-hidden
348346
"""
@@ -359,15 +357,16 @@ async def get(self, feature_flag_name): # pylint: disable=method-hidden
359357
:type change_number: int
360358
"""
361359
try:
362-
if self._enable_caching and await self._cache.get_key(feature_flag_name) is not None:
363-
raw = await self._cache.get_key(feature_flag_name)
364-
else:
365-
raw = await self.redis.get(self._get_key(feature_flag_name))
360+
raw_feature_flags = None
361+
if self._enable_caching:
362+
raw_feature_flags = await self._feature_flag_cache.get_key(feature_flag_name)
363+
if raw_feature_flags is None:
364+
raw_feature_flags = await self.redis.get(self._get_key(feature_flag_name))
366365
if self._enable_caching:
367-
await self._cache.add_key(feature_flag_name, raw)
366+
await self._feature_flag_cache.add_key(feature_flag_name, raw_feature_flags)
368367
_LOGGER.debug("Fetchting feature flag [%s] from redis" % feature_flag_name)
369-
_LOGGER.debug(raw)
370-
return splits.from_raw(json.loads(raw)) if raw is not None else None
368+
_LOGGER.debug(raw_feature_flags)
369+
return splits.from_raw(json.loads(raw_feature_flags)) if raw_feature_flags is not None else None
371370

372371
except RedisAdapterException:
373372
_LOGGER.error('Error fetching feature flag from storage')
@@ -410,13 +409,13 @@ async def fetch_many(self, feature_flag_names):
410409
"""
411410
to_return = dict()
412411
try:
413-
if self._enable_caching and await self._cache.get_key(frozenset(feature_flag_names)) is not None:
414-
raw_feature_flags = await self._cache.get_key(frozenset(feature_flag_names))
415-
else:
416-
keys = [self._get_key(feature_flag_name) for feature_flag_name in feature_flag_names]
417-
raw_feature_flags = await self.redis.mget(keys)
412+
raw_feature_flags = None
413+
if self._enable_caching:
414+
raw_feature_flags = await self._feature_flag_cache.get_key(frozenset(feature_flag_names))
415+
if raw_feature_flags is None:
416+
raw_feature_flags = await self.redis.mget([self._get_key(feature_flag_name) for feature_flag_name in feature_flag_names])
418417
if self._enable_caching:
419-
await self._cache.add_key(frozenset(feature_flag_names), raw_feature_flags)
418+
await self._feature_flag_cache.add_key(frozenset(feature_flag_names), raw_feature_flags)
420419
for i in range(len(feature_flag_names)):
421420
feature_flag = None
422421
try:
@@ -439,13 +438,14 @@ async def is_valid_traffic_type(self, traffic_type_name): # pylint: disable=met
439438
:rtype: bool
440439
"""
441440
try:
442-
if self._enable_caching and await self._cache.get_key(traffic_type_name) is not None:
443-
raw = await self._cache.get_key(traffic_type_name)
444-
else:
445-
raw = await self.redis.get(self._get_traffic_type_key(traffic_type_name))
441+
raw_traffic_type = None
442+
if self._enable_caching:
443+
raw_traffic_type = await self._traffic_type_cache.get_key(traffic_type_name)
444+
if raw_traffic_type is None:
445+
raw_traffic_type = await self.redis.get(self._get_traffic_type_key(traffic_type_name))
446446
if self._enable_caching:
447-
await self._cache.add_key(traffic_type_name, raw)
448-
count = json.loads(raw) if raw else 0
447+
await self._traffic_type_cache.add_key(traffic_type_name, raw_traffic_type)
448+
count = json.loads(raw_traffic_type) if raw_traffic_type else 0
449449
return count > 0
450450

451451
except RedisAdapterException:

splitio/sync/split.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,7 @@ def _fetch_until(self, fetch_options, till=None):
112112
_LOGGER.error('Exception raised while fetching feature flags')
113113
_LOGGER.debug('Exception information: ', exc_info=True)
114114
raise exc
115-
fetched_feature_flags = []
116-
[fetched_feature_flags.append(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
115+
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
117116
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
118117
if feature_flag_changes['till'] == feature_flag_changes['since']:
119118
return feature_flag_changes['till'], segment_list

0 commit comments

Comments
 (0)