3535from splitio .api .telemetry import TelemetryAPI
3636from splitio .api .auth import AuthAPI
3737
38-
3938# Tasks
4039from splitio .tasks .split_sync import SplitSynchronizationTask
4140from splitio .tasks .segment_sync import SegmentSynchronizationTask
5352from splitio .sync .event import EventSynchronizer
5453from splitio .sync .telemetry import TelemetrySynchronizer
5554
55+ # Recorder
56+ from splitio .recorder .recorder import StandardRecorder , PipelinedRecorder
57+
5658# Localhost stuff
5759from splitio .client .localhost import LocalhostEventsStorage , LocalhostImpressionsStorage , \
5860 LocalhostTelemetryStorage
@@ -69,6 +71,7 @@ class Status(Enum):
6971 NOT_INITIALIZED = 'NOT_INITIALIZED'
7072 READY = 'READY'
7173 DESTROYED = 'DESTROYED'
74+ WAITING_FORK = 'WAITING_FORK'
7275
7376
7477class TimeoutException (Exception ):
@@ -85,9 +88,10 @@ def __init__( # pylint: disable=too-many-arguments
8588 apikey ,
8689 storages ,
8790 labels_enabled ,
88- impressions_manager ,
91+ recorder ,
8992 sync_manager = None ,
9093 sdk_ready_flag = None ,
94+ preforked_initialization = False ,
9195 ):
9296 """
9397 Class constructor.
@@ -102,20 +106,31 @@ def __init__( # pylint: disable=too-many-arguments
102106 :type sync_manager: splitio.sync.manager.Manager
103107 :param sdk_ready_flag: Event to set when the sdk is ready.
104108 :type sdk_ready_flag: threading.Event
105- :param impression_manager: Impressions manager instance
106- :type impression_listener: ImpressionsManager
109+ :param recorder: StatsRecorder instance
110+ :type recorder: StatsRecorder
111+ :param preforked_initialization: Whether should be instantiated as preforked or not.
112+ :type preforked_initialization: bool
107113 """
108114 self ._apikey = apikey
109115 self ._storages = storages
110116 self ._labels_enabled = labels_enabled
111117 self ._sync_manager = sync_manager
112118 self ._sdk_internal_ready_flag = sdk_ready_flag
113- self ._sdk_ready_flag = threading .Event ()
114- self ._impressions_manager = impressions_manager
119+ self ._recorder = recorder
120+ self ._preforked_initialization = preforked_initialization
121+ self ._start_status_updater ()
115122
123+ def _start_status_updater (self ):
124+ """
125+ Perform status updater
126+ """
127+ if self ._preforked_initialization :
128+ self ._status = Status .WAITING_FORK
129+ return
116130 # If we have a ready flag, it means we have sync tasks that need to finish
117131 # before the SDK client becomes ready.
118132 if self ._sdk_internal_ready_flag is not None :
133+ self ._sdk_ready_flag = threading .Event ()
119134 self ._status = Status .NOT_INITIALIZED
120135 # add a listener that updates the status to READY once the flag is set.
121136 ready_updater = threading .Thread (target = self ._update_status_when_ready ,
@@ -150,7 +165,7 @@ def client(self):
150165 This client is only a set of references to structures hold by the factory.
151166 Creating one a fast operation and safe to be used anywhere.
152167 """
153- return Client (self , self ._impressions_manager , self ._labels_enabled )
168+ return Client (self , self ._recorder , self ._labels_enabled )
154169
155170 def manager (self ):
156171 """
@@ -230,6 +245,38 @@ def destroyed(self):
230245 """
231246 return self ._status == Status .DESTROYED
232247
248+ def _waiting_fork (self ):
249+ """
250+ Return whether the factory is waiting to be recreated by forking or not.
251+
252+ :return: True if the factory is waiting to be recreated by forking. False otherwise.
253+ :rtype: bool
254+ """
255+ return self ._status == Status .WAITING_FORK
256+
257+ def resume (self ):
258+ """
259+ Function in charge of starting periodic/realtime synchronization after a fork.
260+ """
261+ if not self ._waiting_fork ():
262+ _LOGGER .warning ('Cannot call resume' )
263+ return
264+ self ._sync_manager .recreate ()
265+ sdk_ready_flag = threading .Event ()
266+ self ._sdk_internal_ready_flag = sdk_ready_flag
267+ self ._sync_manager ._ready_flag = sdk_ready_flag
268+ self ._get_storage ('telemetry' ).clear ()
269+ self ._get_storage ('impressions' ).clear ()
270+ self ._get_storage ('events' ).clear ()
271+ initialization_thread = threading .Thread (
272+ target = self ._sync_manager .start ,
273+ name = "SDKInitializer" ,
274+ )
275+ initialization_thread .setDaemon (True )
276+ initialization_thread .start ()
277+ self ._preforked_initialization = False # reset for status updater
278+ self ._start_status_updater ()
279+
233280
234281def _wrap_impression_listener (listener , metadata ):
235282 """
@@ -280,7 +327,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
280327 }
281328
282329 imp_manager = ImpressionsManager (
283- storages ['impressions' ].put ,
284330 cfg ['impressionsMode' ],
285331 True ,
286332 _wrap_impression_listener (cfg ['impressionListener' ], sdk_metadata ))
@@ -318,19 +364,34 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
318364
319365 synchronizer = Synchronizer (synchronizers , tasks )
320366
321- sdk_ready_flag = threading .Event ()
367+ preforked_initialization = cfg .get ('preforkedInitialization' , False )
368+
369+ sdk_ready_flag = threading .Event () if not preforked_initialization else None
322370 manager = Manager (sdk_ready_flag , synchronizer , apis ['auth' ], cfg ['streamingEnabled' ],
323371 streaming_api_base_url )
324372
373+ storages ['events' ].set_queue_full_hook (tasks .events_task .flush )
374+ storages ['impressions' ].set_queue_full_hook (tasks .impressions_task .flush )
375+
376+ recorder = StandardRecorder (
377+ imp_manager ,
378+ storages ['telemetry' ],
379+ storages ['events' ],
380+ storages ['impressions' ],
381+ )
382+
383+ if preforked_initialization :
384+ synchronizer .sync_all ()
385+ synchronizer ._split_synchronizers ._segment_sync .shutdown ()
386+ return SplitFactory (api_key , storages , cfg ['labelsEnabled' ],
387+ recorder , manager , preforked_initialization = preforked_initialization )
388+
325389 initialization_thread = threading .Thread (target = manager .start , name = "SDKInitializer" )
326390 initialization_thread .setDaemon (True )
327391 initialization_thread .start ()
328392
329- storages ['events' ].set_queue_full_hook (tasks .events_task .flush )
330- storages ['impressions' ].set_queue_full_hook (tasks .impressions_task .flush )
331-
332393 return SplitFactory (api_key , storages , cfg ['labelsEnabled' ],
333- imp_manager , manager , sdk_ready_flag )
394+ recorder , manager , sdk_ready_flag )
334395
335396
336397def _build_redis_factory (api_key , cfg ):
@@ -346,12 +407,19 @@ def _build_redis_factory(api_key, cfg):
346407 'events' : RedisEventsStorage (redis_adapter , sdk_metadata ),
347408 'telemetry' : RedisTelemetryStorage (redis_adapter , sdk_metadata )
348409 }
410+ recorder = PipelinedRecorder (
411+ redis_adapter .pipeline ,
412+ ImpressionsManager (cfg ['impressionsMode' ], False ,
413+ _wrap_impression_listener (cfg ['impressionListener' ], sdk_metadata )),
414+ storages ['telemetry' ],
415+ storages ['events' ],
416+ storages ['impressions' ],
417+ )
349418 return SplitFactory (
350419 api_key ,
351420 storages ,
352421 cfg ['labelsEnabled' ],
353- ImpressionsManager (storages ['impressions' ].put , cfg ['impressionsMode' ], False ,
354- _wrap_impression_listener (cfg ['impressionListener' ], sdk_metadata ))
422+ recorder ,
355423 )
356424
357425
@@ -366,12 +434,22 @@ def _build_uwsgi_factory(api_key, cfg):
366434 'events' : UWSGIEventStorage (uwsgi_adapter ),
367435 'telemetry' : UWSGITelemetryStorage (uwsgi_adapter )
368436 }
437+ recorder = StandardRecorder (
438+ ImpressionsManager (cfg ['impressionsMode' ], True ,
439+ _wrap_impression_listener (cfg ['impressionListener' ], sdk_metadata )),
440+ storages ['telemetry' ],
441+ storages ['events' ],
442+ storages ['impressions' ],
443+ )
444+ _LOGGER .warning (
445+ "Beware: uwsgi-cache based operation mode is soon to be deprecated. Please consider " +
446+ "redis if you need a centralized point of syncrhonization, or in-memory (with preforking " +
447+ "support enabled) if running uwsgi with a master and several http workers)" )
369448 return SplitFactory (
370449 api_key ,
371450 storages ,
372451 cfg ['labelsEnabled' ],
373- ImpressionsManager (storages ['impressions' ].put , cfg ['impressionsMode' ], True ,
374- _wrap_impression_listener (cfg ['impressionListener' ], sdk_metadata ))
452+ recorder ,
375453 )
376454
377455
@@ -401,12 +479,17 @@ def _build_localhost_factory(cfg):
401479 synchronizer = LocalhostSynchronizer (synchronizers , tasks )
402480 manager = Manager (ready_event , synchronizer , None , False )
403481 manager .start ()
404-
482+ recorder = StandardRecorder (
483+ ImpressionsManager (cfg ['impressionsMode' ], True , None ),
484+ storages ['telemetry' ],
485+ storages ['events' ],
486+ storages ['impressions' ],
487+ )
405488 return SplitFactory (
406489 'localhost' ,
407490 storages ,
408491 False ,
409- ImpressionsManager ( storages [ 'impressions' ]. put , cfg [ 'impressionsMode' ], True , None ) ,
492+ recorder ,
410493 manager ,
411494 ready_event
412495 )
0 commit comments