Skip to content

Commit ff52ac5

Browse files
committed
Minor fixes and updated/created tests
1 parent 327f6c5 commit ff52ac5

38 files changed

+1337
-665
lines changed

splitio/api/auth.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
88
from splitio.api.client import HttpClientException
99
from splitio.models.token import from_raw
10-
from splitio.models.telemetry import TOKEN
10+
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1111

1212
_LOGGER = logging.getLogger(__name__)
1313

@@ -46,7 +46,7 @@ def authenticate(self):
4646
self._apikey,
4747
extra_headers=self._metadata,
4848
)
49-
record_telemetry(response.status_code, get_current_epoch_time() - start, TOKEN, self._telemetry_runtime_producer)
49+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TOKEN, self._telemetry_runtime_producer)
5050
if 200 <= response.status_code < 300:
5151
payload = json.loads(response.body)
5252
return from_raw(payload)

splitio/api/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
_LOGGER = logging.getLogger(__name__)
77

88
HttpResponse = namedtuple('HttpResponse', ['status_code', 'body'])
9-
9+
HTTP_TIMEOUT = 1500
1010

1111
class HttpClientException(Exception):
1212
"""HTTP Client exception."""
@@ -44,7 +44,7 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
4444
:param telemetry_url: Optional alternative telemetry URL.
4545
:type telemetry_url: str
4646
"""
47-
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
47+
self._timeout = timeout/1000 if timeout else HTTP_TIMEOUT # Convert ms to seconds.
4848
self._urls = {
4949
'sdk': sdk_url if sdk_url is not None else self.SDK_URL,
5050
'events': events_url if events_url is not None else self.EVENTS_URL,

splitio/api/events.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from splitio.api import APIException
66
from splitio.api.client import HttpClientException
77
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
8-
from splitio.models.telemetry import EVENT
8+
from splitio.models.telemetry import HTTPExceptionsAndLatencies
99

1010

1111
_LOGGER = logging.getLogger(__name__)
@@ -73,7 +73,7 @@ def flush_events(self, events):
7373
body=bulk,
7474
extra_headers=self._metadata,
7575
)
76-
record_telemetry(response.status_code, get_current_epoch_time() - start, EVENT, self._telemetry_runtime_producer)
76+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.EVENT, self._telemetry_runtime_producer)
7777
if not 200 <= response.status_code < 300:
7878
raise APIException(response.body, response.status_code)
7979
except HttpClientException as exc:

splitio/api/impressions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from splitio.api.client import HttpClientException
88
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
99
from splitio.engine.impressions import ImpressionsMode
10-
from splitio.models.telemetry import IMPRESSION, IMPRESSION_COUNT
10+
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1111

1212

1313
_LOGGER = logging.getLogger(__name__)
@@ -102,7 +102,7 @@ def flush_impressions(self, impressions):
102102
body=bulk,
103103
extra_headers=self._metadata,
104104
)
105-
record_telemetry(response.status_code, get_current_epoch_time() - start, IMPRESSION, self._telemetry_runtime_producer)
105+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.IMPRESSION, self._telemetry_runtime_producer)
106106
if not 200 <= response.status_code < 300:
107107
raise APIException(response.body, response.status_code)
108108
except HttpClientException as exc:
@@ -129,7 +129,7 @@ def flush_counters(self, counters):
129129
body=bulk,
130130
extra_headers=self._metadata,
131131
)
132-
record_telemetry(response.status_code, get_current_epoch_time() - start, IMPRESSION_COUNT, self._telemetry_runtime_producer)
132+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.IMPRESSION_COUNT, self._telemetry_runtime_producer)
133133
if not 200 <= response.status_code < 300:
134134
raise APIException(response.body, response.status_code)
135135
except HttpClientException as exc:

splitio/api/segments.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from splitio.api import APIException
88
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry, get_current_epoch_time
99
from splitio.api.client import HttpClientException
10-
from splitio.models.telemetry import SEGMENT
10+
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1111

1212

1313
_LOGGER = logging.getLogger(__name__)
@@ -59,7 +59,7 @@ def fetch_segment(self, segment_name, change_number, fetch_options):
5959
extra_headers=extra_headers,
6060
query=query,
6161
)
62-
record_telemetry(response.status_code, get_current_epoch_time() - start, SEGMENT, self._telemetry_runtime_producer)
62+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.SEGMENT, self._telemetry_runtime_producer)
6363
if 200 <= response.status_code < 300:
6464
return json.loads(response.body)
6565
else:

splitio/api/splits.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from splitio.api import APIException
88
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry, get_current_epoch_time
99
from splitio.api.client import HttpClientException
10-
from splitio.models.telemetry import SPLIT
10+
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1111

1212
_LOGGER = logging.getLogger(__name__)
1313

@@ -54,7 +54,7 @@ def fetch_splits(self, change_number, fetch_options):
5454
extra_headers=extra_headers,
5555
query=query,
5656
)
57-
record_telemetry(response.status_code, get_current_epoch_time() - start, SPLIT, self._telemetry_runtime_producer)
57+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)
5858
if 200 <= response.status_code < 300:
5959
return json.loads(response.body)
6060
else:

splitio/api/telemetry.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from splitio.api import APIException
66
from splitio.api.client import HttpClientException
77
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
8-
from splitio.models.telemetry import TELEMETRY
8+
from splitio.models.telemetry import HTTPExceptionsAndLatencies
99

1010
_LOGGER = logging.getLogger(__name__)
1111

@@ -42,7 +42,7 @@ def record_unique_keys(self, uniques):
4242
body=uniques,
4343
extra_headers=self._metadata
4444
)
45-
record_telemetry(response.status_code, get_current_epoch_time() - start, TELEMETRY, self._telemetry_runtime_producer)
45+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
4646
if not 200 <= response.status_code < 300:
4747
raise APIException(response.body, response.status_code)
4848
except HttpClientException as exc:
@@ -68,7 +68,7 @@ def record_init(self, configs):
6868
body=configs,
6969
extra_headers=self._metadata,
7070
)
71-
record_telemetry(response.status_code, get_current_epoch_time() - start, TELEMETRY, self._telemetry_runtime_producer)
71+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
7272
if not 200 <= response.status_code < 300:
7373
raise APIException(response.body, response.status_code)
7474
except HttpClientException as exc:
@@ -94,7 +94,7 @@ def record_stats(self, stats):
9494
body=stats,
9595
extra_headers=self._metadata,
9696
)
97-
record_telemetry(response.status_code, get_current_epoch_time() - start, TELEMETRY, self._telemetry_runtime_producer)
97+
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
9898
if not 200 <= response.status_code < 300:
9999
raise APIException(response.body, response.status_code)
100100
except HttpClientException as exc:

splitio/client/client.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from splitio.engine.splitters import Splitter
66
from splitio.models.impressions import Impression, Label
77
from splitio.models.events import Event, EventWrapper
8-
from splitio.models.telemetry import get_latency_bucket_index, TRACK
8+
from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies
99
from splitio.client import input_validator
1010
from splitio.util import utctime_ms
1111
from splitio.api.commons import get_current_epoch_time
@@ -124,7 +124,8 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
124124
except Exception: # pylint: disable=broad-except
125125
_LOGGER.error('Error getting treatment for feature')
126126
_LOGGER.debug('Error: ', exc_info=True)
127-
self._telemetry_evaluation_producer.record_exception(method_name[4:])
127+
if not self._telemetry_evaluation_producer == None:
128+
self._telemetry_evaluation_producer.record_exception(method_name[4:])
128129
try:
129130
impression = self._build_impression(
130131
matching_key,
@@ -207,12 +208,15 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
207208
_LOGGER.error('%s: An exception when trying to store '
208209
'impressions.' % method_name)
209210
_LOGGER.debug('Error: ', exc_info=True)
210-
self._telemetry_evaluation_producer.record_exception(method_name[4:])
211+
if not self._telemetry_evaluation_producer == None:
212+
self._telemetry_evaluation_producer.record_exception(method_name[4:])
211213

212-
self._telemetry_evaluation_producer.record_latency(method_name[4:], get_current_epoch_time() - start)
214+
if not self._telemetry_evaluation_producer == None:
215+
self._telemetry_evaluation_producer.record_latency(method_name[4:], get_current_epoch_time() - start)
213216
return treatments
214217
except Exception: # pylint: disable=broad-except
215-
self._telemetry_evaluation_producer.record_exception(method_name)
218+
if not self._telemetry_evaluation_producer == None:
219+
self._telemetry_evaluation_producer.record_exception(method_name[4:])
216220
_LOGGER.error('Error getting treatment for features')
217221
_LOGGER.debug('Error: ', exc_info=True)
218222
return input_validator.generate_control_treatments(list(features), method_name)
@@ -350,7 +354,7 @@ def _record_stats(self, impressions, start, operation, method_name=None):
350354
end = get_current_epoch_time()
351355
self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start),
352356
operation)
353-
if not method_name == None:
357+
if not method_name == None and not self._telemetry_evaluation_producer == None:
354358
self._telemetry_evaluation_producer.record_latency(method_name[4:], end - start)
355359

356360

@@ -413,9 +417,11 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
413417
event=event,
414418
size=size,
415419
)])
416-
self._telemetry_evaluation_producer.record_latency(TRACK, get_current_epoch_time() - start)
420+
if not self._telemetry_evaluation_producer == None:
421+
self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, get_current_epoch_time() - start)
417422
except Exception: # pylint: disable=broad-except
418-
self._telemetry_evaluation_producer.record_exception(TRACK)
423+
if not self._telemetry_evaluation_producer == None:
424+
self._telemetry_evaluation_producer.record_exception(MethodExceptionsAndLatencies.TRACK)
419425
_LOGGER.error('Error processing track event')
420426
_LOGGER.debug('Error: ', exc_info=True)
421427

splitio/client/factory.py

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,11 @@ def __init__( # pylint: disable=too-many-arguments
124124
self._sdk_internal_ready_flag = sdk_ready_flag
125125
self._recorder = recorder
126126
self._preforked_initialization = preforked_initialization
127-
self._telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
128-
self._telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
127+
self._telemetry_evaluation_producer = None
128+
self._telemetry_init_producer = None
129+
if not telemetry_producer == None:
130+
self._telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
131+
self._telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
129132
self._telemetry_init_consumer = telemetry_init_consumer
130133
self._telemetry_api = telemetry_api
131134
self._ready_time = get_current_epoch_time()
@@ -156,13 +159,14 @@ def _update_status_when_ready(self):
156159
self._sdk_internal_ready_flag.wait()
157160
self._status = Status.READY
158161
self._sdk_ready_flag.set()
159-
self._telemetry_init_producer.record_ready_time(get_current_epoch_time() - self._ready_time)
160-
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
161-
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
162+
if not self._telemetry_init_producer == None:
163+
self._telemetry_init_producer.record_ready_time(get_current_epoch_time() - self._ready_time)
164+
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
165+
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
162166

163-
config_post_thread = threading.Thread(target=self._telemetry_api.record_init(self._telemetry_init_consumer.get_config_stats()), name="PostConfigData")
164-
config_post_thread.setDaemon(True)
165-
config_post_thread.start()
167+
config_post_thread = threading.Thread(target=self._telemetry_api.record_init(self._telemetry_init_consumer.get_config_stats()), name="PostConfigData")
168+
config_post_thread.setDaemon(True)
169+
config_post_thread.start()
166170

167171

168172
def _get_storage(self, name):
@@ -329,7 +333,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
329333
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
330334
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
331335
telemetry_runtime_producer=telemetry_producer.get_telemetry_runtime_producer()
332-
telemetry_evaluation_producer=telemetry_producer.get_telemetry_evaluation_producer()
336+
# telemetry_evaluation_producer=telemetry_producer.get_telemetry_evaluation_producer()
333337

334338

335339
http_client = HttpClient(
@@ -365,8 +369,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
365369
imp_strategy = set_classes('MEMORY', cfg['impressionsMode'], apis)
366370

367371
imp_manager = ImpressionsManager(
368-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
369-
imp_strategy, telemetry_runtime_producer)
372+
imp_strategy, telemetry_runtime_producer,
373+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
370374

371375
synchronizers = SplitSynchronizers(
372376
SplitSynchronizer(apis['splits'], storages['splits']),
@@ -375,9 +379,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
375379
cfg['impressionsBulkSize']),
376380
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
377381
impressions_count_sync,
382+
TelemetrySynchronizer(telemetry_consumer, storages['splits'], storages['segments'], apis['telemetry']),
378383
unique_keys_synchronizer,
379384
clear_filter_sync,
380-
TelemetrySynchronizer(telemetry_consumer, storages['splits'], storages['segments'], apis['telemetry'])
381385
)
382386

383387
tasks = SplitTasks(
@@ -395,9 +399,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
395399
),
396400
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
397401
impressions_count_task,
402+
TelemetrySyncTask(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']),
398403
unique_keys_task,
399404
clear_filter_task,
400-
TelemetrySyncTask(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']),
401405
)
402406

403407
synchronizer = Synchronizer(synchronizers, tasks)
@@ -421,7 +425,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
421425
synchronizer.sync_all()
422426
synchronizer._split_synchronizers._segment_sync.shutdown()
423427
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
424-
recorder, manager, preforked_initialization=preforked_initialization)
428+
recorder, manager, None, telemetry_producer, telemetry_consumer.get_telemetry_init_consumer(), apis['telemetry'], preforked_initialization=preforked_initialization)
425429

426430
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
427431
initialization_thread.setDaemon(True)
@@ -444,6 +448,11 @@ def _build_redis_factory(api_key, cfg):
444448
'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata),
445449
'events': RedisEventsStorage(redis_adapter, sdk_metadata),
446450
}
451+
telemetry_storage = InMemoryTelemetryStorage()
452+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
453+
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
454+
telemetry_runtime_producer=telemetry_producer.get_telemetry_runtime_producer()
455+
447456
data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING)
448457
if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED:
449458
_LOGGER.warning("dataSampling cannot be less than %.2f, defaulting to minimum",
@@ -455,17 +464,21 @@ def _build_redis_factory(api_key, cfg):
455464
imp_strategy = set_classes('REDIS', cfg['impressionsMode'], redis_adapter)
456465

457466
imp_manager = ImpressionsManager(
467+
imp_strategy,
468+
telemetry_runtime_producer,
458469
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
459-
imp_strategy)
470+
)
460471

461472
synchronizers = SplitSynchronizers(None, None, None, None,
462473
impressions_count_sync,
474+
None,
463475
unique_keys_synchronizer,
464476
clear_filter_sync
465477
)
466478

467479
tasks = SplitTasks(None, None, None, None,
468480
impressions_count_task,
481+
None,
469482
unique_keys_task,
470483
clear_filter_task
471484
)
@@ -490,9 +503,11 @@ def _build_redis_factory(api_key, cfg):
490503
cfg['labelsEnabled'],
491504
recorder,
492505
manager,
506+
sdk_ready_flag=None,
507+
telemetry_producer=telemetry_producer,
508+
telemetry_init_consumer=telemetry_consumer.get_telemetry_init_consumer()
493509
)
494510

495-
496511
def _build_localhost_factory(cfg):
497512
"""Build and return a localhost factory for testing/development purposes."""
498513
storages = {
@@ -520,7 +535,7 @@ def _build_localhost_factory(cfg):
520535
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
521536
manager.start()
522537
recorder = StandardRecorder(
523-
ImpressionsManager(None, StrategyDebugMode()),
538+
ImpressionsManager(StrategyDebugMode()),
524539
storages['events'],
525540
storages['impressions'],
526541
)
@@ -530,7 +545,7 @@ def _build_localhost_factory(cfg):
530545
False,
531546
recorder,
532547
manager,
533-
ready_event
548+
ready_event,
534549
)
535550

536551
def get_factory(api_key, **kwargs):

0 commit comments

Comments
 (0)