Skip to content

Commit bd585a4

Browse files
committed
Added timeouts and used one http session for SSE, added stopping manager tasks when destroy is called and removed references to tasks
1 parent 33f15fa commit bd585a4

File tree

12 files changed

+104
-61
lines changed

12 files changed

+104
-61
lines changed

splitio/client/factory.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ def __init__( # pylint: disable=too-many-arguments
367367
self._manager_start_task = manager_start_task
368368
self._status = Status.NOT_INITIALIZED
369369
self._sdk_ready_flag = asyncio.Event()
370-
asyncio.get_running_loop().create_task(self._update_status_when_ready_async())
370+
self._ready_task = asyncio.get_running_loop().create_task(self._update_status_when_ready_async())
371371

372372
async def _update_status_when_ready_async(self):
373373
"""Wait until the sdk is ready and update the status for async mode."""
@@ -377,6 +377,7 @@ async def _update_status_when_ready_async(self):
377377

378378
if self._manager_start_task is not None:
379379
await self._manager_start_task
380+
self._manager_start_task = None
380381
await self._telemetry_init_producer.record_ready_time(get_current_epoch_time_ms() - self._ready_time)
381382
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
382383
await self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
@@ -430,14 +431,25 @@ async def destroy(self, destroyed_event=None):
430431

431432
try:
432433
_LOGGER.info('Factory destroy called, stopping tasks.')
434+
if self._manager_start_task is not None and not self._manager_start_task.done():
435+
self._manager_start_task.cancel()
436+
433437
if self._sync_manager is not None:
434438
await self._sync_manager.stop(True)
435439

440+
if not self._ready_task.done():
441+
self._ready_task.cancel()
442+
self._ready_task = None
443+
436444
if isinstance(self._storages['splits'], RedisSplitStorageAsync):
437445
await self._get_storage('splits').redis.close()
438446

439447
if isinstance(self._sync_manager, ManagerAsync) and isinstance(self._telemetry_submitter, InMemoryTelemetrySubmitterAsync):
440448
await self._telemetry_submitter._telemetry_api._client.close_session()
449+
450+
if isinstance(self._sync_manager, ManagerAsync) and self._sync_manager._streaming_enabled:
451+
await self._sync_manager._push._sse_client._client.close_session()
452+
441453
except Exception as e:
442454
_LOGGER.error('Exception destroying factory.')
443455
_LOGGER.debug(str(e))

splitio/push/manager.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,10 @@ async def stop(self, blocking=False):
350350
if self._token_task:
351351
self._token_task.cancel()
352352

353-
stop_task = asyncio.get_running_loop().create_task(self._stop_current_conn())
354353
if blocking:
355-
await stop_task
354+
await self._stop_current_conn()
355+
else:
356+
asyncio.get_running_loop().create_task(self._stop_current_conn())
356357

357358
async def _event_handler(self, event):
358359
"""
@@ -382,6 +383,7 @@ async def _token_refresh(self, current_token):
382383
:param current_token: token (parsed) JWT
383384
:type current_token: splitio.models.token.Token
384385
"""
386+
_LOGGER.debug("Next token refresh in " + str(self._get_time_period(current_token)) + " seconds")
385387
await asyncio.sleep(self._get_time_period(current_token))
386388
await self._stop_current_conn()
387389
self._running_task = asyncio.get_running_loop().create_task(self._trigger_connection_flow())
@@ -441,6 +443,7 @@ async def _trigger_connection_flow(self):
441443
finally:
442444
if self._token_task is not None:
443445
self._token_task.cancel()
446+
self._token_task = None
444447
self._running = False
445448
self._done.set()
446449

@@ -529,4 +532,5 @@ async def _stop_current_conn(self):
529532
await self._sse_client.stop()
530533
self._running_task.cancel()
531534
await self._running_task
535+
self._running_task = None
532536
_LOGGER.debug("SplitSSE tasks are stopped")

splitio/push/splitsse.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ async def start(self, token):
219219
_LOGGER.debug('stack trace: ', exc_info=True)
220220
finally:
221221
self.status = SplitSSEClient._Status.IDLE
222-
_LOGGER.debug('sse connection ended.')
222+
_LOGGER.debug('Split sse connection ended.')
223223
self._event_source_ended.set()
224224

225225
async def stop(self):
@@ -230,7 +230,10 @@ async def stop(self):
230230
return
231231

232232
await self._client.shutdown()
233+
# catching exception to avoid task hanging
233234
try:
234235
await self._event_source_ended.wait()
235-
except asyncio.CancelledError:
236+
except asyncio.CancelledError as e:
237+
_LOGGER.error("Exception waiting for event source ended")
238+
_LOGGER.debug('stack trace: ', exc_info=True)
236239
pass

splitio/push/sse.py

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ def __init__(self, socket_read_timeout=_DEFAULT_SOCKET_READ_TIMEOUT):
155155
self._socket_read_timeout = socket_read_timeout + socket_read_timeout * .3
156156
self._response = None
157157
self._done = asyncio.Event()
158+
client_timeout = aiohttp.ClientTimeout(total=0, sock_read=self._socket_read_timeout)
159+
self._sess = aiohttp.ClientSession(timeout=client_timeout)
158160

159161
async def start(self, url, extra_headers=None): # pylint:disable=protected-access
160162
"""
@@ -168,46 +170,53 @@ async def start(self, url, extra_headers=None): # pylint:disable=protected-acce
168170
raise RuntimeError('Client already started.')
169171

170172
self._done.clear()
171-
client_timeout = aiohttp.ClientTimeout(total=0, sock_read=self._socket_read_timeout)
172-
async with aiohttp.ClientSession(timeout=client_timeout) as sess:
173-
try:
174-
async with sess.get(url, headers=get_headers(extra_headers)) as response:
175-
self._response = response
176-
event_builder = EventBuilder()
177-
async for line in response.content:
178-
if line.startswith(b':'):
179-
_LOGGER.debug("skipping emtpy line / comment")
180-
continue
181-
elif line in _EVENT_SEPARATORS:
182-
_LOGGER.debug("dispatching event: %s", event_builder.build())
183-
yield event_builder.build()
184-
event_builder = EventBuilder()
185-
else:
186-
event_builder.process_line(line)
187-
188-
except Exception as exc: # pylint:disable=broad-except
189-
if self._is_conn_closed_error(exc):
190-
_LOGGER.debug('sse connection ended.')
191-
return
192-
193-
_LOGGER.error('http client is throwing exceptions')
194-
_LOGGER.error('stack trace: ', exc_info=True)
195-
196-
finally:
197-
self._response = None
198-
self._done.set()
173+
try:
174+
async with self._sess.get(url, headers=get_headers(extra_headers)) as response:
175+
self._response = response
176+
event_builder = EventBuilder()
177+
async for line in response.content:
178+
if line.startswith(b':'):
179+
_LOGGER.debug("skipping emtpy line / comment")
180+
continue
181+
elif line in _EVENT_SEPARATORS:
182+
_LOGGER.debug("dispatching event: %s", event_builder.build())
183+
yield event_builder.build()
184+
event_builder = EventBuilder()
185+
else:
186+
event_builder.process_line(line)
187+
188+
except Exception as exc: # pylint:disable=broad-except
189+
if self._is_conn_closed_error(exc):
190+
_LOGGER.debug('sse connection ended.')
191+
return
192+
193+
_LOGGER.error('http client is throwing exceptions')
194+
_LOGGER.error('stack trace: ', exc_info=True)
195+
196+
finally:
197+
self._response = None
198+
self._done.set()
199199

200200
async def shutdown(self):
201201
"""Close connection"""
202202
if self._response:
203203
self._response.close()
204-
await self._done.wait()
204+
# catching exception to avoid task hanging
205+
try:
206+
await self._done.wait()
207+
except asyncio.CancelledError:
208+
_LOGGER.error("Exception waiting for event source ended")
209+
_LOGGER.debug('stack trace: ', exc_info=True)
210+
pass
205211

206212
@staticmethod
207213
def _is_conn_closed_error(exc):
208214
"""Check if the ReadError is caused by the connection being closed."""
209215
return isinstance(exc, aiohttp.ClientConnectionError) and str(exc) == "Connection closed"
210216

217+
async def close_session(self):
218+
if not self._sess.closed:
219+
await self._sess.close()
211220

212221
def get_headers(extra=None):
213222
"""

splitio/sync/manager.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,19 +172,20 @@ def __init__(self, synchronizer, auth_api, streaming_enabled, sdk_metadata, tele
172172
self._backoff = Backoff()
173173
self._queue = asyncio.Queue()
174174
self._push = PushManagerAsync(auth_api, synchronizer, self._queue, sdk_metadata, telemetry_runtime_producer, sse_url, client_key)
175-
self._push_status_handler_task = None
175+
self._stopped = False
176176

177177
async def start(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
178178
"""Start the SDK synchronization tasks."""
179+
self._stopped = False
179180
try:
180181
await self._synchronizer.sync_all(max_retry_attempts)
181-
self._synchronizer.start_periodic_data_recording()
182-
if self._streaming_enabled:
183-
self._push_status_handler_task = asyncio.get_running_loop().create_task(self._streaming_feedback_handler())
184-
self._push.start()
185-
else:
186-
self._synchronizer.start_periodic_fetching()
187-
182+
if not self._stopped:
183+
self._synchronizer.start_periodic_data_recording()
184+
if self._streaming_enabled:
185+
asyncio.get_running_loop().create_task(self._streaming_feedback_handler())
186+
self._push.start()
187+
else:
188+
self._synchronizer.start_periodic_fetching()
188189
except (APIException, RuntimeError):
189190
_LOGGER.error('Exception raised starting Split Manager')
190191
_LOGGER.debug('Exception information: ', exc_info=True)
@@ -201,8 +202,9 @@ async def stop(self, blocking):
201202
if self._streaming_enabled:
202203
self._push_status_handler_active = False
203204
await self._queue.put(self._CENTINEL_EVENT)
204-
await self._push.stop()
205+
await self._push.stop(blocking)
205206
await self._synchronizer.shutdown(blocking)
207+
self._stopped = True
206208

207209
async def _streaming_feedback_handler(self):
208210
"""

splitio/sync/synchronizer.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ def __init__(self, split_synchronizers, split_tasks):
520520
:type split_tasks: splitio.sync.synchronizer.SplitTasks
521521
"""
522522
SynchronizerInMemoryBase.__init__(self, split_synchronizers, split_tasks)
523-
self.stop_periodic_data_recording_task = None
523+
self._shutdown = False
524524

525525
async def _synchronize_segments(self):
526526
_LOGGER.debug('Starting segments synchronization')
@@ -551,6 +551,9 @@ async def synchronize_splits(self, till, sync_segments=True):
551551
:returns: whether the synchronization was successful or not.
552552
:rtype: bool
553553
"""
554+
if self._shutdown:
555+
return
556+
554557
_LOGGER.debug('Starting feature flags synchronization')
555558
try:
556559
new_segments = []
@@ -583,8 +586,9 @@ async def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
583586
:param max_retry_attempts: apply max attempts if it set to absilute integer.
584587
:type max_retry_attempts: int
585588
"""
589+
self._shutdown = False
586590
retry_attempts = 0
587-
while True:
591+
while not self._shutdown:
588592
try:
589593
sync_result = await self.synchronize_splits(None, False)
590594
if not sync_result.success and sync_result.error_code is not None and sync_result.error_code == 414:
@@ -609,7 +613,8 @@ async def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
609613
if retry_attempts > max_retry_attempts:
610614
break
611615
how_long = self._backoff.get()
612-
time.sleep(how_long)
616+
if not self._shutdown:
617+
time.sleep(how_long)
613618

614619
_LOGGER.error("Could not correctly synchronize feature flags and segments after %d attempts.", retry_attempts)
615620

@@ -621,6 +626,7 @@ async def shutdown(self, blocking):
621626
:type blocking: bool
622627
"""
623628
_LOGGER.debug('Shutting down tasks.')
629+
self._shutdown = True
624630
await self._split_synchronizers.segment_sync.shutdown()
625631
await self.stop_periodic_fetching()
626632
await self.stop_periodic_data_recording(blocking)
@@ -639,10 +645,11 @@ async def stop_periodic_data_recording(self, blocking):
639645
:type blocking: bool
640646
"""
641647
_LOGGER.debug('Stopping periodic data recording')
642-
stop_periodic_data_recording_task = asyncio.get_running_loop().create_task(self._stop_periodic_data_recording())
643648
if blocking:
644-
await stop_periodic_data_recording_task
649+
await self._stop_periodic_data_recording()
645650
_LOGGER.debug('all tasks finished successfully.')
651+
else:
652+
asyncio.get_running_loop().create_task(self._stop_periodic_data_recording())
646653

647654
async def _stop_periodic_data_recording(self):
648655
"""
@@ -798,7 +805,6 @@ def __init__(self, split_synchronizers, split_tasks):
798805
:type split_tasks: splitio.sync.synchronizer.SplitTasks
799806
"""
800807
RedisSynchronizerBase.__init__(self, split_synchronizers, split_tasks)
801-
self.stop_periodic_data_recording_task = None
802808

803809
async def shutdown(self, blocking):
804810
"""
@@ -829,7 +835,7 @@ async def stop_periodic_data_recording(self, blocking):
829835
await self._stop_periodic_data_recording()
830836
_LOGGER.debug('all tasks finished successfully.')
831837
else:
832-
self.stop_periodic_data_recording_task = asyncio.get_running_loop().create_task(self._stop_periodic_data_recording)
838+
asyncio.get_running_loop().create_task(self._stop_periodic_data_recording)
833839

834840

835841

splitio/tasks/util/asynctask.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ def start(self):
288288
return
289289
# Start execution
290290
self._completion_event.clear()
291-
self._wrapper_task = asyncio.get_running_loop().create_task(self._execution_wrapper())
291+
asyncio.get_running_loop().create_task(self._execution_wrapper())
292292

293293
async def stop(self, wait_for_completion=False):
294294
"""

splitio/tasks/util/workerpool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ async def _do_work(self, message):
178178

179179
def start(self):
180180
"""Start the workers."""
181-
self._task = asyncio.get_running_loop().create_task(self._schedule_work())
181+
asyncio.get_running_loop().create_task(self._schedule_work())
182182

183183
async def submit_work(self, jobs):
184184
"""

tests/client/test_factory.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -699,9 +699,9 @@ class SplitFactoryAsyncTests(object):
699699
@pytest.mark.asyncio
700700
async def test_flag_sets_counts(self):
701701
factory = await get_factory_async("none", config={
702-
'flagSetsFilter': ['set1', 'set2', 'set3']
702+
'flagSetsFilter': ['set1', 'set2', 'set3'],
703+
'streamEnabled': False
703704
})
704-
705705
assert factory._telemetry_init_producer._telemetry_storage._tel_config._flag_sets == 3
706706
assert factory._telemetry_init_producer._telemetry_storage._tel_config._flag_sets_invalid == 0
707707
await factory.destroy()
@@ -741,7 +741,7 @@ async def synchronize_config(*_):
741741
mocker.patch('splitio.sync.telemetry.InMemoryTelemetrySubmitterAsync.synchronize_config', new=synchronize_config)
742742

743743
# Start factory and make assertions
744-
factory = await get_factory_async('some_api_key')
744+
factory = await get_factory_async('some_api_key', config={'streamingEmabled': False})
745745
assert isinstance(factory, SplitFactoryAsync)
746746
assert isinstance(factory._storages['splits'], inmemmory.InMemorySplitStorageAsync)
747747
assert isinstance(factory._storages['segments'], inmemmory.InMemorySegmentStorageAsync)
@@ -859,6 +859,10 @@ async def stop(*_):
859859
pass
860860
factory._sync_manager.stop = stop
861861

862+
async def start(*_):
863+
pass
864+
factory._sync_manager.start = start
865+
862866
try:
863867
await factory.block_until_ready(1)
864868
except:

tests/integration/test_client_e2e.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2002,7 +2002,7 @@ async def _setup_method(self):
20022002
await redis_client.set(split_storage._get_key(split['name']), json.dumps(split))
20032003
if split.get('sets') is not None:
20042004
for flag_set in split.get('sets'):
2005-
redis_client.sadd(split_storage._get_flag_set_key(flag_set), split['name'])
2005+
await redis_client.sadd(split_storage._get_flag_set_key(flag_set), split['name'])
20062006

20072007
await redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, data['till'])
20082008

@@ -2217,7 +2217,7 @@ async def _setup_method(self):
22172217
await redis_client.set(split_storage._get_key(split['name']), json.dumps(split))
22182218
if split.get('sets') is not None:
22192219
for flag_set in split.get('sets'):
2220-
redis_client.sadd(split_storage._get_flag_set_key(flag_set), split['name'])
2220+
await redis_client.sadd(split_storage._get_flag_set_key(flag_set), split['name'])
22212221
await redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, data['till'])
22222222

22232223
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')

0 commit comments

Comments
 (0)