Skip to content

Commit c791b5a

Browse files
committed
Added telemetry to iff update
1 parent f779b23 commit c791b5a

File tree

9 files changed

+70
-11
lines changed

9 files changed

+70
-11
lines changed

splitio/engine/telemetry.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ def record_session_length(self, session):
139139
"""Record session length."""
140140
self._telemetry_storage.record_session_length(session)
141141

142+
def record_update_from_sse(self):
143+
"""Record session length."""
144+
self._telemetry_storage.record_update_from_sse()
145+
142146
class TelemetryStorageConsumer(object):
143147
"""Telemetry storage consumer class."""
144148

@@ -271,6 +275,10 @@ def pop_streaming_events(self):
271275
"""Get and reset streaming events."""
272276
return self._telemetry_storage.pop_streaming_events()
273277

278+
def pop_update_from_sse(self):
279+
"""Get and reset update from sse."""
280+
return self._telemetry_storage.pop_update_from_sse()
281+
274282
def get_session_length(self):
275283
"""Get session length"""
276284
return self._telemetry_storage.get_session_length()

splitio/models/telemetry.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ def _reset_all(self):
482482
self._auth_rejections = 0
483483
self._token_refreshes = 0
484484
self._session_length = 0
485+
self._update_from_sse = 0
485486

486487
def record_impressions_value(self, resource, value):
487488
"""
@@ -519,6 +520,14 @@ def record_events_value(self, resource, value):
519520
else:
520521
return
521522

523+
def record_update_from_sse(self):
524+
"""
525+
Increament the update from sse resource by one.
526+
527+
"""
528+
with self._lock:
529+
self._update_from_sse += 1
530+
522531
def record_auth_rejections(self):
523532
"""
524533
Increament the auth rejection resource by one.
@@ -604,6 +613,18 @@ def pop_token_refreshes(self):
604613
self._token_refreshes = 0
605614
return token_refreshes
606615

