Skip to content

Commit fb0d244

Browse files
committed
refactor redis telemetry storage
1 parent 1d50b3c commit fb0d244

File tree

8 files changed: +208 additions, -132 deletions

splitio/client/client.py

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""A module for Split.io SDK API clients."""
22
import logging
3-
import time
3+
4+
from splitio.client.util import get_method_constant
45
from splitio.engine.evaluator import Evaluator, CONTROL
56
from splitio.engine.splitters import Splitter
67
from splitio.models.impressions import Impression, Label
@@ -123,7 +124,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
123124
except Exception: # pylint: disable=broad-except
124125
_LOGGER.error('Error getting treatment for feature')
125126
_LOGGER.debug('Error: ', exc_info=True)
126-
self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
127+
self._telemetry_evaluation_producer.record_exception(get_method_constant(method_name[4:]))
127128
try:
128129
impression = self._build_impression(
129130
matching_key,
@@ -200,18 +201,18 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
200201
self._record_stats(
201202
[(i, attributes) for i in bulk_impressions],
202203
start,
203-
metric_name
204+
metric_name,
205+
method_name
204206
)
205207
except Exception: # pylint: disable=broad-except
206208
_LOGGER.error('%s: An exception when trying to store '
207209
'impressions.' % method_name)
208210
_LOGGER.debug('Error: ', exc_info=True)
209-
self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
211+
self._telemetry_evaluation_producer.record_exception(get_method_constant(method_name[4:]))
210212

211-
self._telemetry_evaluation_producer.record_latency(self._get_method_constant(method_name[4:]), get_current_epoch_time_ms() - start)
212213
return treatments
213214
except Exception: # pylint: disable=broad-except
214-
self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
215+
self._telemetry_evaluation_producer.record_exception(get_method_constant(method_name[4:]))
215216
_LOGGER.error('Error getting treatment for features')
216217
_LOGGER.debug('Error: ', exc_info=True)
217218
return input_validator.generate_control_treatments(list(features), method_name)
@@ -348,10 +349,7 @@ def _record_stats(self, impressions, start, operation, method_name=None):
348349
"""
349350
end = get_current_epoch_time_ms()
350351
self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start),
351-
operation)
352-
if method_name is not None:
353-
self._telemetry_evaluation_producer.record_latency(self._get_method_constant(method_name[4:]), end - start)
354-
352+
operation, method_name)
355353

