Skip to content

Commit fff2d44

Browse files
committed
adding tags to config telemetry to capture uwsgi worker
1 parent 3670ace commit fff2d44

File tree

5 files changed

+83
-15
lines changed

5 files changed

+83
-15
lines changed

splitio/api/telemetry.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Impressions API module."""
22
import logging
3-
import time
43

54
from splitio.api import APIException
65
from splitio.api.client import HttpClientException

splitio/client/factory.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""A module for Split.io Factories."""
22
import logging
33
import threading
4+
import sys
45
from collections import Counter
56

67
from enum import Enum
@@ -96,6 +97,7 @@ def __init__( # pylint: disable=too-many-arguments
9697
sync_manager=None,
9798
sdk_ready_flag=None,
9899
telemetry_producer=None,
100+
telemetry_init_producer=None,
99101
telemetry_submitter=None,
100102
preforked_initialization=False,
101103
):
@@ -124,10 +126,8 @@ def __init__( # pylint: disable=too-many-arguments
124126
self._sdk_internal_ready_flag = sdk_ready_flag
125127
self._recorder = recorder
126128
self._preforked_initialization = preforked_initialization
127-
self._telemetry_evaluation_producer = None
128-
self._telemetry_init_producer = None
129129
self._telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
130-
self._telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
130+
self._telemetry_init_producer = telemetry_init_producer
131131
self._telemetry_submitter = telemetry_submitter
132132
self._ready_time = get_current_epoch_time_ms()
133133
self._start_status_updater()
@@ -331,7 +331,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
331331
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
332332
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
333333
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
334-
334+
telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
335335

336336
http_client = HttpClient(
337337
sdk_url=sdk_url,
@@ -418,21 +418,25 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
418418
telemetry_evaluation_producer
419419
)
420420

421+
telemetry_init_producer.record_config(cfg, extra_cfg)
422+
if int(_get_uwsgi_worker_id()) > -1:
423+
telemetry_init_producer.add_config_tag("initilization:uwsgi")
424+
telemetry_init_producer.add_config_tag("uwsgi_worker:#" + _get_uwsgi_worker_id())
425+
421426
if preforked_initialization:
422427
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
423428
synchronizer._split_synchronizers._segment_sync.shutdown()
429+
424430
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
425-
recorder, manager, None, telemetry_producer, apis['telemetry'], preforked_initialization=preforked_initialization)
431+
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization)
426432

427433
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
428434
initialization_thread.setDaemon(True)
429435
initialization_thread.start()
430436

431-
telemetry_producer.get_telemetry_init_producer().record_config(cfg, extra_cfg)
432-
433437
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
434438
recorder, manager, sdk_ready_flag,
435-
telemetry_producer,
439+
telemetry_producer, telemetry_init_producer,
436440
telemetry_submitter)
437441

438442
def _build_redis_factory(api_key, cfg):
@@ -450,6 +454,7 @@ def _build_redis_factory(api_key, cfg):
450454
}
451455
telemetry_producer = TelemetryStorageProducer(storages['telemetry'])
452456
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
457+
telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
453458
telemetry_submitter = RedisTelemetrySubmitter(storages['telemetry'])
454459

455460
data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING)
@@ -497,7 +502,10 @@ def _build_redis_factory(api_key, cfg):
497502
initialization_thread.setDaemon(True)
498503
initialization_thread.start()
499504

500-
telemetry_producer.get_telemetry_init_producer().record_config(cfg, {})
505+
telemetry_init_producer.record_config(cfg, {})
506+
if int(_get_uwsgi_worker_id()) > -1:
507+
telemetry_init_producer.add_config_tag("initilization:uwsgi")
508+
telemetry_init_producer.add_config_tag("uwsgi_worker:#" + _get_uwsgi_worker_id())
501509

502510
split_factory = SplitFactory(
503511
api_key,
@@ -507,6 +515,7 @@ def _build_redis_factory(api_key, cfg):
507515
manager,
508516
sdk_ready_flag=None,
509517
telemetry_producer=telemetry_producer,
518+
telemetry_init_producer=telemetry_init_producer
510519
)
511520
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
512521
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
@@ -560,6 +569,7 @@ def _build_localhost_factory(cfg):
560569
manager,
561570
ready_event,
562571
telemetry_producer=telemetry_producer,
572+
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
563573
telemetry_submitter=LocalhostTelemetrySubmitter(),
564574
)
565575

@@ -613,3 +623,12 @@ def _get_active_and_redundant_count():
613623
active_factory_count += _INSTANTIATED_FACTORIES[item]
614624
_INSTANTIATED_FACTORIES_LOCK.release()
615625
return redundant_factory_count, active_factory_count
626+
627+
def _get_uwsgi_worker_id():
628+
try:
629+
import uwsgi
630+
_LOGGER.debug("uwsgi lib detected")
631+
return str(uwsgi.worker_id())
632+
except ModuleNotFoundError:
633+
pass
634+
return "-1"

splitio/engine/telemetry.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def record_not_ready_usage(self):
5151
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
5252
self._telemetry_storage.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
5353

54+
def add_config_tag(self, tag):
55+
"""Record tag string."""
56+
self._telemetry_storage.add_config_tag(tag)
57+
5458
class TelemetryEvaluationProducer(object):
5559
"""Telemetry evaluation producer class."""
5660

