Skip to content

Commit 03fe538

Browse files
committed
Updating redis-telemetry PR
1 parent 08d49b2 commit 03fe538

File tree

24 files changed

+79
-115
lines changed

24 files changed

+79
-115
lines changed

splitio/api/auth.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from splitio.api import APIException
77
from splitio.api.commons import headers_from_metadata, record_telemetry
8-
from splitio.util.time import get_current_epoch_time
8+
from splitio.util.time import get_current_epoch_time_ms
99
from splitio.api.client import HttpClientException
1010
from splitio.models.token import from_raw
1111
from splitio.models.telemetry import HTTPExceptionsAndLatencies
@@ -39,15 +39,15 @@ def authenticate(self):
3939
:return: Json representation of an authentication.
4040
:rtype: splitio.models.token.Token
4141
"""
42-
start = get_current_epoch_time()
42+
start = get_current_epoch_time_ms()
4343
try:
4444
response = self._client.get(
4545
'auth',
4646
'/v2/auth',
4747
self._apikey,
4848
extra_headers=self._metadata,
4949
)
50-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TOKEN, self._telemetry_runtime_producer)
50+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.TOKEN, self._telemetry_runtime_producer)
5151
if 200 <= response.status_code < 300:
5252
payload = json.loads(response.body)
5353
return from_raw(payload)

splitio/api/commons.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Commons module."""
2-
from splitio.util.time import get_current_epoch_time
2+
from splitio.util.time import get_current_epoch_time_ms
33

44
_CACHE_CONTROL = 'Cache-Control'
55
_CACHE_CONTROL_NO_CACHE = 'no-cache'
@@ -50,7 +50,7 @@ def record_telemetry(status_code, elapsed, metric_name, telemetry_runtime_produc
5050
"""
5151
telemetry_runtime_producer.record_sync_latency(metric_name, elapsed)
5252
if 200 <= status_code < 300:
53-
telemetry_runtime_producer.record_successful_sync(metric_name, get_current_epoch_time())
53+
telemetry_runtime_producer.record_successful_sync(metric_name, get_current_epoch_time_ms())
5454
return
5555
telemetry_runtime_producer.record_sync_error(metric_name, status_code)
5656

splitio/api/events.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from splitio.api import APIException
66
from splitio.api.client import HttpClientException
77
from splitio.api.commons import headers_from_metadata, record_telemetry
8-
from splitio.util.time import get_current_epoch_time
8+
from splitio.util.time import get_current_epoch_time_ms
99
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1010

1111

@@ -65,7 +65,7 @@ def flush_events(self, events):
6565
:rtype: bool
6666
"""
6767
bulk = self._build_bulk(events)
68-
start = get_current_epoch_time()
68+
start = get_current_epoch_time_ms()
6969
try:
7070
response = self._client.post(
7171
'events',
@@ -74,7 +74,7 @@ def flush_events(self, events):
7474
body=bulk,
7575
extra_headers=self._metadata,
7676
)
77-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.EVENT, self._telemetry_runtime_producer)
77+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.EVENT, self._telemetry_runtime_producer)
7878
if not 200 <= response.status_code < 300:
7979
raise APIException(response.body, response.status_code)
8080
except HttpClientException as exc:

splitio/api/impressions.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from splitio.api import APIException
77
from splitio.api.client import HttpClientException
88
from splitio.api.commons import headers_from_metadata, record_telemetry
9-
from splitio.util.time import get_current_epoch_time
9+
from splitio.util.time import get_current_epoch_time_ms
1010
from splitio.engine.impressions import ImpressionsMode
1111
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1212

