Skip to content

Commit 8d2e7d0

Browse files
authored
Merge pull request #273 from splitio/telemetry-inmemory
Telemetry inmemory
2 parents f623b2b + 2f7f38c commit 8d2e7d0

File tree

12 files changed

+2594
-5
lines changed

12 files changed

+2594
-5
lines changed

splitio/api/telemetry.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,56 @@ def record_unique_keys(self, uniques):
4141
if not 200 <= response.status_code < 300:
4242
raise APIException(response.body, response.status_code)
4343
except HttpClientException as exc:
44-
_LOGGER.error(
44+
_LOGGER.info(
4545
'Error posting unique keys because an exception was raised by the HTTPClient'
4646
)
4747
_LOGGER.debug('Error: ', exc_info=True)
4848
raise APIException('Unique keys not flushed properly.') from exc
49+
50+
def record_init(self, configs):
51+
"""
52+
Send init config data to the backend.
53+
54+
:param configs: configs
55+
:type json
56+
"""
57+
try:
58+
response = self._client.post(
59+
'telemetry',
60+
'/metrics/config',
61+
self._apikey,
62+
body=configs,
63+
extra_headers=self._metadata
64+
)
65+
if not 200 <= response.status_code < 300:
66+
raise APIException(response.body, response.status_code)
67+
except HttpClientException as exc:
68+
_LOGGER.info(
69+
'Error posting init config because an exception was raised by the HTTPClient'
70+
)
71+
_LOGGER.debug('Error: ', exc_info=True)
72+
raise APIException('Init config data not flushed properly.') from exc
73+
74+
def record_stats(self, stats):
75+
"""
76+
Send runtime stats to the backend.
77+
78+
:param configs: configs
79+
:type json
80+
"""
81+
try:
82+
response = self._client.post(
83+
'metrics',
84+
'/usage',
85+
self._apikey,
86+
body=stats,
87+
extra_headers=self._metadata
88+
)
89+
if not 200 <= response.status_code < 300:
90+
raise APIException(response.body, response.status_code)
91+
except HttpClientException as exc:
92+
_LOGGER.info(
93+
'Error posting runtime stats because an exception was raised by the HTTPClient'
94+
)
95+
_LOGGER.debug('Error: ', exc_info=True)
96+
raise APIException('Runtime stats not flushed properly.') from exc

splitio/client/factory.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,13 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
298298
if not input_validator.validate_factory_instantiation(api_key):
299299
return None
300300

301+
extra_cfg = {}
302+
extra_cfg['sdk_url'] = sdk_url
303+
extra_cfg['events_url'] = events_url
304+
extra_cfg['auth_url'] = auth_api_base_url
305+
extra_cfg['streaming_url'] = streaming_api_base_url
306+
extra_cfg['telemetry_api_url'] = telemetry_api_base_url
307+
301308
http_client = HttpClient(
302309
sdk_url=sdk_url,
303310
events_url=events_url,
@@ -496,7 +503,6 @@ def _build_localhost_factory(cfg):
496503
ready_event
497504
)
498505

499-
500506
def get_factory(api_key, **kwargs):
501507
"""Build and return the appropriate factory."""
502508
try:
@@ -538,3 +544,13 @@ def get_factory(api_key, **kwargs):
538544
finally:
539545
_INSTANTIATED_FACTORIES.update([api_key])
540546
_INSTANTIATED_FACTORIES_LOCK.release()
547+
548+
def _get_active_and_redundant_count():
549+
redundant_factory_count = 0
550+
active_factory_count = 0
551+
_INSTANTIATED_FACTORIES_LOCK.acquire()
552+
for item in _INSTANTIATED_FACTORIES:
553+
redundant_factory_count = redundant_factory_count + _INSTANTIATED_FACTORIES[item] - 1
554+
active_factory_count = active_factory_count + _INSTANTIATED_FACTORIES[item]
555+
_INSTANTIATED_FACTORIES_LOCK.release()
556+
return redundant_factory_count, active_factory_count

splitio/engine/telemetry.py

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
"""Telemetry engine classes."""
2+
import json
3+
4+
from splitio.storage.inmemmory import InMemoryTelemetryStorage
5+
6+
class TelemetryStorageProducer(object):
7+
"""Telemetry storage producer class."""
8+
9+
def __init__(self, telemetry_storage):
10+
"""Initialize all producer classes."""
11+
self._telemetry_init_producer = TelemetryInitProducer(telemetry_storage)
12+
self._telemetry_evaluation_producer = TelemetryEvaluationProducer(telemetry_storage)
13+
self._telemetry_runtime_producer = TelemetryRuntimeProducer(telemetry_storage)
14+
15+
def get_telemetry_init_producer(self):
16+
"""get init producer instance."""
17+
return self._telemetry_init_producer
18+
19+
def get_telemetry_evaluation_producer(self):
20+
"""get evaluation producer instance."""
21+
return self._telemetry_evaluation_producer
22+
23+
def get_telemetry_runtime_producer(self):
24+
"""get runtime producer instance."""
25+
return self._telemetry_runtime_producer
26+
27+
class TelemetryInitProducer(object):
28+
"""Telemetry init producer class."""
29+
30+
def __init__(self, telemetry_storage):
31+
"""Constructor."""
32+
self._telemetry_storage = telemetry_storage
33+
34+
def record_config(self, config):
35+
"""Record configurations."""
36+
self._telemetry_storage.record_config(config)
37+
38+
def record_ready_time(self, ready_time):
39+
"""Record ready time."""
40+
self._telemetry_storage.record_ready_time(ready_time)
41+
42+
def record_bur_time_out(self):
43+
"""Record block until ready timeout."""
44+
self._telemetry_storage.record_bur_time_out()
45+
46+
def record_not_ready_usage(self):
47+
"""record non-ready usage."""
48+
self._telemetry_storage.record_not_ready_usage()
49+
50+
class TelemetryEvaluationProducer(object):
51+
"""Telemetry evaluation producer class."""
52+
53+
def __init__(self, telemetry_storage):
54+
"""Constructor."""
55+
self._telemetry_storage = telemetry_storage
56+
57+
def record_latency(self, method, latency):
58+
"""Record method latency time."""
59+
self._telemetry_storage.record_latency(method, latency)
60+
61+
def record_exception(self, method):
62+
"""Record method exception time."""
63+
self._telemetry_storage.record_exception(method)
64+
65+
class TelemetryRuntimeProducer(object):
66+
"""Telemetry runtime producer class."""
67+
68+
def __init__(self, telemetry_storage):
69+
"""Constructor."""
70+
self._telemetry_storage = telemetry_storage
71+
72+
def add_tag(self, tag):
73+
"""Record tag string."""
74+
self._telemetry_storage.add_tag(tag)
75+
76+
def record_impression_stats(self, data_type, count):
77+
"""Record impressions stats."""
78+
self._telemetry_storage.record_impression_stats(data_type, count)
79+
80+
def record_event_stats(self, data_type, count):
81+
"""Record events stats."""
82+
self._telemetry_storage.record_event_stats(data_type, count)
83+
84+
def record_suceessful_sync(self, resource, time):
85+
"""Record successful sync."""
86+
self._telemetry_storage.record_suceessful_sync(resource, time)
87+
88+
def record_sync_error(self, resource, status):
89+
"""Record sync error."""
90+
self._telemetry_storage.record_sync_error(resource, status)
91+
92+
def record_sync_latency(self, resource, latency):
93+
"""Record latency time."""
94+
self._telemetry_storage.record_sync_latency(resource, latency)
95+
96+
def record_auth_rejections(self):
97+
"""Record auth rejection."""
98+
self._telemetry_storage.record_auth_rejections()
99+
100+
def record_token_refreshes(self):
101+
"""Record sse token refresh."""
102+
self._telemetry_storage.record_token_refreshes()
103+
104+
def record_streaming_event(self, streaming_event):
105+
"""Record incoming streaming event."""
106+
self._telemetry_storage.record_streaming_event(streaming_event)
107+
108+
def record_session_length(self, session):
109+
"""Record session length."""
110+
self._telemetry_storage.record_session_length(session)
111+
112+
class TelemetryStorageConsumer(object):
113+
"""Telemetry storage consumer class."""
114+
115+
def __init__(self, telemetry_storage):
116+
"""Initialize all consumer classes."""
117+
self._telemetry_init_consumer = TelemetryInitConsumer(telemetry_storage)
118+
self._telemetry_evaluation_consumer = TelemetryEvaluationConsumer(telemetry_storage)
119+
self._telemetry_runtime_consumer = TelemetryRuntimeConsumer(telemetry_storage)
120+
121+
def get_telemetry_init_consumer(self):
122+
"""Get telemetry init instance"""
123+
return self._telemetry_init_consumer
124+
125+
def get_telemetry_evaluation_consumer(self):
126+
"""Get telemetry evaluation instance"""
127+
return self._telemetry_evaluation_consumer
128+
129+
def get_telemetry_runtime_consumer(self):
130+
"""Get telemetry runtime instance"""
131+
return self._telemetry_runtime_consumer
132+
133+
class TelemetryInitConsumer(object):
134+
"""Telemetry init consumer class."""
135+
136+
def __init__(self, telemetry_storage):
137+
"""Constructor."""
138+
self._telemetry_storage = telemetry_storage
139+
140+
def get_bur_time_outs(self):
141+
"""Get block until ready timeout."""
142+
return self._telemetry_storage.get_bur_time_outs()
143+
144+
def get_not_ready_usage(self):
145+
"""Get none-ready usage."""
146+
return self._telemetry_storage.get_not_ready_usage()
147+
148+
def get_config_stats(self):
149+
"""Get none-ready usage."""
150+
return self._telemetry_storage.get_config_stats()
151+
152+
def get_config_stats_to_json(self):
153+
config_stats = self._telemetry_storage.get_config_stats()
154+
return json.dumps({
155+
'oM': config_stats['operationMode'],
156+
'sT': config_stats['storageType'],
157+
'sE': config_stats['streamingEnabled'],
158+
'rR': config_stats['refreshRate'],
159+
'uO': config_stats['urlOverride'],
160+
'iQ': config_stats['impressionsQueueSize'],
161+
'eQ': config_stats['eventsQueueSize'],
162+
'iM': config_stats['impressionsMode'],
163+
'iL': config_stats['impressionListener'],
164+
'hP': config_stats['httpProxy'],
165+
'aF': config_stats['activeFactoryCount'],
166+
'rF': config_stats['redundantFactoryCount'],
167+
'bT': config_stats['blockUntilReadyTimeout'],
168+
'nR': config_stats['notReady'],
169+
'tR': config_stats['timeUntilReady']}
170+
)
171+
172+
class TelemetryEvaluationConsumer(object):
173+
"""Telemetry evaluation consumer class."""
174+
175+
def __init__(self, telemetry_storage):
176+
"""Constructor."""
177+
self._telemetry_storage = telemetry_storage
178+
179+
def pop_exceptions(self):
180+
"""Get and reset method exceptions."""
181+
return self._telemetry_storage.pop_exceptions()
182+
183+
def pop_latencies(self):
184+
"""Get and reset eval latencies."""
185+
return self._telemetry_storage.pop_latencies()
186+
187+
def pop_formatted_stats(self):
188+
"""Get formatted and reset stats."""
189+
exceptions = self.pop_exceptions()['methodExceptions']
190+
latencies = self.pop_latencies()['methodLatencies']
191+
return {
192+
**{'mE': {'t': exceptions['treatment'],
193+
'ts': exceptions['treatments'],
194+
'tc': exceptions['treatmentWithConfig'],
195+
'tcs': exceptions['treatmentsWithConfig'],
196+
'tr': exceptions['track']}
197+
},
198+
**{'mL': {'t': latencies['treatment'],
199+
'ts': latencies['treatments'],
200+
'tc': latencies['treatmentWithConfig'],
201+
'tcs': latencies['treatmentsWithConfig'],
202+
'tr': latencies['track']}
203+
},
204+
}
205+
206+
class TelemetryRuntimeConsumer(object):
207+
"""Telemetry runtime consumer class."""
208+
209+
def __init__(self, telemetry_storage):
210+
"""Constructor."""
211+
self._telemetry_storage = telemetry_storage
212+
213+
def get_impressions_stats(self, type):
214+
"""Get impressions stats"""
215+
return self._telemetry_storage.get_impressions_stats(type)
216+
217+
def get_events_stats(self, type):
218+
"""Get events stats"""
219+
return self._telemetry_storage.get_events_stats(type)
220+
221+
def get_last_synchronization(self):
222+
"""Get last sync"""
223+
return self._telemetry_storage.get_last_synchronization()['lastSynchronizations']
224+
225+
def pop_tags(self):
226+
"""Get and reset http errors."""
227+
return self._telemetry_storage.pop_tags()
228+
229+
def pop_http_errors(self):
230+
"""Get and reset http errors."""
231+
return self._telemetry_storage.pop_http_errors()
232+
233+
def pop_http_latencies(self):
234+
"""Get and reset http latencies."""
235+
return self._telemetry_storage.pop_http_latencies()
236+
237+
def pop_auth_rejections(self):
238+
"""Get and reset auth rejections."""
239+
return self._telemetry_storage.pop_auth_rejections()
240+
241+
def pop_token_refreshes(self):
242+
"""Get and reset token refreshes."""
243+
return self._telemetry_storage.pop_token_refreshes()
244+
245+
def pop_streaming_events(self):
246+
"""Get and reset streaming events."""
247+
return self._telemetry_storage.pop_streaming_events()
248+
249+
def get_session_length(self):
250+
"""Get session length"""
251+
return self._telemetry_storage.get_session_length()
252+
253+
def pop_formatted_stats(self):
254+
"""Get formatted and reset stats."""
255+
last_synchronization = self.get_last_synchronization()
256+
http_errors = self.pop_http_errors()['httpErrors']
257+
http_latencies = self.pop_http_latencies()['httpLatencies']
258+
return {
259+
**{'iQ': self.get_impressions_stats('impressionsQueued')},
260+
**{'iDe': self.get_impressions_stats('impressionsDeduped')},
261+
**{'iDr': self.get_impressions_stats('impressionsDropped')},
262+
**{'eQ': self.get_events_stats('eventsQueued')},
263+
**{'eD': self.get_events_stats('eventsDropped')},
264+
**{'lS': {'sp': last_synchronization['split'],
265+
'se': last_synchronization['segment'],
266+
'im': last_synchronization['impression'],
267+
'ic': last_synchronization['impressionCount'],
268+
'ev': last_synchronization['event'],
269+
'te': last_synchronization['telemetry'],
270+
'to': last_synchronization['token']}
271+
},
272+
**{'t': self.pop_tags()},
273+
**{'hE': {'sp': http_errors['split'],
274+
'se': http_errors['segment'],
275+
'im': http_errors['impression'],
276+
'ic': http_errors['impressionCount'],
277+
'ev': http_errors['event'],
278+
'te': http_errors['telemetry'],
279+
'to': http_errors['token']}
280+
},
281+
**{'hL': {'sp': http_latencies['split'],
282+
'se': http_latencies['segment'],
283+
'im': http_latencies['impression'],
284+
'ic': http_latencies['impressionCount'],
285+
'ev': http_latencies['event'],
286+
'te': http_latencies['telemetry'],
287+
'to': http_latencies['token']}
288+
},
289+
**{'aR': self.pop_auth_rejections()},
290+
**{'tR': self.pop_token_refreshes()},
291+
**{'sE': [{'e': event['type'],
292+
'd': event['data'],
293+
't': event['time']
294+
} for event in self.pop_streaming_events()['streamingEvents']]
295+
},
296+
**{'sL': self.get_session_length()}
297+
}

0 commit comments

Comments
 (0)