Skip to content

Commit e43fee8

Browse files
authored
Merge pull request #504 from splitio/async-sse-timeouts
added SSE total and socket read timeouts
2 parents a81f503 + 0d2e69c commit e43fee8

File tree

13 files changed

+137
-115
lines changed

13 files changed

+137
-115
lines changed

splitio/client/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ def get_treatment(self, key, feature_flag_name, attributes=None):
229229
return treatment
230230

231231
except:
232-
# TODO: maybe log here?
232+
_LOGGER.error('get_treatment failed')
233233
return CONTROL
234234

235235
def get_treatment_with_config(self, key, feature_flag_name, attributes=None):
@@ -252,7 +252,7 @@ def get_treatment_with_config(self, key, feature_flag_name, attributes=None):
252252
return self._get_treatment(MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG, key, feature_flag_name, attributes)
253253

254254
except Exception:
255-
# TODO: maybe log here?
255+
_LOGGER.error('get_treatment_with_config failed')
256256
return CONTROL, None
257257

258258
def _get_treatment(self, method, key, feature, attributes=None):
@@ -289,7 +289,7 @@ def _get_treatment(self, method, key, feature, attributes=None):
289289
ctx = self._context_factory.context_for(key, [feature])
290290
input_validator.validate_feature_flag_names({feature: ctx.flags.get(feature)}, 'get_' + method.value)
291291
result = self._evaluator.eval_with_context(key, bucketing, feature, attributes, ctx)
292-
except Exception as e: # toto narrow this
292+
except RuntimeError as e:
293293
_LOGGER.error('Error getting treatment for feature flag')
294294
_LOGGER.debug('Error: ', exc_info=True)
295295
self._telemetry_evaluation_producer.record_exception(method)
@@ -565,7 +565,7 @@ def _get_treatments(self, key, features, method, attributes=None):
565565
ctx = self._context_factory.context_for(key, features)
566566
input_validator.validate_feature_flag_names({feature: ctx.flags.get(feature) for feature in features}, 'get_' + method.value)
567567
results = self._evaluator.eval_many_with_context(key, bucketing, features, attributes, ctx)
568-
except Exception as e: # toto narrow this
568+
except RuntimeError as e:
569569
_LOGGER.error('Error getting treatment for feature flag')
570570
_LOGGER.debug('Error: ', exc_info=True)
571571
self._telemetry_evaluation_producer.record_exception(method)
@@ -696,7 +696,7 @@ async def get_treatment(self, key, feature_flag_name, attributes=None):
696696
return treatment
697697

698698
except:
699-
# TODO: maybe log here?
699+
_LOGGER.error('get_treatment failed')
700700
return CONTROL
701701