@@ -94,7 +94,7 @@ def flush_impressions(self, impressions):
9494
:type impressions: list
9595
"""
9696
bulk = self._build_bulk(impressions)
97-
start = get_current_epoch_time()
97+
start = get_current_epoch_time_ms()
9898
try:
9999
response = self._client.post(
100100
'events',
@@ -103,7 +103,7 @@ def flush_impressions(self, impressions):
103103
body=bulk,
104104
extra_headers=self._metadata,
105105
)
106-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.IMPRESSION, self._telemetry_runtime_producer)
106+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.IMPRESSION, self._telemetry_runtime_producer)
107107
if not 200 <= response.status_code < 300:
108108
raise APIException(response.body, response.status_code)
109109
except HttpClientException as exc:
@@ -121,7 +121,7 @@ def flush_counters(self, counters):
121121
:type impressions: list
122122
"""
123123
bulk = self._build_counters(counters)
124-
start = get_current_epoch_time()
124+
start = get_current_epoch_time_ms()
125125
try:
126126
response = self._client.post(
127127
'events',
@@ -130,7 +130,7 @@ def flush_counters(self, counters):
130130
body=bulk,
131131
extra_headers=self._metadata,
132132
)
133-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.IMPRESSION_COUNT, self._telemetry_runtime_producer)
133+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.IMPRESSION_COUNT, self._telemetry_runtime_producer)
134134
if not 200 <= response.status_code < 300:
135135
raise APIException(response.body, response.status_code)
136136
except HttpClientException as exc:

splitio/api/segments.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from splitio.api import APIException
88
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry
9-
from splitio.util.time import get_current_epoch_time
9+
from splitio.util.time import get_current_epoch_time_ms
1010
from splitio.api.client import HttpClientException
1111
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1212

@@ -50,7 +50,7 @@ def fetch_segment(self, segment_name, change_number, fetch_options):
5050
:return: Json representation of a segmentChange response.
5151
:rtype: dict
5252
"""
53-
start = get_current_epoch_time()
53+
start = get_current_epoch_time_ms()
5454
try:
5555
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
5656
response = self._client.get(
@@ -60,7 +60,7 @@ def fetch_segment(self, segment_name, change_number, fetch_options):
6060
extra_headers=extra_headers,
6161
query=query,
6262
)
63-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.SEGMENT, self._telemetry_runtime_producer)
63+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.SEGMENT, self._telemetry_runtime_producer)
6464
if 200 <= response.status_code < 300:
6565
return json.loads(response.body)
6666
else:

splitio/api/splits.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from splitio.api import APIException
88
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry
9-
from splitio.util.time import get_current_epoch_time
9+
from splitio.util.time import get_current_epoch_time_ms
1010
from splitio.api.client import HttpClientException
1111
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1212

@@ -45,7 +45,7 @@ def fetch_splits(self, change_number, fetch_options):
4545
:return: Json representation of a splitChanges response.
4646
:rtype: dict
4747
"""
48-
start = get_current_epoch_time()
48+
start = get_current_epoch_time_ms()
4949
try:
5050
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
5151
response = self._client.get(
@@ -55,7 +55,7 @@ def fetch_splits(self, change_number, fetch_options):
5555
extra_headers=extra_headers,
5656
query=query,
5757
)
58-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)
58+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)
5959
if 200 <= response.status_code < 300:
6060
return json.loads(response.body)
6161
else:

splitio/api/telemetry.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from splitio.api import APIException
66
from splitio.api.client import HttpClientException
77
from splitio.api.commons import headers_from_metadata, record_telemetry
8-
from splitio.util.time import get_current_epoch_time
8+
from splitio.util.time import get_current_epoch_time_ms
99
from splitio.models.telemetry import HTTPExceptionsAndLatencies
1010