616+
def pop_update_from_sse(self):
617+
"""
618+
Pop update from sse
619+
620+
:return: token refreshes value
621+
:rtype: int
622+
"""
623+
with self._lock:
624+
update_from_sse = self._update_from_sse
625+
self._update_from_sse = 0
626+
return update_from_sse
627+
607628
class StreamingEvent(object):
608629
"""
609630
Streaming event class

splitio/push/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
4444
"""
4545
self._auth_api = auth_api
4646
self._feedback_loop = feedback_loop
47-
self._processor = MessageProcessor(synchronizer)
47+
self._processor = MessageProcessor(synchronizer, telemetry_runtime_producer)
4848
self._status_tracker = PushStatusTracker(telemetry_runtime_producer)
4949
self._event_handlers = {
5050
EventType.MESSAGE: self._handle_message,

splitio/push/processor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
class MessageProcessor(object):
1010
"""Message processor class."""
1111

12-
def __init__(self, synchronizer):
12+
def __init__(self, synchronizer, telemetry_runtime_producer):
1313
"""
1414
Class constructor.
1515
@@ -19,8 +19,8 @@ def __init__(self, synchronizer):
1919
self._feature_flag_queue = Queue()
2020
self._segments_queue = Queue()
2121
self._synchronizer = synchronizer
22-
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage)
23-
self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue)
22+
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, telemetry_runtime_producer)
23+
self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue, telemetry_runtime_producer)
2424
self._handlers = {
2525
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,
2626
UpdateType.SPLIT_KILL: self._handle_feature_flag_kill,

splitio/push/segmentworker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class SegmentWorker(object):
1111

1212
_centinel = object()
1313

14-
def __init__(self, synchronize_segment, segment_queue):
14+
def __init__(self, synchronize_segment, segment_queue, telemetry_runtime_producer):
1515
"""
1616
Class constructor.
1717
@@ -25,6 +25,7 @@ def __init__(self, synchronize_segment, segment_queue):
2525
self._handler = synchronize_segment
2626
self._running = False
2727
self._worker = None
28+
self._telemetry_runtime_producer = telemetry_runtime_producer
2829

2930
def is_running(self):
3031
"""Return whether the working is running."""

splitio/push/splitworker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class SplitWorker(object):
2424

2525
_centinel = object()
2626

27-
def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_storage):
27+
def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_storage, telemetry_runtime_producer):
2828
"""
2929
Class constructor.
3030
@@ -44,6 +44,7 @@ def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_st
4444
CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
4545
CompressionMode.ZLIB_COMPRESSION: lambda event: zlib.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
4646
}
47+
self._telemetry_runtime_producer = telemetry_runtime_producer
4748

4849
def is_running(self):
4950
"""Return whether the working is running."""
@@ -75,6 +76,8 @@ def _run(self):
7576
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
7677
if new_split.status == Status.ACTIVE:
7778
self._feature_flag_storage.put(new_split)
79+
self._telemetry_runtime_producer.record_update_from_sse()
80+
_LOGGER.debug('Feature flag %s is updated', new_split.name)
7881
else:
7982
self._feature_flag_storage.remove(new_split.name)
8083
self._feature_flag_storage.set_change_number(event.change_number)

splitio/storage/inmemmory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ def record_session_length(self, session):
563563
"""Record session length."""
564564
self._counters.record_session_length(session)
565565

566+
def record_update_from_sse(self):
567+
"""Record update from sse."""
568+
self._counters.record_update_from_sse()
569+
566570
def get_bur_time_outs(self):
567571
"""Get block until ready timeout."""
568572
return self._tel_config.get_bur_time_outs()
@@ -632,6 +636,10 @@ def get_session_length(self):
632636
"""Get session length"""
633637
return self._counters.get_session_length()
634638

639+
def pop_update_from_sse(self):
640+
"""Get and reset update from sse."""
641+
return self._counters.pop_update_from_sse()
642+
635643
class LocalhostTelemetryStorage():
636644
"""Localhost telemetry storage."""
637645
def do_nothing(*_, **__):

splitio/sync/telemetry.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ def _build_stats(self):
6060
merged_dict = {
6161
'spC': self._feature_flag_storage.get_splits_count(),
6262
'seC': self._segment_storage.get_segments_count(),
63-
'skC': self._segment_storage.get_segments_keys_count()
63+
'skC': self._segment_storage.get_segments_keys_count(),
64+
'ufs': {'sp': self._telemetry_runtime_consumer.pop_update_from_sse()}
6465
}
6566
merged_dict.update(self._telemetry_runtime_consumer.pop_formatted_stats())
6667
merged_dict.update(self._telemetry_evaluation_consumer.pop_formatted_stats())

tests/push/test_split_worker.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from splitio.api import APIException
77
from splitio.push.splitworker import SplitWorker
88
from splitio.push.parser import SplitChangeUpdate
9+
from splitio.engine.telemetry import TelemetryStorageProducer
10+
from splitio.storage.inmemmory import InMemoryTelemetryStorage
911

1012
change_number_received = None
1113

@@ -20,11 +22,14 @@ class SplitWorkerTests(object):
2022

2123
def test_on_error(self, mocker):
2224
q = queue.Queue()
25+
telemetry_storage = InMemoryTelemetryStorage()
26+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
27+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
2328

2429
def handler_sync(change_number):
2530
raise APIException('some')
2631

27-
split_worker = SplitWorker(handler_sync, q, mocker.Mock())
32+
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
2833
split_worker.start()
2934
assert split_worker.is_running()
3035

@@ -41,7 +46,10 @@ def handler_sync(change_number):
4146

4247
def test_handler(self, mocker):
4348
q = queue.Queue()
44-
split_worker = SplitWorker(handler_sync, q, mocker.Mock())
49+
telemetry_storage = InMemoryTelemetryStorage()
50+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
51+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
52+
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
4553

4654
global change_number_received
4755
assert not split_worker.is_running()
@@ -90,7 +98,10 @@ def set_change_number(new_change_number):
9098

9199
def test_compression(self, mocker):
92100
q = queue.Queue()
93-
split_worker = SplitWorker(handler_sync, q, mocker.Mock())
101+
telemetry_storage = InMemoryTelemetryStorage()
102+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
103+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
104+
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
94105
global change_number_received
95106
split_worker.start()
96107
def get_change_number():
@@ -111,18 +122,21 @@ def remove(feature_flag):
111122
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ==', 0))
112123
time.sleep(0.1)
113124
assert self._feature_flag.name == 'bilal_split'
125+
assert telemetry_storage._counters._update_from_sse == 1
114126

115127
# compression 2
116128
self._feature_flag = None
117129
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw==', 2))
118130
time.sleep(0.1)
119131
assert self._feature_flag.name == 'bilal_split'
132+
assert telemetry_storage._counters._update_from_sse == 2
120133

121134
# compression 1
122135
self._feature_flag = None
123136
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA=', 1))
124137
time.sleep(0.1)
125138
assert self._feature_flag.name == 'bilal_split'
139+
assert telemetry_storage._counters._update_from_sse == 3
126140

127141
# should call delete split
128142
self._feature_flag = None
@@ -134,7 +148,10 @@ def remove(feature_flag):
134148

135149
def test_edge_cases(self, mocker):
136150
q = queue.Queue()
137-
split_worker = SplitWorker(handler_sync, q, mocker.Mock())
151+
telemetry_storage = InMemoryTelemetryStorage()
152+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
153+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
154+
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
138155
global change_number_received
139156
split_worker.start()
140157

0 commit comments

Comments
 (0)