356354
def track(self, key, traffic_type, event_type, value=None, properties=None):
357355
"""
@@ -411,21 +409,11 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
411409
return_flag = self._recorder.record_track_stats([EventWrapper(
412410
event=event,
413411
size=size,
414-
)])
415-
self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, get_current_epoch_time_ms() - start)
412+
)], get_current_epoch_time_ms() - start)
416413
return return_flag
417414
except Exception: # pylint: disable=broad-except
418415
self._telemetry_evaluation_producer.record_exception(MethodExceptionsAndLatencies.TRACK)
419416
_LOGGER.error('Error processing track event')
420417
_LOGGER.debug('Error: ', exc_info=True)
421418
return False
422419

423-
def _get_method_constant(self, method):
424-
if method == 'treatment':
425-
return MethodExceptionsAndLatencies.TREATMENT
426-
elif method == 'treatments':
427-
return MethodExceptionsAndLatencies.TREATMENTS
428-
elif method == 'treatment_with_config':
429-
return MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG
430-
elif method == 'treatments_with_config':
431-
return MethodExceptionsAndLatencies.TREATMENTS_WITH_CONFIG

splitio/client/factory.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage, LocalhostTelemetryStorage
2424
from splitio.storage.adapters import redis
2525
from splitio.storage.redis import RedisSplitStorage, RedisSegmentStorage, RedisImpressionsStorage, \
26-
RedisEventsStorage
26+
RedisEventsStorage, RedisTelemetryStorage
2727

2828
# APIs
2929
from splitio.api.client import HttpClient
@@ -152,19 +152,9 @@ def _start_status_updater(self):
152152
ready_updater.start()
153153
else:
154154
self._status = Status.READY
155-
ready_updater = threading.Thread(target=self._update_redis_telemetry_config,
156-
name='SDKRedisTelemetryConfig')
157-
ready_updater.setDaemon(True)
158-
ready_updater.start()
159-
160-
def _update_redis_telemetry_config(self):
161-
"""Push Config Telemetry into storage."""
162-
self._telemetry_init_producer.record_ready_time(get_current_epoch_time_ms() - self._ready_time)
163-
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
164-
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
165-
config_post_thread = threading.Thread(target=self._telemetry_api.record_init(self._telemetry_init_consumer.get_config_stats()), name="PostConfigData")
166-
config_post_thread.setDaemon(True)
167-
config_post_thread.start()
155+
#Push Config Telemetry into redis storage
156+
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
157+
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
168158

169159
def _update_status_when_ready(self):
170160
"""Wait until the sdk is ready and update the status."""
@@ -343,8 +333,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
343333
telemetry_storage = InMemoryTelemetryStorage()
344334
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
345335
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
346-
telemetry_runtime_producer=telemetry_producer.get_telemetry_runtime_producer()
347-
# telemetry_evaluation_producer=telemetry_producer.get_telemetry_evaluation_producer()
336+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
337+
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
348338

349339

350340
http_client = HttpClient(
@@ -430,6 +420,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
430420
imp_manager,
431421
storages['events'],
432422
storages['impressions'],
423+
telemetry_evaluation_producer
433424
)
434425

435426
if preforked_initialization:
@@ -453,16 +444,16 @@ def _build_redis_factory(api_key, cfg):
453444
redis_adapter = redis.build(cfg)
454445
cache_enabled = cfg.get('redisLocalCacheEnabled', False)
455446
cache_ttl = cfg.get('redisLocalCacheTTL', 5)
447+
telemetry_storage = RedisTelemetryStorage(redis_adapter, sdk_metadata)
448+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
449+
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
450+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
456451
storages = {
457452
'splits': RedisSplitStorage(redis_adapter, cache_enabled, cache_ttl),
458453
'segments': RedisSegmentStorage(redis_adapter),
459454
'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata),
460-
'events': RedisEventsStorage(redis_adapter, sdk_metadata),
455+
'events': RedisEventsStorage(redis_adapter, sdk_metadata)
461456
}
462-
telemetry_storage = InMemoryTelemetryStorage()
463-
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
464-
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
465-
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
466457

467458
data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING)
468459
if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED:
@@ -482,14 +473,14 @@ def _build_redis_factory(api_key, cfg):
482473

483474
synchronizers = SplitSynchronizers(None, None, None, None,
484475
impressions_count_sync,
485-
TelemetrySynchronizer(telemetry_consumer, storages['splits'], storages['segments'], redis_adapter),
476+
None,
486477
unique_keys_synchronizer,
487478
clear_filter_sync
488479
)
489480

490481
tasks = SplitTasks(None, None, None, None,
491482
impressions_count_task,
492-
TelemetrySyncTask(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']),
483+
None,
493484
unique_keys_task,
494485
clear_filter_task
495486
)
@@ -500,6 +491,7 @@ def _build_redis_factory(api_key, cfg):
500491
imp_manager,
501492
storages['events'],
502493
storages['impressions'],
494+
telemetry_storage,
503495
data_sampling,
504496
)
505497

@@ -528,6 +520,7 @@ def _build_localhost_factory(cfg):
528520
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
529521
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
530522
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
523+
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
531524

532525
storages = {
533526
'splits': InMemorySplitStorage(),
@@ -557,6 +550,7 @@ def _build_localhost_factory(cfg):
557550
ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer),
558551
storages['events'],
559552
storages['impressions'],
553+
telemetry_evaluation_producer
560554
)
561555
return SplitFactory(
562556
'localhost',

splitio/client/util.py

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,25 @@
11
"""General purpose SDK utilities."""
22

3-
import socket
43
from collections import namedtuple
54
from splitio.version import __version__
5+
from splitio.util.host_info import get_hostname, get_ip
6+
7+
from splitio.models.telemetry import MethodExceptionsAndLatencies
68

79
SdkMetadata = namedtuple(
810
'SdkMetadata',
911
['sdk_version', 'instance_name', 'instance_ip']
1012
)
1113

12-
13-
def _get_ip():
14-
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
15-
try:
16-
# doesn't even have to be reachable
17-
sock.connect(('10.255.255.255', 1))
18-
ip_address = sock.getsockname()[0]
19-
except Exception: # pylint: disable=broad-except
20-
ip_address = 'unknown'
21-
finally:
22-
sock.close()
23-
return ip_address
24-
25-
26-
def _get_hostname(ip_address):
27-
return 'unknown' if ip_address == 'unknown' else 'ip-' + ip_address.replace('.', '-')
28-
29-
3014
def _get_hostname_and_ip(config):
3115
if config.get('IPAddressesEnabled') is False:
3216
return 'NA', 'NA'
3317
ip_from_config = config.get('machineIp')
3418
machine_from_config = config.get('machineName')
35-
ip_address = ip_from_config if ip_from_config is not None else _get_ip()
36-
hostname = machine_from_config if machine_from_config is not None else _get_hostname(ip_address)
19+
ip_address = ip_from_config if ip_from_config is not None else get_ip()
20+
hostname = machine_from_config if machine_from_config is not None else get_hostname()
3721
return ip_address, hostname
3822

39-
4023
def get_metadata(config):
4124
"""
4225
Gather SDK metadata and return a tuple with such info.
@@ -50,3 +33,15 @@ def get_metadata(config):
5033
version = 'python-%s' % __version__
5134
ip_address, hostname = _get_hostname_and_ip(config)
5235
return SdkMetadata(version, hostname, ip_address)
36+
37+
def get_method_constant(method):
38+
if method == 'treatment':
39+
return MethodExceptionsAndLatencies.TREATMENT
40+
elif method == 'treatments':
41+
return MethodExceptionsAndLatencies.TREATMENTS
42+
elif method == 'treatment_with_config':
43+
return MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG
44+
elif method == 'treatments_with_config':
45+
return MethodExceptionsAndLatencies.TREATMENTS_WITH_CONFIG
46+
elif method == 'track':
47+
return MethodExceptionsAndLatencies.TRACK