1111
_LOGGER = logging.getLogger(__name__)
@@ -34,7 +34,7 @@ def record_unique_keys(self, uniques):
3434
:param uniques: Unique Keys
3535
:type json
3636
"""
37-
start = get_current_epoch_time()
37+
start = get_current_epoch_time_ms()
3838
try:
3939
response = self._client.post(
4040
'telemetry',
@@ -43,7 +43,7 @@ def record_unique_keys(self, uniques):
4343
body=uniques,
4444
extra_headers=self._metadata
4545
)
46-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
46+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
4747
if not 200 <= response.status_code < 300:
4848
raise APIException(response.body, response.status_code)
4949
except HttpClientException as exc:
@@ -60,7 +60,7 @@ def record_init(self, configs):
6060
:param configs: configs
6161
:type json
6262
"""
63-
start = get_current_epoch_time()
63+
start = get_current_epoch_time_ms()
6464
try:
6565
response = self._client.post(
6666
'telemetry',
@@ -69,7 +69,7 @@ def record_init(self, configs):
6969
body=configs,
7070
extra_headers=self._metadata,
7171
)
72-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
72+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
7373
if not 200 <= response.status_code < 300:
7474
raise APIException(response.body, response.status_code)
7575
except HttpClientException as exc:
@@ -86,7 +86,7 @@ def record_stats(self, stats):
8686
:param stats: stats
8787
:type json
8888
"""
89-
start = get_current_epoch_time()
89+
start = get_current_epoch_time_ms()
9090
try:
9191
response = self._client.post(
9292
'telemetry',
@@ -95,7 +95,7 @@ def record_stats(self, stats):
9595
body=stats,
9696
extra_headers=self._metadata,
9797
)
98-
record_telemetry(response.status_code, get_current_epoch_time() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
98+
record_telemetry(response.status_code, get_current_epoch_time_ms() - start, HTTPExceptionsAndLatencies.TELEMETRY, self._telemetry_runtime_producer)
9999
if not 200 <= response.status_code < 300:
100100
raise APIException(response.body, response.status_code)
101101
except HttpClientException as exc:

splitio/client/client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from splitio.models.events import Event, EventWrapper
88
from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies
99
from splitio.client import input_validator
10-
from splitio.util.time import get_current_epoch_time, utctime_ms
10+
from splitio.util.time import get_current_epoch_time_ms, utctime_ms
1111

1212
_LOGGER = logging.getLogger(__name__)
1313

@@ -92,7 +92,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
9292
_LOGGER.error("Client is not ready - no calls possible")
9393
return CONTROL, None
9494

95-
start = get_current_epoch_time()
95+
start = get_current_epoch_time_ms()
9696

9797
matching_key, bucketing_key = input_validator.validate_key(key, method_name)
9898
feature = input_validator.validate_feature_name(
@@ -148,7 +148,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
148148
_LOGGER.error("Client is not ready - no calls possible")
149149
return input_validator.generate_control_treatments(features, method_name)
150150

151-
start = get_current_epoch_time()
151+
start = get_current_epoch_time_ms()
152152

153153
matching_key, bucketing_key = input_validator.validate_key(key, method_name)
154154
if matching_key is None and bucketing_key is None:
@@ -208,7 +208,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
208208
_LOGGER.debug('Error: ', exc_info=True)
209209
self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
210210

211-
self._telemetry_evaluation_producer.record_latency(self._get_method_constant(method_name[4:]), get_current_epoch_time() - start)
211+
self._telemetry_evaluation_producer.record_latency(self._get_method_constant(method_name[4:]), get_current_epoch_time_ms() - start)
212212
return treatments
213213
except Exception: # pylint: disable=broad-except
214214
self._telemetry_evaluation_producer.record_exception(self._get_method_constant(method_name[4:]))
@@ -346,7 +346,7 @@ def _record_stats(self, impressions, start, operation, method_name=None):
346346
:param operation: operation performed.
347347
:type operation: str
348348
"""
349-
end = get_current_epoch_time()
349+
end = get_current_epoch_time_ms()
350350
self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start),
351351
operation)
352352
if method_name is not None:
@@ -381,7 +381,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
381381
_LOGGER.warn("track: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method")
382382
self._telemetry_init_producer.record_not_ready_usage()
383383

384-
start = get_current_epoch_time()
384+
start = get_current_epoch_time_ms()
385385
key = input_validator.validate_track_key(key)
386386
event_type = input_validator.validate_event_type(event_type)
387387
should_validate_existance = self.ready and self._factory._apikey != 'localhost' # pylint: disable=protected-access
@@ -412,7 +412,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
412412
event=event,
413413
size=size,
414414
)])
415-
self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, get_current_epoch_time() - start)
415+
self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, get_current_epoch_time_ms() - start)
416416
return return_flag
417417
except Exception: # pylint: disable=broad-except
418418
self._telemetry_evaluation_producer.record_exception(MethodExceptionsAndLatencies.TRACK)

