|
16 | 16 | from splitio.engine.impressions.manager import Counter as ImpressionsCounter |
17 | 17 | from splitio.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode |
18 | 18 | from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter |
19 | | -from splitio.engine.telemetry import TelemetryStorageProducer, TelemetryStorageConsumer, RedisTelemetryInitProducer, RedisTelemetryStorageProducer |
| 19 | +from splitio.engine.telemetry import TelemetryStorageProducer, TelemetryStorageConsumer |
20 | 20 |
|
21 | 21 | # Storage |
22 | 22 | from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ |
@@ -160,8 +160,9 @@ def _start_status_updater(self): |
160 | 160 | def _update_redis_init(self): |
161 | 161 | """Push Config Telemetry into redis storage""" |
162 | 162 | redundant_factory_count, active_factory_count = _get_active_and_redundant_count() |
163 | | - self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count) |
164 | | - self._telemetry_init_producer.push_config() |
| 163 | + self._storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count) |
| 164 | + self._storages['telemetry'].push_config_stats() |
| 165 | + |
165 | 166 |
|
166 | 167 | def _update_status_when_ready(self): |
167 | 168 | """Wait until the sdk is ready and update the status.""" |
@@ -448,16 +449,16 @@ def _build_redis_factory(api_key, cfg): |
448 | 449 | redis_adapter = redis.build(cfg) |
449 | 450 | cache_enabled = cfg.get('redisLocalCacheEnabled', False) |
450 | 451 | cache_ttl = cfg.get('redisLocalCacheTTL', 5) |
451 | | - telemetry_storage = RedisTelemetryStorage(redis_adapter, sdk_metadata) |
452 | | - telemetry_producer = RedisTelemetryStorageProducer(telemetry_storage) |
453 | | - telemetry_consumer = TelemetryStorageConsumer(telemetry_storage) |
454 | | - telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() |
455 | 452 | storages = { |
456 | 453 | 'splits': RedisSplitStorage(redis_adapter, cache_enabled, cache_ttl), |
457 | 454 | 'segments': RedisSegmentStorage(redis_adapter), |
458 | 455 | 'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata), |
459 | | - 'events': RedisEventsStorage(redis_adapter, sdk_metadata) |
| 456 | + 'events': RedisEventsStorage(redis_adapter, sdk_metadata), |
| 457 | + 'telemetry': RedisTelemetryStorage(redis_adapter, sdk_metadata) |
460 | 458 | } |
| 459 | + telemetry_producer = TelemetryStorageProducer(storages['telemetry']) |
| 460 | + telemetry_consumer = TelemetryStorageConsumer(storages['telemetry']) |
| 461 | + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() |
461 | 462 |
|
462 | 463 | data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING) |
463 | 464 | if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED: |
@@ -495,7 +496,7 @@ def _build_redis_factory(api_key, cfg): |
495 | 496 | imp_manager, |
496 | 497 | storages['events'], |
497 | 498 | storages['impressions'], |
498 | | - telemetry_storage, |
| 499 | + storages['telemetry'], |
499 | 500 | data_sampling, |
500 | 501 | ) |
501 | 502 |
|
|
0 commit comments