Skip to content

Commit 08d49b2

Browse files
committed
Telemetry implementation for redis
1 parent 83a12da commit 08d49b2

File tree

5 files changed

+119
-6
lines changed

5 files changed

+119
-6
lines changed

splitio/client/factory.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,20 @@ def _start_status_updater(self):
152152
ready_updater.start()
153153
else:
154154
self._status = Status.READY
155+
ready_updater = threading.Thread(target=self._update_redis_telemetry_config,
156+
name='SDKRedisTelemetryConfig')
157+
ready_updater.setDaemon(True)
158+
ready_updater.start()
159+
160+
161+
def _update_redis_telemetry_config(self):
162+
"""Push Config Telemetry into storage."""
163+
self._telemetry_init_producer.record_ready_time(get_current_epoch_time() - self._ready_time)
164+
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
165+
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
166+
config_post_thread = threading.Thread(target=self._telemetry_api.record_init(self._telemetry_init_consumer.get_config_stats()), name="PostConfigData")
167+
config_post_thread.setDaemon(True)
168+
config_post_thread.start()
155169

156170
def _update_status_when_ready(self):
157171
"""Wait until the sdk is ready and update the status."""
@@ -469,14 +483,14 @@ def _build_redis_factory(api_key, cfg):
469483

470484
synchronizers = SplitSynchronizers(None, None, None, None,
471485
impressions_count_sync,
472-
None,
486+
TelemetrySynchronizer(telemetry_consumer, storages['splits'], storages['segments'], redis_adapter),
473487
unique_keys_synchronizer,
474488
clear_filter_sync
475489
)
476490

477491
tasks = SplitTasks(None, None, None, None,
478492
impressions_count_task,
479-
None,
493+
TelemetrySyncTask(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']),
480494
unique_keys_task,
481495
clear_filter_task
482496
)
@@ -494,6 +508,8 @@ def _build_redis_factory(api_key, cfg):
494508
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
495509
initialization_thread.setDaemon(True)
496510
initialization_thread.start()
511+
512+
telemetry_producer.get_telemetry_init_producer().record_config(cfg, {})
497513

498514
return SplitFactory(
499515
api_key,
@@ -502,6 +518,7 @@ def _build_redis_factory(api_key, cfg):
502518
recorder,
503519
manager,
504520
sdk_ready_flag=None,
521+
telemetry_api=redis_adapter,
505522
telemetry_producer=telemetry_producer,
506523
telemetry_init_consumer=telemetry_consumer.get_telemetry_init_consumer()
507524
)

splitio/storage/adapters/redis.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
"""Redis client wrapper with prefix support."""
22
from builtins import str
3+
import socket
4+
import logging
5+
from splitio.version import __version__
36

47
try:
58
from redis import StrictRedis
@@ -14,6 +17,10 @@ def missing_redis_dependencies(*_, **__):
1417
)
1518
StrictRedis = Sentinel = missing_redis_dependencies
1619

20+
_LOGGER = logging.getLogger(__name__)
21+
TELEMETRY_CONFIG_KEY = 'SPLITIO.telemetry.init'
22+
TELEMETRY_EXCEPTIONS_KEY = 'SPLITIO.telemetry.exceptions'
23+
TELEMETRY_LATENCIES_KEY = 'SPLITIO.telemetry.latencies'
1724

1825
class RedisAdapterException(Exception):
1926
"""Exception to be thrown when a redis command fails with an exception."""
@@ -241,6 +248,13 @@ def hget(self, name, key):
241248
except RedisError as exc:
242249
raise RedisAdapterException('Error executing hget operation') from exc
243250

251+
def hincrby(self, name, key, amount=1):
252+
"""Mimic original redis function but using user custom prefix."""
253+
try:
254+
return self._decorated.hincrby(self._prefix_helper.add_prefix(name), key, amount)
255+
except RedisError as exc:
256+
raise RedisAdapterException('Error executing hincrby operation') from exc
257+
244258
def incr(self, name, amount=1):
245259
"""Mimic original redis function but using user custom prefix."""
246260
try:
@@ -297,6 +311,54 @@ def pipeline(self):
297311
except RedisError as exc:
298312
raise RedisAdapterException('Error executing ttl operation') from exc
299313