@@ -150,12 +154,19 @@ def get_not_ready_usage(self):
150154
return self._telemetry_storage.get_not_ready_usage()
151155

152156
def get_config_stats(self):
153-
"""Get none-ready usage."""
154-
return self._telemetry_storage.get_config_stats()
157+
"""Get config stats."""
158+
config_stats = self._telemetry_storage.get_config_stats()
159+
config_stats.update({'t': self.pop_config_tags()})
160+
return config_stats
155161

156162
def get_config_stats_to_json(self):
163+
"""Get config stats in json."""
157164
return json.dumps(self._telemetry_storage.get_config_stats())
158165

166+
def pop_config_tags(self):
167+
"""Get and reset tags."""
168+
return self._telemetry_storage.pop_config_tags()
169+
159170
class TelemetryEvaluationConsumer(object):
160171
"""Telemetry evaluation consumer class."""
161172

@@ -215,7 +226,7 @@ def get_last_synchronization(self):
215226
return self._telemetry_storage.get_last_synchronization()['lastSynchronizations']
216227

217228
def pop_tags(self):
218-
"""Get and reset http errors."""
229+
"""Get and reset tags."""
219230
return self._telemetry_storage.pop_tags()
220231

221232
def pop_http_errors(self):

splitio/storage/inmemmory.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ def __init__(self):
469469
"""Constructor"""
470470
self._lock = threading.RLock()
471471
self._reset_tags()
472+
self._reset_config_tags()
472473
self._method_exceptions = MethodExceptions()
473474
self._last_synchronization = LastSynchronization()
474475
self._counters = TelemetryCounters()
@@ -482,6 +483,10 @@ def _reset_tags(self):
482483
with self._lock:
483484
self._tags = []
484485

486+
def _reset_config_tags(self):
487+
with self._lock:
488+
self._config_tags = []
489+
485490
def record_config(self, config, extra_config):
486491
"""Record configurations."""
487492
self._tel_config.record_config(config, extra_config)
@@ -500,6 +505,12 @@ def add_tag(self, tag):
500505
if len(self._tags) < MAX_TAGS:
501506
self._tags.append(tag)
502507

508+
def add_config_tag(self, tag):
509+
"""Record tag string."""
510+
with self._lock:
511+
if len(self._config_tags) < MAX_TAGS:
512+
self._config_tags.append(tag)
513+
503514
def record_bur_time_out(self):
504515
"""Record block until ready timeout."""
505516
self._tel_config.record_bur_time_out()
@@ -575,6 +586,13 @@ def pop_tags(self):
575586
self._reset_tags()
576587
return tags
577588

589+
def pop_config_tags(self):
590+
"""Get and reset tags."""
591+
with self._lock:
592+
tags = self._config_tags
593+
self._reset_config_tags()
594+
return tags
595+
578596
def pop_latencies(self):
579597
"""Get and reset eval latencies."""
580598
return self._method_latencies.pop_all()

splitio/storage/redis.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Redis storage module."""
22
import json
33
import logging
4+
import threading
45

56
from splitio.models.impressions import Impression
67
from splitio.models import splits, segments
@@ -12,7 +13,7 @@
1213

1314

1415
_LOGGER = logging.getLogger(__name__)
15-
16+
MAX_TAGS = 10
1617

1718
class RedisSplitStorage(SplitStorage):
1819
"""Redis-based storage for splits."""
@@ -594,13 +595,25 @@ def __init__(self, redis_client, sdk_metadata):
594595
:param sdk_metadata: SDK & Machine information.
595596
:type sdk_metadata: splitio.client.util.SdkMetadata
596597
"""
598+
self._lock = threading.RLock()
599+
self._reset_config_tags()
597600
self._redis_client = redis_client
598601
self._sdk_metadata = sdk_metadata
599602
self._method_latencies = MethodLatencies()
600603
self._method_exceptions = MethodExceptions()
601604
self._tel_config = TelemetryConfig()
602605
self._make_pipe = redis_client.pipeline
603606

607+
def _reset_config_tags(self):
608+
with self._lock:
609+
self._config_tags = []
610+
611+
def add_config_tag(self, tag):
612+
"""Record tag string."""
613+
with self._lock:
614+
if len(self._config_tags) < MAX_TAGS:
615+
self._config_tags.append(tag)
616+
604617
def record_config(self, config, extra_config):
605618
"""
606619
initilize telemetry objects
@@ -610,6 +623,13 @@ def record_config(self, config, extra_config):
610623
"""
611624
self._tel_config.record_config(config, extra_config)
612625

626+
def pop_config_tags(self):
627+
"""Get and reset tags."""
628+
with self._lock:
629+
tags = self._config_tags
630+
self._reset_config_tags()
631+
return tags
632+
613633
def push_config_stats(self):
614634
"""push config stats to redis."""
615635
self._redis_client.hset(self._TELEMETRY_CONFIG_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip, str(self._format_config_stats()))
@@ -621,7 +641,8 @@ def _format_config_stats(self):
621641
'aF': config_stats['aF'],
622642
'rF': config_stats['rF'],
623643
'sT': config_stats['sT'],
624-
'oM': config_stats['oM']
644+
'oM': config_stats['oM'],
645+
't': self.pop_config_tags()
625646
})
626647

627648
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):

0 commit comments

Comments
 (0)