702702
async def get_treatment_with_config(self, key, feature_flag_name, attributes=None):
@@ -719,7 +719,7 @@ async def get_treatment_with_config(self, key, feature_flag_name, attributes=Non
719719
return await self._get_treatment(MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG, key, feature_flag_name, attributes)
720720

721721
except Exception:
722-
# TODO: maybe log here?
722+
_LOGGER.error('get_treatment_with_config failed')
723723
return CONTROL, None
724724

725725
async def _get_treatment(self, method, key, feature, attributes=None):

splitio/client/factory.py

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ class TimeoutException(Exception):
101101
class SplitFactoryBase(object): # pylint: disable=too-many-instance-attributes
102102
"""Split Factory/Container class."""
103103

104+
def __init__(self, sdk_key, storages):
105+
self._sdk_key = sdk_key
106+
self._storages = storages
107+
self._status = None
108+
104109
def _get_storage(self, name):
105110
"""
106111
Return a reference to the specified storage.
@@ -162,8 +167,7 @@ def __init__( # pylint: disable=too-many-arguments
162167
telemetry_producer=None,
163168
telemetry_init_producer=None,
164169
telemetry_submitter=None,
165-
preforked_initialization=False,
166-
manager_start_task=None
170+
preforked_initialization=False
167171
):
168172
"""
169173
Class constructor.
@@ -183,8 +187,7 @@ def __init__( # pylint: disable=too-many-arguments
183187
:param preforked_initialization: Whether should be instantiated as preforked or not.
184188
:type preforked_initialization: bool
185189
"""
186-
self._sdk_key = sdk_key
187-
self._storages = storages
190+
SplitFactoryBase.__init__(self, sdk_key, storages)
188191
self._labels_enabled = labels_enabled
189192
self._sync_manager = sync_manager
190193
self._recorder = recorder
@@ -328,12 +331,11 @@ def __init__( # pylint: disable=too-many-arguments
328331
labels_enabled,
329332
recorder,
330333
sync_manager=None,
331-
sdk_ready_flag=None,
332334
telemetry_producer=None,
333335
telemetry_init_producer=None,
334336
telemetry_submitter=None,
335-
preforked_initialization=False,
336-
manager_start_task=None
337+
manager_start_task=None,
338+
api_client=None
337339
):
338340
"""
339341
Class constructor.
@@ -353,12 +355,10 @@ def __init__( # pylint: disable=too-many-arguments
353355
:param preforked_initialization: Whether should be instantiated as preforked or not.
354356
:type preforked_initialization: bool
355357
"""
356-
self._sdk_key = sdk_key
357-
self._storages = storages
358+
SplitFactoryBase.__init__(self, sdk_key, storages)
358359
self._labels_enabled = labels_enabled
359360
self._sync_manager = sync_manager
360361
self._recorder = recorder
361-
self._preforked_initialization = preforked_initialization
362362
self._telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
363363
self._telemetry_init_producer = telemetry_init_producer
364364
self._telemetry_submitter = telemetry_submitter
@@ -367,16 +367,14 @@ def __init__( # pylint: disable=too-many-arguments
367367
self._manager_start_task = manager_start_task
368368
self._status = Status.NOT_INITIALIZED
369369
self._sdk_ready_flag = asyncio.Event()
370-
asyncio.get_running_loop().create_task(self._update_status_when_ready_async())
370+
self._ready_task = asyncio.get_running_loop().create_task(self._update_status_when_ready_async())
371+
self._api_client = api_client
371372

372373
async def _update_status_when_ready_async(self):
373374
"""Wait until the sdk is ready and update the status for async mode."""
374-
if self._preforked_initialization:
375-
self._status = Status.WAITING_FORK
376-
return
377-
378375
if self._manager_start_task is not None:
379376
await self._manager_start_task
377+
self._manager_start_task = None
380378
await self._telemetry_init_producer.record_ready_time(get_current_epoch_time_ms() - self._ready_time)
381379
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
382380
await self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
@@ -430,14 +428,22 @@ async def destroy(self, destroyed_event=None):
430428

431429
try:
432430
_LOGGER.info('Factory destroy called, stopping tasks.')
431+
if self._manager_start_task is not None and not self._manager_start_task.done():
432+
self._manager_start_task.cancel()
433+
433434
if self._sync_manager is not None:
434435
await self._sync_manager.stop(True)
435436

437+
if not self._ready_task.done():
438+
self._ready_task.cancel()
439+
self._ready_task = None
440+
436441
if isinstance(self._storages['splits'], RedisSplitStorageAsync):
437442
await self._get_storage('splits').redis.close()
438443

439444
if isinstance(self._sync_manager, ManagerAsync) and isinstance(self._telemetry_submitter, InMemoryTelemetrySubmitterAsync):
440-
await self._telemetry_submitter._telemetry_api._client.close_session()
445+
await self._api_client.close_session()
446+
441447
except Exception as e:
442448
_LOGGER.error('Exception destroying factory.')
443449
_LOGGER.debug(str(e))
@@ -453,24 +459,6 @@ def client(self):
453459
"""
454460
return ClientAsync(self, self._recorder, self._labels_enabled)
455461

456-
457-
async def resume(self):
458-
"""
459-
Function in charge of starting periodic/realtime synchronization after a fork.
460-
"""
461-
if not self._waiting_fork():
462-
_LOGGER.warning('Cannot call resume')
463-
return
464-
self._sync_manager.recreate()
465-
self._sdk_ready_flag = asyncio.Event()
466-
self._sdk_internal_ready_flag = self._sdk_ready_flag
467-
self._sync_manager._ready_flag = self._sdk_ready_flag
468-
await self._get_storage('impressions').clear()
469-
await self._get_storage('events').clear()
470-
self._preforked_initialization = False # reset for status updater
471-
asyncio.get_running_loop().create_task(self._update_status_when_ready_async())
472-
473-
474462
def _wrap_impression_listener(listener, metadata):
475463
"""
476464
Wrap the impression listener if any.
@@ -718,8 +706,6 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
718706

719707
synchronizer = SynchronizerAsync(synchronizers, tasks)
720708

721-
preforked_initialization = cfg.get('preforkedInitialization', False)
722-
723709
manager = ManagerAsync(synchronizer, apis['auth'], cfg['streamingEnabled'],
724710
sdk_metadata, telemetry_runtime_producer, streaming_api_base_url, api_key[-4:])
725711

@@ -739,19 +725,13 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
739725

740726
await telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)
741727

742-
if preforked_initialization:
743-
await synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
744-
await synchronizer._split_synchronizers._segment_sync.shutdown()
745-
746-
return SplitFactoryAsync(api_key, storages, cfg['labelsEnabled'],
747-
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization)
748-
749728
manager_start_task = asyncio.get_running_loop().create_task(manager.start())
750729

751730
return SplitFactoryAsync(api_key, storages, cfg['labelsEnabled'],
752-
recorder, manager, manager_start_task,
731+
recorder, manager,
753732
telemetry_producer, telemetry_init_producer,
754-
telemetry_submitter, manager_start_task=manager_start_task)
733+
telemetry_submitter, manager_start_task=manager_start_task,
734+
api_client=http_client)
755735

756736
def _build_redis_factory(api_key, cfg):
757737
"""Build and return a split factory with redis-based storage."""

splitio/push/manager.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,15 @@ async def stop(self, blocking=False):
349349

350350
if self._token_task:
351351
self._token_task.cancel()
352+
self._token_task = None
352353

353-
stop_task = asyncio.get_running_loop().create_task(self._stop_current_conn())
354354
if blocking:
355-
await stop_task
355+
await self._stop_current_conn()
356+
else:
357+
asyncio.get_running_loop().create_task(self._stop_current_conn())
358+
359+
async def close_sse_http_client(self):
360+
await self._sse_client.close_sse_http_client()
356361

357362
async def _event_handler(self, event):
358363
"""
@@ -382,6 +387,7 @@ async def _token_refresh(self, current_token):
382387
:param current_token: token (parsed) JWT
383388
:type current_token: splitio.models.token.Token
384389
"""
390+
_LOGGER.debug("Next token refresh in " + str(self._get_time_period(current_token)) + " seconds")
385391
await asyncio.sleep(self._get_time_period(current_token))
386392
await self._stop_current_conn()
387393
self._running_task = asyncio.get_running_loop().create_task(self._trigger_connection_flow())
@@ -441,6 +447,7 @@ async def _trigger_connection_flow(self):
441447
finally:
442448
if self._token_task is not None:
443449
self._token_task.cancel()
450+
self._token_task = None
444451
self._running = False
445452
await self._processor.update_workers_status(False)
446453
self._done.set()
@@ -530,4 +537,5 @@ async def _stop_current_conn(self):
530537
await self._sse_client.stop()
531538
self._running_task.cancel()
532539
await self._running_task
540+
self._running_task = None
533541
_LOGGER.debug("SplitSSE tasks are stopped")

splitio/push/splitsse.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def __init__(self, sdk_metadata, client_key=None, base_url='https://streaming.sp
181181
self._base_url = base_url
182182
self.status = SplitSSEClient._Status.IDLE
183183
self._metadata = headers_from_metadata(sdk_metadata, client_key)
184-
self._client = SSEClientAsync(timeout=self.KEEPALIVE_TIMEOUT)
184+
self._client = SSEClientAsync(self.KEEPALIVE_TIMEOUT)
185185
self._event_source = None
186186
self._event_source_ended = asyncio.Event()
187187

@@ -219,7 +219,7 @@ async def start(self, token):
219219
_LOGGER.debug('stack trace: ', exc_info=True)
220220
finally:
221221
self.status = SplitSSEClient._Status.IDLE
222-
_LOGGER.debug('sse connection ended.')
222+
_LOGGER.debug('Split sse connection ended.')
223223
self._event_source_ended.set()
224224

225225
async def stop(self):
@@ -230,4 +230,13 @@ async def stop(self):
230230
return
231231

232232
await self._client.shutdown()
233-
await self._event_source_ended.wait()
233+
# catching exception to avoid task hanging
234+
try:
235+
await self._event_source_ended.wait()
236+
except asyncio.CancelledError as e:
237+
_LOGGER.error("Exception waiting for event source ended")
238+
_LOGGER.debug('stack trace: ', exc_info=True)
239+
pass
240+
241+
async def close_sse_http_client(self):
242+
await self._client.close_session()

0 commit comments

Comments
 (0)