splitio/recorder/recorder.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
import logging
44
import random
55

6-
6+
from splitio.client.util import get_method_constant
77
from splitio.client.config import DEFAULT_DATA_SAMPLING
8-
8+
from splitio.models.telemetry import MethodExceptionsAndLatencies
99

1010
_LOGGER = logging.getLogger(__name__)
1111

@@ -41,7 +41,7 @@ def record_track_stats(self, events):
4141
class StandardRecorder(StatsRecorder):
4242
"""StandardRecorder class."""
4343

44-
def __init__(self, impressions_manager, event_storage, impression_storage):
44+
def __init__(self, impressions_manager, event_storage, impression_storage, telemetry_evaluation_producer):
4545
"""
4646
Class constructor.
4747
@@ -55,8 +55,9 @@ def __init__(self, impressions_manager, event_storage, impression_storage):
5555
self._impressions_manager = impressions_manager
5656
self._event_sotrage = event_storage
5757
self._impression_storage = impression_storage
58+
self._telemetry_evaluation_producer = telemetry_evaluation_producer
5859

59-
def record_treatment_stats(self, impressions, latency, operation):
60+
def record_treatment_stats(self, impressions, latency, operation, method_name):
6061
"""
6162
Record stats for treatment evaluation.
6263
@@ -68,27 +69,30 @@ def record_treatment_stats(self, impressions, latency, operation):
6869
:type operation: str
6970
"""
7071
try:
72+
if method_name is not None:
73+
self._telemetry_evaluation_producer.record_latency(get_method_constant(method_name[4:]), latency)
7174
impressions = self._impressions_manager.process_impressions(impressions)
7275
self._impression_storage.put(impressions)
7376
except Exception: # pylint: disable=broad-except
7477
_LOGGER.error('Error recording impressions')
7578
_LOGGER.debug('Error: ', exc_info=True)
7679

77-
def record_track_stats(self, event):
80+
def record_track_stats(self, event, latency):
7881
"""
7982
Record stats for tracking events.
8083
8184
:param event: events tracked
8285
:type event: splitio.models.events.EventWrapper
8386
"""
87+
self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, latency)
8488
return self._event_sotrage.put(event)
8589

8690

8791
class PipelinedRecorder(StatsRecorder):
8892
"""PipelinedRecorder class."""
8993

9094
def __init__(self, pipe, impressions_manager, event_storage,
91-
impression_storage, data_sampling=DEFAULT_DATA_SAMPLING):
95+
impression_storage, telemetry_redis_storage, data_sampling=DEFAULT_DATA_SAMPLING):
9296
"""
9397
Class constructor.
9498
@@ -108,8 +112,9 @@ def __init__(self, pipe, impressions_manager, event_storage,
108112
self._event_sotrage = event_storage
109113
self._impression_storage = impression_storage
110114
self._data_sampling = data_sampling
115+
self._telemetry_redis_storage = telemetry_redis_storage
111116

112-
def record_treatment_stats(self, impressions, latency, operation):
117+
def record_treatment_stats(self, impressions, latency, operation, method_name):
113118
"""
114119
Record stats for treatment evaluation.
115120
@@ -131,22 +136,23 @@ def record_treatment_stats(self, impressions, latency, operation):
131136
impressions = self._impressions_manager.process_impressions(impressions)
132137
if not impressions:
133138
return
134-
# pipe = self._make_pipe()
135-
# self._impression_storage.add_impressions_to_pipe(impressions, pipe)
136-
# self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe)
137-
# result = pipe.execute()
138-
# if len(result) == 2:
139-
# self._impression_storage.expire_key(result[0], len(impressions))
140-
self._impression_storage.put(impressions)
139+
pipe = self._make_pipe()
140+
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
141+
result = pipe.execute()
142+
if len(result) == 2:
143+
self._impression_storage.expire_key(result[0], len(impressions))
144+
if method_name is not None:
145+
self._telemetry_redis_storage.record_latency(method_name[4:], latency)
141146
except Exception: # pylint: disable=broad-except
142147
_LOGGER.error('Error recording impressions')
143148
_LOGGER.debug('Error: ', exc_info=True)
144149

145-
def record_track_stats(self, event):
150+
def record_track_stats(self, event, latency):
146151
"""
147152
Record stats for tracking events.
148153
149154
:param event: events tracked
150155
:type event: splitio.models.events.EventWrapper
151156
"""
157+
self._telemetry_redis_storage.record_latency(MethodExceptionsAndLatencies.TRACK.value, latency)
152158
return self._event_sotrage.put(event)

0 commit comments

Comments
 (0)