splitio/client/factory.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from splitio.api.events import EventsAPI
3434
from splitio.api.auth import AuthAPI
3535
from splitio.api.telemetry import TelemetryAPI, LocalhostTelemetryAPI
36-
from splitio.util.time import get_current_epoch_time
36+
from splitio.util.time import get_current_epoch_time_ms
3737

3838
# Tasks
3939
from splitio.tasks.split_sync import SplitSynchronizationTask
@@ -130,7 +130,7 @@ def __init__( # pylint: disable=too-many-arguments
130130
self._telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
131131
self._telemetry_init_consumer = telemetry_init_consumer
132132
self._telemetry_api = telemetry_api
133-
self._ready_time = get_current_epoch_time()
133+
self._ready_time = get_current_epoch_time_ms()
134134
self._start_status_updater()
135135

136136
def _start_status_updater(self):
@@ -157,22 +157,21 @@ def _start_status_updater(self):
157157
ready_updater.setDaemon(True)
158158
ready_updater.start()
159159

160-
161160
def _update_redis_telemetry_config(self):
162161
"""Push Config Telemetry into storage."""
163-
self._telemetry_init_producer.record_ready_time(get_current_epoch_time() - self._ready_time)
162+
self._telemetry_init_producer.record_ready_time(get_current_epoch_time_ms() - self._ready_time)
164163
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
165164
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
166165
config_post_thread = threading.Thread(target=self._telemetry_api.record_init(self._telemetry_init_consumer.get_config_stats()), name="PostConfigData")
167166
config_post_thread.setDaemon(True)
168-
config_post_thread.start()
167+
config_post_thread.start()
169168

170169
def _update_status_when_ready(self):
171170
"""Wait until the sdk is ready and update the status."""
172171
self._sdk_internal_ready_flag.wait()
173172
self._status = Status.READY
174173
self._sdk_ready_flag.set()
175-
self._telemetry_init_producer.record_ready_time(get_current_epoch_time() - self._ready_time)
174+
self._telemetry_init_producer.record_ready_time(get_current_epoch_time_ms() - self._ready_time)
176175
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
177176
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
178177

@@ -463,7 +462,7 @@ def _build_redis_factory(api_key, cfg):
463462
telemetry_storage = InMemoryTelemetryStorage()
464463
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
465464
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
466-
telemetry_runtime_producer=telemetry_producer.get_telemetry_runtime_producer()
465+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
467466

468467
data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING)
469468
if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED:
@@ -508,7 +507,7 @@ def _build_redis_factory(api_key, cfg):
508507
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
509508
initialization_thread.setDaemon(True)
510509
initialization_thread.start()
511-
510+
512511
telemetry_producer.get_telemetry_init_producer().record_config(cfg, {})
513512

514513
return SplitFactory(
@@ -618,7 +617,7 @@ def _get_active_and_redundant_count():
618617
active_factory_count = 0
619618
_INSTANTIATED_FACTORIES_LOCK.acquire()
620619
for item in _INSTANTIATED_FACTORIES:
621-
redundant_factory_count = redundant_factory_count + _INSTANTIATED_FACTORIES[item] - 1
622-
active_factory_count = active_factory_count + _INSTANTIATED_FACTORIES[item]
620+
redundant_factory_count += _INSTANTIATED_FACTORIES[item] - 1
621+
active_factory_count += _INSTANTIATED_FACTORIES[item]
623622
_INSTANTIATED_FACTORIES_LOCK.release()
624623
return redundant_factory_count, active_factory_count

splitio/engine/impressions/impressions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class ImpressionsMode(Enum):
1414
class Manager(object): # pylint:disable=too-few-public-methods
1515
"""Impression manager."""
1616

17-
def __init__(self, strategy, telemetry_runtime_producer=None, listener=None):
17+
def __init__(self, strategy, telemetry_runtime_producer, listener=None):
1818
"""
1919
Construct a manger to track and forward impressions to the queue.
2020

0 commit comments

Comments
 (0)