Skip to content

Commit 653b722

Browse files
committed
Fixed streaming event data and polishing
1 parent a15e887 commit 653b722

File tree

19 files changed

+103
-79
lines changed

19 files changed

+103
-79
lines changed

splitio/api/auth.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22

33
import logging
44
import json
5-
import time
65

76
from splitio.api import APIException
8-
from splitio.api.commons import headers_from_metadata, record_telemetry
7+
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
98
from splitio.api.client import HttpClientException
109
from splitio.models.token import from_raw
1110
from splitio.models.telemetry import TOKEN
@@ -39,20 +38,20 @@ def authenticate(self):
3938
:return: Json representation of an authentication.
4039
:rtype: splitio.models.token.Token
4140
"""
42-
start = int(round(time.time() * 1000))
41+
start = get_current_epoch_time()
4342
try:
4443
response = self._client.get(
4544
'auth',
4645
'/v2/auth',
4746
self._apikey,
4847
extra_headers=self._metadata,
4948
)
50-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, TOKEN, self._telemetry_runtime_producer)
49+
record_telemetry(response.status_code, get_current_epoch_time() - start, TOKEN, self._telemetry_runtime_producer)
5150
if 200 <= response.status_code < 300:
5251
payload = json.loads(response.body)
5352
return from_raw(payload)
5453
else:
55-
if response.status_code == 401:
54+
if (response.status_code >= 400 and response.status_code < 500):
5655
self._telemetry_runtime_producer.record_auth_rejections()
5756
raise APIException(response.body, response.status_code)
5857
except HttpClientException as exc:

splitio/api/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from collections import namedtuple
33

44
import requests
5+
import logging
6+
_LOGGER = logging.getLogger(__name__)
57

68
HttpResponse = namedtuple('HttpResponse', ['status_code', 'body'])
79

splitio/api/commons.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def record_telemetry(status_code, elapsed, metric_name, telemetry_runtime_produc
5050
"""
5151
telemetry_runtime_producer.record_sync_latency(metric_name, elapsed)
5252
if 200 <= status_code < 300:
53-
telemetry_runtime_producer.record_suceessful_sync(metric_name, int(round(time.time() * 1000)))
53+
telemetry_runtime_producer.record_successful_sync(metric_name, get_current_epoch_time())
5454
else:
5555
telemetry_runtime_producer.record_sync_error(metric_name, status_code)
5656

@@ -114,3 +114,12 @@ def build_fetch(change_number, fetch_options, metadata):
114114
if fetch_options.change_number is not None:
115115
query['till'] = fetch_options.change_number
116116
return query, extra_headers
117+
118+
def get_current_epoch_time():
119+
"""
120+
Get current epoch time in milliseconds
121+
122+
:return: epoch time
123+
:rtype: int
124+
"""
125+
return int(round(time.time() * 1000))

splitio/api/events.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from splitio.api import APIException
66
from splitio.api.client import HttpClientException
7-
from splitio.api.commons import headers_from_metadata, record_telemetry
7+
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
88
from splitio.models.telemetry import EVENT
99

1010

@@ -64,7 +64,7 @@ def flush_events(self, events):
6464
:rtype: bool
6565
"""
6666
bulk = self._build_bulk(events)
67-
start = int(round(time.time() * 1000))
67+
start = get_current_epoch_time()
6868
try:
6969
response = self._client.post(
7070
'events',
@@ -73,7 +73,7 @@ def flush_events(self, events):
7373
body=bulk,
7474
extra_headers=self._metadata,
7575
)
76-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, EVENT, self._telemetry_runtime_producer)
76+
record_telemetry(response.status_code, get_current_epoch_time() - start, EVENT, self._telemetry_runtime_producer)
7777
if not 200 <= response.status_code < 300:
7878
raise APIException(response.body, response.status_code)
7979
except HttpClientException as exc:

splitio/api/impressions.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
import logging
44
from itertools import groupby
5-
import time
65

76
from splitio.api import APIException
87
from splitio.api.client import HttpClientException
9-
from splitio.api.commons import headers_from_metadata, record_telemetry
8+
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
109
from splitio.engine.impressions import ImpressionsMode
1110
from splitio.models.telemetry import IMPRESSION, IMPRESSION_COUNT
1211

@@ -94,7 +93,7 @@ def flush_impressions(self, impressions):
9493
:type impressions: list
9594
"""
9695
bulk = self._build_bulk(impressions)
97-
start = int(round(time.time() * 1000))
96+
start = get_current_epoch_time()
9897
try:
9998
response = self._client.post(
10099
'events',
@@ -103,7 +102,7 @@ def flush_impressions(self, impressions):
103102
body=bulk,
104103
extra_headers=self._metadata,
105104
)
106-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, IMPRESSION, self._telemetry_runtime_producer)
105+
record_telemetry(response.status_code, get_current_epoch_time() - start, IMPRESSION, self._telemetry_runtime_producer)
107106
if not 200 <= response.status_code < 300:
108107
raise APIException(response.body, response.status_code)
109108
except HttpClientException as exc:
@@ -121,7 +120,7 @@ def flush_counters(self, counters):
121120
:type impressions: list
122121
"""
123122
bulk = self._build_counters(counters)
124-
start = int(round(time.time() * 1000))
123+
start = get_current_epoch_time()
125124
try:
126125
response = self._client.post(
127126
'events',
@@ -130,7 +129,7 @@ def flush_counters(self, counters):
130129
body=bulk,
131130
extra_headers=self._metadata,
132131
)
133-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, IMPRESSION_COUNT, self._telemetry_runtime_producer)
132+
record_telemetry(response.status_code, get_current_epoch_time() - start, IMPRESSION_COUNT, self._telemetry_runtime_producer)
134133
if not 200 <= response.status_code < 300:
135134
raise APIException(response.body, response.status_code)
136135
except HttpClientException as exc:

splitio/api/segments.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import time
66

77
from splitio.api import APIException
8-
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry
8+
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry, get_current_epoch_time
99
from splitio.api.client import HttpClientException
1010
from splitio.models.telemetry import SEGMENT
1111

@@ -49,7 +49,7 @@ def fetch_segment(self, segment_name, change_number, fetch_options):
4949
:return: Json representation of a segmentChange response.
5050
:rtype: dict
5151
"""
52-
start = int(round(time.time() * 1000))
52+
start = get_current_epoch_time()
5353
try:
5454
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
5555
response = self._client.get(
@@ -59,7 +59,7 @@ def fetch_segment(self, segment_name, change_number, fetch_options):
5959
extra_headers=extra_headers,
6060
query=query,
6161
)
62-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, SEGMENT, self._telemetry_runtime_producer)
62+
record_telemetry(response.status_code, get_current_epoch_time() - start, SEGMENT, self._telemetry_runtime_producer)
6363
if 200 <= response.status_code < 300:
6464
return json.loads(response.body)
6565
else:

splitio/api/splits.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import time
66

77
from splitio.api import APIException
8-
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry
8+
from splitio.api.commons import headers_from_metadata, build_fetch, record_telemetry, get_current_epoch_time
99
from splitio.api.client import HttpClientException
1010
from splitio.models.telemetry import SPLIT
1111

@@ -44,7 +44,7 @@ def fetch_splits(self, change_number, fetch_options):
4444
:return: Json representation of a splitChanges response.
4545
:rtype: dict
4646
"""
47-
start = int(round(time.time() * 1000))
47+
start = get_current_epoch_time()
4848
try:
4949
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata)
5050
response = self._client.get(
@@ -54,7 +54,7 @@ def fetch_splits(self, change_number, fetch_options):
5454
extra_headers=extra_headers,
5555
query=query,
5656
)
57-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, SPLIT, self._telemetry_runtime_producer)
57+
record_telemetry(response.status_code, get_current_epoch_time() - start, SPLIT, self._telemetry_runtime_producer)
5858
if 200 <= response.status_code < 300:
5959
return json.loads(response.body)
6060
else:

splitio/api/telemetry.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from splitio.api import APIException
66
from splitio.api.client import HttpClientException
7-
from splitio.api.commons import headers_from_metadata, record_telemetry
7+
from splitio.api.commons import headers_from_metadata, record_telemetry, get_current_epoch_time
88
from splitio.models.telemetry import TELEMETRY
99

1010
_LOGGER = logging.getLogger(__name__)
@@ -33,7 +33,7 @@ def record_unique_keys(self, uniques):
3333
:param uniques: Unique Keys
3434
:type json
3535
"""
36-
start = int(round(time.time() * 1000))
36+
start = get_current_epoch_time()
3737
try:
3838
response = self._client.post(
3939
'telemetry',
@@ -42,7 +42,7 @@ def record_unique_keys(self, uniques):
4242
body=uniques,
4343
extra_headers=self._metadata
4444
)
45-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, TELEMETRY, self._telemetry_runtime_producer)
45+
record_telemetry(response.status_code, get_current_epoch_time() - start, TELEMETRY, self._telemetry_runtime_producer)
4646
if not 200 <= response.status_code < 300:
4747
raise APIException(response.body, response.status_code)
4848
except HttpClientException as exc:
@@ -59,7 +59,7 @@ def record_init(self, configs):
5959
:param configs: configs
6060
:type json
6161
"""
62-
start = int(round(time.time() * 1000))
62+
start = get_current_epoch_time()
6363
try:
6464
response = self._client.post(
6565
'telemetry',
@@ -68,7 +68,7 @@ def record_init(self, configs):
6868
body=configs,
6969
extra_headers=self._metadata,
7070
)
71-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, TELEMETRY, self._telemetry_runtime_producer)
71+
record_telemetry(response.status_code, get_current_epoch_time() - start, TELEMETRY, self._telemetry_runtime_producer)
7272
if not 200 <= response.status_code < 300:
7373
raise APIException(response.body, response.status_code)
7474
except HttpClientException as exc:
@@ -85,7 +85,7 @@ def record_stats(self, stats):
8585
:param stats: stats
8686
:type json
8787
"""
88-
start = int(round(time.time() * 1000))
88+
start = get_current_epoch_time()
8989
try:
9090
response = self._client.post(
9191
'telemetry',
@@ -94,7 +94,7 @@ def record_stats(self, stats):
9494
body=stats,
9595
extra_headers=self._metadata,
9696
)
97-
record_telemetry(response.status_code, int(round(time.time() * 1000)) - start, TELEMETRY, self._telemetry_runtime_producer)
97+
record_telemetry(response.status_code, get_current_epoch_time() - start, TELEMETRY, self._telemetry_runtime_producer)
9898
if not 200 <= response.status_code < 300:
9999
raise APIException(response.body, response.status_code)
100100
except HttpClientException as exc:

splitio/client/client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from splitio.models.telemetry import get_latency_bucket_index, TRACK
99
from splitio.client import input_validator
1010
from splitio.util import utctime_ms
11-
11+
from splitio.api.commons import get_current_epoch_time
1212

1313
_LOGGER = logging.getLogger(__name__)
1414

@@ -93,7 +93,7 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
9393
_LOGGER.error("Client is not ready - no calls possible")
9494
return CONTROL, None
9595

96-
start = int(round(time.time() * 1000))
96+
start = get_current_epoch_time()
9797

9898
matching_key, bucketing_key = input_validator.validate_key(key, method_name)
9999
feature = input_validator.validate_feature_name(
@@ -149,7 +149,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
149149
_LOGGER.error("Client is not ready - no calls possible")
150150
return input_validator.generate_control_treatments(features, method_name)
151151

152-
start = int(round(time.time() * 1000))
152+
start = get_current_epoch_time()
153153

154154
matching_key, bucketing_key = input_validator.validate_key(key, method_name)
155155
if matching_key is None and bucketing_key is None:
@@ -209,7 +209,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
209209
_LOGGER.debug('Error: ', exc_info=True)
210210
self._telemetry_evaluation_producer.record_exception(method_name[4:])
211211

212-
self._telemetry_evaluation_producer.record_latency(method_name[4:], int(round(time.time() * 1000)) - start)
212+
self._telemetry_evaluation_producer.record_latency(method_name[4:], get_current_epoch_time() - start)
213213
return treatments
214214
except Exception: # pylint: disable=broad-except
215215
self._telemetry_evaluation_producer.record_exception(method_name)
@@ -347,7 +347,7 @@ def _record_stats(self, impressions, start, operation, method_name=None):
347347
:param operation: operation performed.
348348
:type operation: str
349349
"""
350-
end = int(round(time.time() * 1000))
350+
end = get_current_epoch_time()
351351
self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start),
352352
operation)
353353
if not method_name == None:
@@ -382,7 +382,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
382382
_LOGGER.warn("track: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method")
383383
self._telemetry_init_producer.record_not_ready_usage()
384384

385-
start = int(round(time.time() * 1000))
385+
start = get_current_epoch_time()
386386
key = input_validator.validate_track_key(key)
387387
event_type = input_validator.validate_event_type(event_type)
388388
should_validate_existance = self.ready and self._factory._apikey != 'localhost' # pylint: disable=protected-access
@@ -412,7 +412,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
412412
event=event,
413413
size=size,
414414
)])
415-
self._telemetry_evaluation_producer.record_latency(TRACK, int(round(time.time() * 1000)) - start)
415+
self._telemetry_evaluation_producer.record_latency(TRACK, get_current_epoch_time() - start)
416416
if not return_flag:
417417
self._telemetry_evaluation_producer.record_exception(TRACK)
418418

splitio/client/factory.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import logging
33
import threading
44
from collections import Counter
5-
import time
65

76
from enum import Enum
87

@@ -34,6 +33,7 @@
3433
from splitio.api.events import EventsAPI
3534
from splitio.api.auth import AuthAPI
3635
from splitio.api.telemetry import TelemetryAPI
36+
from splitio.api.commons import get_current_epoch_time
3737

3838
# Tasks
3939
from splitio.tasks.split_sync import SplitSynchronizationTask
@@ -128,7 +128,7 @@ def __init__( # pylint: disable=too-many-arguments
128128
self._telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
129129
self._telemetry_init_consumer = telemetry_init_consumer
130130
self._telemetry_api = telemetry_api
131-
self._ready_time = int(round(time.time() * 1000))
131+
self._ready_time = get_current_epoch_time()
132132
self._start_status_updater()
133133

134134
def _start_status_updater(self):
@@ -156,7 +156,7 @@ def _update_status_when_ready(self):
156156
self._sdk_internal_ready_flag.wait()
157157
self._status = Status.READY
158158
self._sdk_ready_flag.set()
159-
self._telemetry_init_producer.record_ready_time(int(round(time.time() * 1000)) - self._ready_time)
159+
self._telemetry_init_producer.record_ready_time(get_current_epoch_time() - self._ready_time)
160160
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
161161
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
162162

0 commit comments

Comments
 (0)