314+
def record_init(self, *values):
315+
try:
316+
host_name, host_ip = self._get_host_info()
317+
return self.hset(TELEMETRY_CONFIG_KEY, 'python-' + __version__ + '/' + host_name+ '/' + host_ip, str(*values))
318+
except RedisError as exc:
319+
raise RedisAdapterException('Error pushing telemetry config operation') from exc
320+
321+
def _get_host_info(self):
322+
host_name = 'Unknown'
323+
host_ip = 'Unknown'
324+
try:
325+
host_name = socket.gethostname()
326+
host_ip = socket.gethostbyname(socket.gethostname())
327+
except:
328+
_LOGGER.debug("Could not get hostname or ip")
329+
pass
330+
return host_name, host_ip
331+
332+
def record_stats(self, values):
333+
try:
334+
host_name, host_ip = self._get_host_info()
335+
for item in values['mL']:
336+
bucket_number = 0
337+
for bucket in values['mL'][item]:
338+
if bucket > 0:
339+
self.hincrby(TELEMETRY_LATENCIES_KEY, 'python-' + __version__ + '/' + host_name+ '/' + host_ip + '/' +
340+
self._get_method_name(item) + '/' + str(bucket_number), bucket)
341+
bucket_number = bucket_number + 0
342+
for item in values['mE']:
343+
if values['mE'][item] > 0:
344+
self.hincrby(TELEMETRY_EXCEPTIONS_KEY, 'python-' + __version__ + '/' + host_name+ '/' + host_ip + '/' +
345+
self._get_method_name(item), values['mE'][item])
346+
except RedisError as exc:
347+
raise RedisAdapterException('Error pushing telemetry evaluation operation') from exc
348+
349+
def _get_method_name(self, item):
350+
if item == 't':
351+
return 'treatment'
352+
elif item == 'ts':
353+
return 'treatments'
354+
elif item == 'tc':
355+
return 'treatment_with_config'
356+
elif item == 'tcs':
357+
return 'treatments_with_config'
358+
elif item == 'tr':
359+
return 'track'
360+
else:
361+
return ''
300362

301363
class RedisPipelineAdapter(object):
302364
"""

splitio/storage/redis.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,23 @@ def get_split_names(self):
180180
_LOGGER.debug('Error: ', exc_info=True)
181181
return []
182182

183+
def get_splits_count(self):
184+
"""
185+
Return splits count.
186+
187+
:rtype: int
188+
"""
189+
return 0
190+
191+
def get_all_splits(self):
192+
"""
193+
Return all the splits in cache.
194+
195+
:return: 0
196+
:rtype: int
197+
"""
198+
return 0
199+
183200
def get_all_splits(self):
184201
"""
185202
Return all the splits in cache.
@@ -345,6 +362,22 @@ def segment_contains(self, segment_name, key):
345362
_LOGGER.debug('Error: ', exc_info=True)
346363
return None
347364

365+
def get_segments_count(self):
366+
"""
367+
Return segment count.
368+
369+
:return: 0
370+
:rtype: int
371+
"""
372+
return 0
373+
374+
def get_segments_keys_count(self):
375+
"""
376+
Return segment count.
377+
378+
:rtype: int
379+
"""
380+
return 0
348381

349382
class RedisImpressionsStorage(ImpressionStorage, ImpressionPipelinedStorage):
350383
"""Redis based event storage class."""

splitio/sync/synchronizer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,13 +400,16 @@ def __init__(self, split_synchronizers, split_tasks):
400400
:type split_tasks: splitio.sync.synchronizer.SplitTasks
401401
"""
402402
self._split_synchronizers = split_synchronizers
403-
self._tasks = []
403+
self._tasks = [split_tasks.telemetry_task]
404404
if split_tasks.impressions_count_task is not None:
405405
self._tasks.append(split_tasks.impressions_count_task)
406406
if split_tasks.unique_keys_task is not None:
407407
self._tasks.append(split_tasks.unique_keys_task)
408408
if split_tasks.clear_filter_task is not None:
409409
self._tasks.append(split_tasks.clear_filter_task)
410+
self._periodic_data_recording_tasks = [
411+
split_tasks.telemetry_task
412+
]
410413

411414
def sync_all(self):
412415
"""

splitio/sync/telemetry.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import json
2-
import logging
3-
_LOGGER = logging.getLogger(__name__)
42

53
from splitio.api.telemetry import TelemetryAPI
64
from splitio.engine.telemetry import TelemetryStorageConsumer
@@ -49,4 +47,4 @@ def _build_stats(self):
4947
}
5048
merged_dict.update(self._telemetry_runtime_consumer.pop_formatted_stats())
5149
merged_dict.update(self._telemetry_evaluation_consumer.pop_formatted_stats())
52-
return merged_dict
50+
return merged_dict

0 commit comments

Comments
 (0)