3232from splitio .api .impressions import ImpressionsAPI
3333from splitio .api .events import EventsAPI
3434from splitio .api .auth import AuthAPI
35- from splitio .api .telemetry import TelemetryAPI , LocalhostTelemetryAPI
35+ from splitio .api .telemetry import TelemetryAPI
3636from splitio .util .time import get_current_epoch_time_ms
3737
3838# Tasks
5252from splitio .sync .impression import ImpressionSynchronizer , ImpressionsCountSynchronizer
5353from splitio .sync .event import EventSynchronizer
5454from splitio .sync .unique_keys import UniqueKeysSynchronizer , ClearFilterSynchronizer
55- from splitio .sync .telemetry import TelemetrySynchronizer
55+ from splitio .sync .telemetry import TelemetrySynchronizer , InMemoryTelemetrySubmitter , LocalhostTelemetrySubmitter , RedisTelemetrySubmitter
5656
5757
5858# Recorder
@@ -96,8 +96,7 @@ def __init__( # pylint: disable=too-many-arguments
9696 sync_manager = None ,
9797 sdk_ready_flag = None ,
9898 telemetry_producer = None ,
99- telemetry_init_consumer = None ,
100- telemetry_api = None ,
99+ telemetry_submitter = None ,
101100 preforked_initialization = False ,
102101 ):
103102 """
@@ -129,8 +128,7 @@ def __init__( # pylint: disable=too-many-arguments
129128 self ._telemetry_init_producer = None
130129 self ._telemetry_evaluation_producer = telemetry_producer .get_telemetry_evaluation_producer ()
131130 self ._telemetry_init_producer = telemetry_producer .get_telemetry_init_producer ()
132- self ._telemetry_init_consumer = telemetry_init_consumer
133- self ._telemetry_api = telemetry_api
131+ self ._telemetry_submitter = telemetry_submitter
134132 self ._ready_time = get_current_epoch_time_ms ()
135133 self ._start_status_updater ()
136134
@@ -153,17 +151,6 @@ def _start_status_updater(self):
153151 ready_updater .start ()
154152 else :
155153 self ._status = Status .READY
156- init_updater = threading .Thread (target = self ._update_redis_init ,
157- name = 'RedisInitUpdater' )
158- init_updater .setDaemon (True )
159- init_updater .start ()
160-
161- def _update_redis_init (self ):
162- """Push Config Telemetry into redis storage"""
163- redundant_factory_count , active_factory_count = _get_active_and_redundant_count ()
164- self ._storages ['telemetry' ].record_active_and_redundant_factories (active_factory_count , redundant_factory_count )
165- self ._storages ['telemetry' ].push_config_stats ()
166-
167154
168155 def _update_status_when_ready (self ):
169156 """Wait until the sdk is ready and update the status."""
@@ -174,7 +161,7 @@ def _update_status_when_ready(self):
174161 redundant_factory_count , active_factory_count = _get_active_and_redundant_count ()
175162 self ._telemetry_init_producer .record_active_and_redundant_factories (active_factory_count , redundant_factory_count )
176163
177- config_post_thread = threading .Thread (target = self ._telemetry_api . record_init ( self . _telemetry_init_consumer . get_config_stats () ), name = "PostConfigData" )
164+ config_post_thread = threading .Thread (target = self ._telemetry_submitter . synchronize_config ( ), name = "PostConfigData" )
178165 config_post_thread .setDaemon (True )
179166 config_post_thread .start ()
180167
@@ -371,6 +358,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
371358 'events' : InMemoryEventStorage (cfg ['eventsQueueSize' ], telemetry_runtime_producer ),
372359 }
373360
361+ telemetry_submitter = InMemoryTelemetrySubmitter (telemetry_consumer , storages ['splits' ], storages ['segments' ], apis ['telemetry' ])
362+
374363 unique_keys_synchronizer , clear_filter_sync , unique_keys_task , \
375364 clear_filter_task , impressions_count_sync , impressions_count_task , \
376365 imp_strategy = set_classes ('MEMORY' , cfg ['impressionsMode' ], apis )
@@ -386,7 +375,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
386375 cfg ['impressionsBulkSize' ]),
387376 EventSynchronizer (apis ['events' ], storages ['events' ], cfg ['eventsBulkSize' ]),
388377 impressions_count_sync ,
389- TelemetrySynchronizer (telemetry_consumer , storages [ 'splits' ], storages [ 'segments' ], apis [ 'telemetry' ] ),
378+ TelemetrySynchronizer (telemetry_submitter ),
390379 unique_keys_synchronizer ,
391380 clear_filter_sync ,
392381 )
@@ -433,7 +422,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
433422 synchronizer .sync_all (max_retry_attempts = _MAX_RETRY_SYNC_ALL )
434423 synchronizer ._split_synchronizers ._segment_sync .shutdown ()
435424 return SplitFactory (api_key , storages , cfg ['labelsEnabled' ],
436- recorder , manager , None , telemetry_producer , telemetry_consumer . get_telemetry_init_consumer (), apis ['telemetry' ], preforked_initialization = preforked_initialization )
425+ recorder , manager , None , telemetry_producer , apis ['telemetry' ], preforked_initialization = preforked_initialization )
437426
438427 initialization_thread = threading .Thread (target = manager .start , name = "SDKInitializer" )
439428 initialization_thread .setDaemon (True )
@@ -442,7 +431,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
442431 telemetry_producer .get_telemetry_init_producer ().record_config (cfg , extra_cfg )
443432
444433 return SplitFactory (api_key , storages , cfg ['labelsEnabled' ],
445- recorder , manager , sdk_ready_flag , telemetry_producer , telemetry_consumer .get_telemetry_init_consumer (), apis ['telemetry' ])
434+ recorder , manager , sdk_ready_flag ,
435+ telemetry_producer ,
436+ telemetry_submitter )
446437
447438def _build_redis_factory (api_key , cfg ):
448439 """Build and return a split factory with redis-based storage."""
@@ -458,8 +449,8 @@ def _build_redis_factory(api_key, cfg):
458449 'telemetry' : RedisTelemetryStorage (redis_adapter , sdk_metadata )
459450 }
460451 telemetry_producer = TelemetryStorageProducer (storages ['telemetry' ])
461- telemetry_consumer = TelemetryStorageConsumer (storages ['telemetry' ])
462452 telemetry_runtime_producer = telemetry_producer .get_telemetry_runtime_producer ()
453+ telemetry_submitter = RedisTelemetrySubmitter (storages ['telemetry' ])
463454
464455 data_sampling = cfg .get ('dataSampling' , DEFAULT_DATA_SAMPLING )
465456 if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED :
@@ -508,23 +499,26 @@ def _build_redis_factory(api_key, cfg):
508499
509500 telemetry_producer .get_telemetry_init_producer ().record_config (cfg , {})
510501
511- return SplitFactory (
502+ split_factory = SplitFactory (
512503 api_key ,
513504 storages ,
514505 cfg ['labelsEnabled' ],
515506 recorder ,
516507 manager ,
517508 sdk_ready_flag = None ,
518- telemetry_api = redis_adapter ,
519509 telemetry_producer = telemetry_producer ,
520- telemetry_init_consumer = telemetry_consumer .get_telemetry_init_consumer ()
521510 )
511+ redundant_factory_count , active_factory_count = _get_active_and_redundant_count ()
512+ storages ['telemetry' ].record_active_and_redundant_factories (active_factory_count , redundant_factory_count )
513+ telemetry_submitter .synchronize_config ()
514+
515+ return split_factory
516+
522517
523518def _build_localhost_factory (cfg ):
524519 """Build and return a localhost factory for testing/development purposes."""
525520 telemetry_storage = LocalhostTelemetryStorage ()
526521 telemetry_producer = TelemetryStorageProducer (telemetry_storage )
527- telemetry_consumer = TelemetryStorageConsumer (telemetry_storage )
528522 telemetry_runtime_producer = telemetry_producer .get_telemetry_runtime_producer ()
529523 telemetry_evaluation_producer = telemetry_producer .get_telemetry_evaluation_producer ()
530524
@@ -566,51 +560,49 @@ def _build_localhost_factory(cfg):
566560 manager ,
567561 ready_event ,
568562 telemetry_producer = telemetry_producer ,
569- telemetry_init_consumer = telemetry_consumer .get_telemetry_init_consumer (),
570- telemetry_api = LocalhostTelemetryAPI ()
563+ telemetry_submitter = LocalhostTelemetrySubmitter (),
571564 )
572565
573566def get_factory (api_key , ** kwargs ):
574567 """Build and return the appropriate factory."""
575- try :
576- _INSTANTIATED_FACTORIES_LOCK .acquire ()
577- if _INSTANTIATED_FACTORIES :
578- if api_key in _INSTANTIATED_FACTORIES :
579- _LOGGER .warning (
580- "factory instantiation: You already have %d %s with this API Key. "
581- "We recommend keeping only one instance of the factory at all times "
582- "(Singleton pattern) and reusing it throughout your application." ,
583- _INSTANTIATED_FACTORIES [api_key ],
584- 'factory' if _INSTANTIATED_FACTORIES [api_key ] == 1 else 'factories'
585- )
586- else :
587- _LOGGER .warning (
588- "factory instantiation: You already have an instance of the Split factory. "
589- "Make sure you definitely want this additional instance. "
590- "We recommend keeping only one instance of the factory at all times "
591- "(Singleton pattern) and reusing it throughout your application."
592- )
593-
594- config = sanitize_config (api_key , kwargs .get ('config' , {}))
595-
596- if config ['operationMode' ] == 'localhost-standalone' :
597- return _build_localhost_factory (config )
598-
599- if config ['operationMode' ] == 'redis-consumer' :
600- return _build_redis_factory (api_key , config )
601-
602- return _build_in_memory_factory (
603- api_key ,
604- config ,
605- kwargs .get ('sdk_api_base_url' ),
606- kwargs .get ('events_api_base_url' ),
607- kwargs .get ('auth_api_base_url' ),
608- kwargs .get ('streaming_api_base_url' ),
609- kwargs .get ('telemetry_api_base_url' )
610- )
611- finally :
612- _INSTANTIATED_FACTORIES .update ([api_key ])
613- _INSTANTIATED_FACTORIES_LOCK .release ()
568+ _INSTANTIATED_FACTORIES_LOCK .acquire ()
569+ if _INSTANTIATED_FACTORIES :
570+ if api_key in _INSTANTIATED_FACTORIES :
571+ _LOGGER .warning (
572+ "factory instantiation: You already have %d %s with this API Key. "
573+ "We recommend keeping only one instance of the factory at all times "
574+ "(Singleton pattern) and reusing it throughout your application." ,
575+ _INSTANTIATED_FACTORIES [api_key ],
576+ 'factory' if _INSTANTIATED_FACTORIES [api_key ] == 1 else 'factories'
577+ )
578+ else :
579+ _LOGGER .warning (
580+ "factory instantiation: You already have an instance of the Split factory. "
581+ "Make sure you definitely want this additional instance. "
582+ "We recommend keeping only one instance of the factory at all times "
583+ "(Singleton pattern) and reusing it throughout your application."
584+ )
585+
586+ _INSTANTIATED_FACTORIES .update ([api_key ])
587+ _INSTANTIATED_FACTORIES_LOCK .release ()
588+
589+ config = sanitize_config (api_key , kwargs .get ('config' , {}))
590+
591+ if config ['operationMode' ] == 'localhost-standalone' :
592+ split_factory = _build_localhost_factory (config )
593+ elif config ['operationMode' ] == 'redis-consumer' :
594+ split_factory = _build_redis_factory (api_key , config )
595+ else :
596+ split_factory = _build_in_memory_factory (
597+ api_key ,
598+ config ,
599+ kwargs .get ('sdk_api_base_url' ),
600+ kwargs .get ('events_api_base_url' ),
601+ kwargs .get ('auth_api_base_url' ),
602+ kwargs .get ('streaming_api_base_url' ),
603+ kwargs .get ('telemetry_api_base_url' ))
604+
605+ return split_factory
614606
615607def _get_active_and_redundant_count ():
616608 redundant_factory_count = 0
0 commit comments