@@ -101,6 +101,11 @@ class TimeoutException(Exception):
101101class SplitFactoryBase (object ): # pylint: disable=too-many-instance-attributes
102102 """Split Factory/Container class."""
103103
104+ def __init__ (self , sdk_key , storages ):
105+ self ._sdk_key = sdk_key
106+ self ._storages = storages
107+ self ._status = None
108+
104109 def _get_storage (self , name ):
105110 """
106111 Return a reference to the specified storage.
@@ -162,8 +167,7 @@ def __init__( # pylint: disable=too-many-arguments
162167 telemetry_producer = None ,
163168 telemetry_init_producer = None ,
164169 telemetry_submitter = None ,
165- preforked_initialization = False ,
166- manager_start_task = None
170+ preforked_initialization = False
167171 ):
168172 """
169173 Class constructor.
@@ -183,8 +187,7 @@ def __init__( # pylint: disable=too-many-arguments
183187 :param preforked_initialization: Whether should be instantiated as preforked or not.
184188 :type preforked_initialization: bool
185189 """
186- self ._sdk_key = sdk_key
187- self ._storages = storages
190+ SplitFactoryBase .__init__ (self , sdk_key , storages )
188191 self ._labels_enabled = labels_enabled
189192 self ._sync_manager = sync_manager
190193 self ._recorder = recorder
@@ -328,12 +331,12 @@ def __init__( # pylint: disable=too-many-arguments
328331 labels_enabled ,
329332 recorder ,
330333 sync_manager = None ,
331- sdk_ready_flag = None ,
332334 telemetry_producer = None ,
333335 telemetry_init_producer = None ,
334336 telemetry_submitter = None ,
335337 preforked_initialization = False ,
336- manager_start_task = None
338+ manager_start_task = None ,
339+ api_client = None
337340 ):
338341 """
339342 Class constructor.
@@ -353,8 +356,7 @@ def __init__( # pylint: disable=too-many-arguments
353356 :param preforked_initialization: Whether should be instantiated as preforked or not.
354357 :type preforked_initialization: bool
355358 """
356- self ._sdk_key = sdk_key
357- self ._storages = storages
359+ SplitFactoryBase .__init__ (self , sdk_key , storages )
358360 self ._labels_enabled = labels_enabled
359361 self ._sync_manager = sync_manager
360362 self ._recorder = recorder
@@ -368,6 +370,7 @@ def __init__( # pylint: disable=too-many-arguments
368370 self ._status = Status .NOT_INITIALIZED
369371 self ._sdk_ready_flag = asyncio .Event ()
370372 self ._ready_task = asyncio .get_running_loop ().create_task (self ._update_status_when_ready_async ())
373+ self ._api_client = api_client
371374
372375 async def _update_status_when_ready_async (self ):
373376 """Wait until the sdk is ready and update the status for async mode."""
@@ -445,10 +448,10 @@ async def destroy(self, destroyed_event=None):
445448 await self ._get_storage ('splits' ).redis .close ()
446449
447450 if isinstance (self ._sync_manager , ManagerAsync ) and isinstance (self ._telemetry_submitter , InMemoryTelemetrySubmitterAsync ):
448- await self ._telemetry_submitter . _telemetry_api . _client .close_session ()
451+ await self ._api_client .close_session ()
449452
450453 if isinstance (self ._sync_manager , ManagerAsync ) and self ._sync_manager ._streaming_enabled :
451- await self ._sync_manager ._push . _sse_client . _client . close_session ()
454+ await self ._sync_manager .close_sse_http_client ()
452455
453456 except Exception as e :
454457 _LOGGER .error ('Exception destroying factory.' )
@@ -465,24 +468,6 @@ def client(self):
465468 """
466469 return ClientAsync (self , self ._recorder , self ._labels_enabled )
467470
468-
469- async def resume (self ):
470- """
471- Function in charge of starting periodic/realtime synchronization after a fork.
472- """
473- if not self ._waiting_fork ():
474- _LOGGER .warning ('Cannot call resume' )
475- return
476- self ._sync_manager .recreate ()
477- self ._sdk_ready_flag = asyncio .Event ()
478- self ._sdk_internal_ready_flag = self ._sdk_ready_flag
479- self ._sync_manager ._ready_flag = self ._sdk_ready_flag
480- await self ._get_storage ('impressions' ).clear ()
481- await self ._get_storage ('events' ).clear ()
482- self ._preforked_initialization = False # reset for status updater
483- asyncio .get_running_loop ().create_task (self ._update_status_when_ready_async ())
484-
485-
486471def _wrap_impression_listener (listener , metadata ):
487472 """
488473 Wrap the impression listener if any.
@@ -749,19 +734,13 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
749734
750735 await telemetry_init_producer .record_config (cfg , extra_cfg , total_flag_sets , invalid_flag_sets )
751736
752- if preforked_initialization :
753- await synchronizer .sync_all (max_retry_attempts = _MAX_RETRY_SYNC_ALL )
754- await synchronizer ._split_synchronizers ._segment_sync .shutdown ()
755-
756- return SplitFactoryAsync (api_key , storages , cfg ['labelsEnabled' ],
757- recorder , manager , None , telemetry_producer , telemetry_init_producer , telemetry_submitter , preforked_initialization = preforked_initialization )
758-
759737 manager_start_task = asyncio .get_running_loop ().create_task (manager .start ())
760738
761739 return SplitFactoryAsync (api_key , storages , cfg ['labelsEnabled' ],
762- recorder , manager , manager_start_task ,
740+ recorder , manager ,
763741 telemetry_producer , telemetry_init_producer ,
764- telemetry_submitter , manager_start_task = manager_start_task )
742+ telemetry_submitter , manager_start_task = manager_start_task ,
743+ api_client = http_client )
765744
766745def _build_redis_factory (api_key , cfg ):
767746 """Build and return a split factory with redis-based storage."""
0 commit comments