
Commit 4e3448c

Merge pull request #280 from splitio/telemetry-redis
Telemetry redis
2 parents 77291f9 + feac1e4 commit 4e3448c

27 files changed (+508 / -306 lines)

splitio/client/client.py

Lines changed: 9 additions & 21 deletions

@@ -1,6 +1,7 @@
 """A module for Split.io SDK API clients."""
 import logging
-import time
+
+from splitio.client.util import get_method_constant
 from splitio.engine.evaluator import Evaluator, CONTROL
 from splitio.engine.splitters import Splitter
 from splitio.models.impressions import Impression, Label
@@ -123,7 +124,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
         except Exception:  # pylint: disable=broad-except
             _LOGGER.error('Error getting treatment for feature')
             _LOGGER.debug('Error: ', exc_info=True)
-            self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
+            self._telemetry_evaluation_producer.record_exception(get_method_constant(method_name[4:]))
         try:
             impression = self._build_impression(
                 matching_key,
@@ -200,18 +201,18 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
                 self._record_stats(
                     [(i, attributes) for i in bulk_impressions],
                     start,
-                    metric_name
+                    metric_name,
+                    method_name
                 )
             except Exception:  # pylint: disable=broad-except
                 _LOGGER.error('%s: An exception when trying to store '
                               'impressions.' % method_name)
                 _LOGGER.debug('Error: ', exc_info=True)
-                self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
+                self._telemetry_evaluation_producer.record_exception(get_method_constant(method_name[4:]))

-            self._telemetry_evaluation_producer.record_latency(self._get_method_constant(method_name[4:]), get_current_epoch_time_ms() - start)
             return treatments
         except Exception:  # pylint: disable=broad-except
-            self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
+            self._telemetry_evaluation_producer.record_exception(get_method_constant(method_name[4:]))
             _LOGGER.error('Error getting treatment for features')
             _LOGGER.debug('Error: ', exc_info=True)
             return input_validator.generate_control_treatments(list(features), method_name)
@@ -348,10 +349,7 @@ def _record_stats(self, impressions, start, operation, method_name=None):
         """
         end = get_current_epoch_time_ms()
         self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start),
-                                              operation)
-        if method_name is not None:
-            self._telemetry_evaluation_producer.record_latency(self._get_method_constant(method_name[4:]), end - start)
-
+                                              operation, method_name)

     def track(self, key, traffic_type, event_type, value=None, properties=None):
         """
@@ -411,21 +409,11 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
             return_flag = self._recorder.record_track_stats([EventWrapper(
                 event=event,
                 size=size,
-            )])
-            self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, get_current_epoch_time_ms() - start)
+            )], get_current_epoch_time_ms() - start)
             return return_flag
         except Exception:  # pylint: disable=broad-except
             self._telemetry_evaluation_producer.record_exception(MethodExceptionsAndLatencies.TRACK)
             _LOGGER.error('Error processing track event')
             _LOGGER.debug('Error: ', exc_info=True)
             return False

-    def _get_method_constant(self, method):
-        if method == 'treatment':
-            return MethodExceptionsAndLatencies.TREATMENT
-        elif method == 'treatments':
-            return MethodExceptionsAndLatencies.TREATMENTS
-        elif method == 'treatment_with_config':
-            return MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG
-        elif method == 'treatments_with_config':
-            return MethodExceptionsAndLatencies.TREATMENTS_WITH_CONFIG
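
Note on the change above: the private if/elif helper _get_method_constant is gone, and the client now maps the public method name to its telemetry constant via get_method_constant from splitio/client/util.py (shown further down), stripping the leading 'get_' prefix first (method_name[4:]). A minimal, self-contained sketch of that lookup pattern follows; the enum and its member values are stand-ins for the SDK's MethodExceptionsAndLatencies, not its real definitions.

    from enum import Enum

    class MethodExceptionsAndLatencies(Enum):
        # Stand-in for splitio.models.telemetry.MethodExceptionsAndLatencies;
        # member values here are placeholders chosen to match the method names.
        TREATMENT = 'treatment'
        TREATMENTS = 'treatments'
        TREATMENT_WITH_CONFIG = 'treatment_with_config'
        TREATMENTS_WITH_CONFIG = 'treatments_with_config'
        TRACK = 'track'

    # Dict lookup replacing the removed if/elif chain.
    _MAP_METHOD_TO_ENUM = {member.value: member for member in MethodExceptionsAndLatencies}

    def get_method_constant(method):
        return _MAP_METHOD_TO_ENUM[method]

    # The client passes method_name[4:], e.g. 'get_treatment' -> 'treatment'.
    assert get_method_constant('get_treatment'[4:]) is MethodExceptionsAndLatencies.TREATMENT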

splitio/client/factory.py

Lines changed: 20 additions & 13 deletions

@@ -23,7 +23,7 @@
     InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage, LocalhostTelemetryStorage
 from splitio.storage.adapters import redis
 from splitio.storage.redis import RedisSplitStorage, RedisSegmentStorage, RedisImpressionsStorage, \
-    RedisEventsStorage
+    RedisEventsStorage, RedisTelemetryStorage

 # APIs
 from splitio.api.client import HttpClient
@@ -66,7 +66,7 @@
 _INSTANTIATED_FACTORIES = Counter()
 _INSTANTIATED_FACTORIES_LOCK = threading.RLock()
 _MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1  # 10%
-
+_MAX_RETRY_SYNC_ALL = 3

 class Status(Enum):
     """Factory Status."""
@@ -152,6 +152,9 @@ def _start_status_updater(self):
             ready_updater.start()
         else:
             self._status = Status.READY
+            #Push Config Telemetry into redis storage
+            redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
+            self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)

     def _update_status_when_ready(self):
         """Wait until the sdk is ready and update the status."""
@@ -330,8 +333,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None,  # pyl
     telemetry_storage = InMemoryTelemetryStorage()
     telemetry_producer = TelemetryStorageProducer(telemetry_storage)
     telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
-    telemetry_runtime_producer=telemetry_producer.get_telemetry_runtime_producer()
-    # telemetry_evaluation_producer=telemetry_producer.get_telemetry_evaluation_producer()
+    telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
+    telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()


     http_client = HttpClient(
@@ -352,9 +355,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None,  # pyl
         'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
     }

-    if not input_validator.validate_apikey_type(apis['segments']):
-        return None
-
     storages = {
         'splits': InMemorySplitStorage(),
         'segments': InMemorySegmentStorage(),
@@ -417,10 +417,11 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None,  # pyl
         imp_manager,
         storages['events'],
         storages['impressions'],
+        telemetry_evaluation_producer
     )

     if preforked_initialization:
-        synchronizer.sync_all()
+        synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
         synchronizer._split_synchronizers._segment_sync.shutdown()
         return SplitFactory(api_key, storages, cfg['labelsEnabled'],
                             recorder, manager, None, telemetry_producer, telemetry_consumer.get_telemetry_init_consumer(), apis['telemetry'], preforked_initialization=preforked_initialization)
@@ -440,16 +441,16 @@ def _build_redis_factory(api_key, cfg):
     redis_adapter = redis.build(cfg)
     cache_enabled = cfg.get('redisLocalCacheEnabled', False)
     cache_ttl = cfg.get('redisLocalCacheTTL', 5)
+    telemetry_storage = RedisTelemetryStorage(redis_adapter, sdk_metadata)
+    telemetry_producer = TelemetryStorageProducer(telemetry_storage)
+    telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
+    telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
     storages = {
         'splits': RedisSplitStorage(redis_adapter, cache_enabled, cache_ttl),
         'segments': RedisSegmentStorage(redis_adapter),
         'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata),
-        'events': RedisEventsStorage(redis_adapter, sdk_metadata),
+        'events': RedisEventsStorage(redis_adapter, sdk_metadata)
     }
-    telemetry_storage = InMemoryTelemetryStorage()
-    telemetry_producer = TelemetryStorageProducer(telemetry_storage)
-    telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
-    telemetry_runtime_producer=telemetry_producer.get_telemetry_runtime_producer()

     data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING)
     if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED:
@@ -487,6 +488,7 @@ def _build_redis_factory(api_key, cfg):
         imp_manager,
         storages['events'],
         storages['impressions'],
+        telemetry_storage,
         data_sampling,
     )

@@ -495,13 +497,16 @@ def _build_redis_factory(api_key, cfg):
     initialization_thread.setDaemon(True)
     initialization_thread.start()

+    telemetry_producer.get_telemetry_init_producer().record_config(cfg, {})
+
    return SplitFactory(
        api_key,
        storages,
        cfg['labelsEnabled'],
        recorder,
        manager,
        sdk_ready_flag=None,
+       telemetry_api=redis_adapter,
        telemetry_producer=telemetry_producer,
        telemetry_init_consumer=telemetry_consumer.get_telemetry_init_consumer()
    )
@@ -512,6 +517,7 @@ def _build_localhost_factory(cfg):
     telemetry_producer = TelemetryStorageProducer(telemetry_storage)
     telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
     telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
+    telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()

     storages = {
         'splits': InMemorySplitStorage(),
@@ -541,6 +547,7 @@ def _build_localhost_factory(cfg):
         ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer),
         storages['events'],
         storages['impressions'],
+        telemetry_evaluation_producer
     )
     return SplitFactory(
         'localhost',

splitio/client/input_validator.py

Lines changed: 0 additions & 25 deletions

@@ -448,31 +448,6 @@ def filter(self, record):
         return record.name not in ('SegmentsAPI', 'HttpClient')


-def validate_apikey_type(segment_api):
-    """
-    Try to guess if the apikey is of browser type and let the user know.
-
-    :param segment_api: Segments API client.
-    :type segment_api: splitio.api.segments.SegmentsAPI
-    """
-    api_messages_filter = _ApiLogFilter()
-    _logger = logging.getLogger('splitio.api.segments')
-    try:
-        _logger.addFilter(api_messages_filter)  # pylint: disable=protected-access
-        segment_api.fetch_segment('__SOME_INVALID_SEGMENT__', -1, FetchOptions())
-    except APIException as exc:
-        if exc.status_code == 403:
-            _LOGGER.error('factory instantiation: you passed a browser type '
-                          + 'api_key, please grab an api key from the Split '
-                          + 'console that is of type sdk')
-            return False
-    finally:
-        _logger.removeFilter(api_messages_filter)  # pylint: disable=protected-access
-
-    # True doesn't mean that the APIKEY is right, only that it's not of type "browser"
-    return True
-
-
 def validate_factory_instantiation(apikey):
     """
     Check if the factory if being instantiated with the appropriate arguments.

splitio/client/util.py

Lines changed: 29 additions & 21 deletions

@@ -1,42 +1,41 @@
 """General purpose SDK utilities."""

-import socket
 from collections import namedtuple
 from splitio.version import __version__
+from splitio.util.host_info import get_hostname, get_ip
+
+from splitio.models.telemetry import MethodExceptionsAndLatencies
+
+_MAP_METHOD_TO_ENUM = {'treatment': MethodExceptionsAndLatencies.TREATMENT,
+                       'treatments': MethodExceptionsAndLatencies.TREATMENTS,
+                       'treatment_with_config': MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG,
+                       'treatments_with_config': MethodExceptionsAndLatencies.TREATMENTS_WITH_CONFIG,
+                       'track': MethodExceptionsAndLatencies.TRACK
+                       }

 SdkMetadata = namedtuple(
     'SdkMetadata',
     ['sdk_version', 'instance_name', 'instance_ip']
 )

+def _get_hostname_and_ip(config):
+    """
+    Get current hostname and IP address if config parameters are not set.

-def _get_ip():
-    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    try:
-        # doesn't even have to be reachable
-        sock.connect(('10.255.255.255', 1))
-        ip_address = sock.getsockname()[0]
-    except Exception:  # pylint: disable=broad-except
-        ip_address = 'unknown'
-    finally:
-        sock.close()
-    return ip_address
-
-
-def _get_hostname(ip_address):
-    return 'unknown' if ip_address == 'unknown' else 'ip-' + ip_address.replace('.', '-')
-
+    :param config: User supplied config augmented with defaults.
+    :type config: dict

-def _get_hostname_and_ip(config):
+    :return: IP address and Hostname
+    :rtype: Tuple (str, str)
+    """
     if config.get('IPAddressesEnabled') is False:
         return 'NA', 'NA'
     ip_from_config = config.get('machineIp')
     machine_from_config = config.get('machineName')
-    ip_address = ip_from_config if ip_from_config is not None else _get_ip()
-    hostname = machine_from_config if machine_from_config is not None else _get_hostname(ip_address)
+    ip_address = ip_from_config if ip_from_config is not None else get_ip()
+    hostname = machine_from_config if machine_from_config is not None else get_hostname()
     return ip_address, hostname

-
 def get_metadata(config):
     """
     Gather SDK metadata and return a tuple with such info.
@@ -50,3 +49,12 @@ def get_metadata(config):
     version = 'python-%s' % __version__
     ip_address, hostname = _get_hostname_and_ip(config)
     return SdkMetadata(version, hostname, ip_address)
+
+def get_method_constant(method):
+    """
+    Get method name mapped to the Method Enum object
+
+    :return: method name
+    :rtype: str
+    """
+    return _MAP_METHOD_TO_ENUM[method]
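
The socket-based host detection that used to live in this module now comes from splitio.util.host_info, whose contents are not part of this commit's hunks. As a rough, standalone sketch of what those helpers do, mirroring the removed _get_ip/_get_hostname above (the actual module may differ):

    import socket

    def get_ip():
        """Best-effort local IP discovery; the UDP connect() sends no packets."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.connect(('10.255.255.255', 1))  # target doesn't have to be reachable
            return sock.getsockname()[0]
        except Exception:  # pylint: disable=broad-except
            return 'unknown'
        finally:
            sock.close()

    def get_hostname():
        """Derive a synthetic hostname from the detected IP, as the removed helper did."""
        ip_address = get_ip()
        return 'unknown' if ip_address == 'unknown' else 'ip-' + ip_address.replace('.', '-')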

splitio/engine/impressions/__init__.py

Lines changed: 6 additions & 4 deletions

@@ -14,19 +14,21 @@ def set_classes(storage_mode, impressions_mode, api_adapter):
     clear_filter_task = None
     impressions_count_sync = None
     impressions_count_task = None
+    sender_adapter = None
     if storage_mode == 'REDIS':
-        redis_sender_adapter = RedisSenderAdapter(api_adapter)
-        api_telemetry_adapter = redis_sender_adapter
-        api_impressions_adapter = redis_sender_adapter
+        sender_adapter = RedisSenderAdapter(api_adapter)
+        api_telemetry_adapter = sender_adapter
+        api_impressions_adapter = sender_adapter
     else:
         api_telemetry_adapter = api_adapter['telemetry']
         api_impressions_adapter = api_adapter['impressions']
+        sender_adapter = InMemorySenderAdapter(api_telemetry_adapter)

     if impressions_mode == ImpressionsMode.NONE:
         imp_counter = ImpressionsCounter()
         imp_strategy = StrategyNoneMode(imp_counter)
         clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
-        unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(api_telemetry_adapter), imp_strategy.get_unique_keys_tracker())
+        unique_keys_synchronizer = UniqueKeysSynchronizer(sender_adapter, imp_strategy.get_unique_keys_tracker())
         unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
         clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
         imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
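
The net effect of the hunk above is that both storage modes now yield a single sender_adapter, which the unique-keys synchronizer reuses instead of building an InMemorySenderAdapter inline. A minimal sketch of that selection with stub adapter classes (the real constructors take the SDK's Redis adapter and telemetry API objects):

    class RedisSenderAdapter:
        """Stub: in the SDK this flushes unique keys and counts straight to Redis."""
        def __init__(self, redis_adapter):
            self._adapter = redis_adapter

    class InMemorySenderAdapter:
        """Stub: in the SDK this posts unique keys through the telemetry HTTP API."""
        def __init__(self, telemetry_api):
            self._api = telemetry_api

    def pick_sender_adapter(storage_mode, api_adapter):
        if storage_mode == 'REDIS':
            # One adapter serves telemetry, impression counts and unique keys.
            return RedisSenderAdapter(api_adapter)
        # In-memory/standalone mode wraps the telemetry API client instead.
        return InMemorySenderAdapter(api_adapter['telemetry'])

    adapter = pick_sender_adapter('REDIS', object())  # any Redis adapter object in practice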

splitio/engine/impressions/adapters.py

Lines changed: 9 additions & 23 deletions

@@ -91,10 +91,16 @@ def flush_counters(self, to_send):
         :param uniques: unique keys disctionary
         :type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
         """
-        bulk_counts = self._build_counters(to_send)
         try:
-            inserted = self._redis_client.rpush(self.IMP_COUNT_QUEUE_KEY, *bulk_counts)
-            self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, inserted, len(bulk_counts))
+            resulted = 0
+            counted = 0
+            pipe = self._redis_client.pipeline()
+            for pf_count in to_send:
+                pipe.hincrby(self.IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
+                counted += pf_count.count
+            resulted = sum(pipe.execute())
+            self._expire_keys(self.IMP_COUNT_QUEUE_KEY,
+                              self.IMP_COUNT_KEY_DEFAULT_TTL, resulted, counted)
             return True
         except RedisAdapterException:
             _LOGGER.error('Something went wrong when trying to add counters to redis')
@@ -124,23 +130,3 @@ def _uniques_formatter(self, uniques):
         :rtype: json
         """
         return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]
-
-    def _build_counters(self, counters):
-        """
-        Build an impression bulk formatted as the API expects it.
-
-        :param counters: List of impression counters per feature.
-        :type counters: list[splitio.engine.impressions.Counter.CountPerFeature]
-
-        :return: dict with list of impression count dtos
-        :rtype: dict
-        """
-        return json.dumps({
-            'pf': [
-                {
-                    'f': pf_count.feature,
-                    'm': pf_count.timeframe,
-                    'rc': pf_count.count
-                } for pf_count in counters
-            ]
-        })
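
The flush_counters change above stops serializing counters into a single JSON blob pushed with RPUSH and instead increments one hash field per feature/timeframe pair through a pipeline, so counts from multiple writers accumulate in place. A standalone sketch of the pattern with redis-py; the key name, TTL and the expire-only-on-first-insert guard are assumptions mirroring the SDK code, not taken from this diff:

    from collections import namedtuple
    import redis

    CountPerFeature = namedtuple('CountPerFeature', ['feature', 'timeframe', 'count'])

    IMP_COUNT_QUEUE_KEY = 'SPLITIO.impressions.count'  # assumed key name
    IMP_COUNT_KEY_DEFAULT_TTL = 3600                   # assumed TTL, in seconds

    def flush_counters(client, to_send):
        """Increment one hash field per (feature, timeframe) pair in a single round trip."""
        pipe = client.pipeline()
        counted = 0
        for pf_count in to_send:
            pipe.hincrby(IMP_COUNT_QUEUE_KEY,
                         pf_count.feature + '::' + str(pf_count.timeframe),
                         pf_count.count)
            counted += pf_count.count
        resulted = sum(pipe.execute())  # HINCRBY returns the post-increment values
        if resulted == counted:
            # All increments landed on fresh fields, so this flush created the hash:
            # set the TTL once (mirrors the SDK's _expire_keys guard).
            client.expire(IMP_COUNT_QUEUE_KEY, IMP_COUNT_KEY_DEFAULT_TTL)

    # Example (requires a reachable Redis):
    # flush_counters(redis.Redis(), [CountPerFeature('my_feature', 1680000000000, 3)])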
