Skip to content

Commit b553502

Browse files
authored
Merge pull request #392 from splitio/iff-telemetry-update
Added telemetry to iff update
2 parents f779b23 + deb767c commit b553502

File tree

13 files changed

+115
-24
lines changed

13 files changed

+115
-24
lines changed

splitio/engine/telemetry.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
_LOGGER = logging.getLogger(__name__)
77

88
from splitio.storage.inmemmory import InMemoryTelemetryStorage
9-
from splitio.models.telemetry import CounterConstants
9+
from splitio.models.telemetry import CounterConstants, UpdateFromSSE
1010

1111
class TelemetryStorageProducer(object):
1212
"""Telemetry storage producer class."""
@@ -139,6 +139,10 @@ def record_session_length(self, session):
139139
"""Record session length."""
140140
self._telemetry_storage.record_session_length(session)
141141

142+
def record_update_from_sse(self, event):
143+
"""Record update from sse."""
144+
self._telemetry_storage.record_update_from_sse(event)
145+
142146
class TelemetryStorageConsumer(object):
143147
"""Telemetry storage consumer class."""
144148

@@ -271,6 +275,10 @@ def pop_streaming_events(self):
271275
"""Get and reset streaming events."""
272276
return self._telemetry_storage.pop_streaming_events()
273277

278+
def pop_update_from_sse(self, event):
279+
"""Get and reset update from sse."""
280+
return self._telemetry_storage.pop_update_from_sse(event)
281+
274282
def get_session_length(self):
275283
"""Get session length"""
276284
return self._telemetry_storage.get_session_length()
@@ -292,6 +300,7 @@ def pop_formatted_stats(self):
292300
'iDr': self.get_impressions_stats(CounterConstants.IMPRESSIONS_DROPPED),
293301
'eQ': self.get_events_stats(CounterConstants.EVENTS_QUEUED),
294302
'eD': self.get_events_stats(CounterConstants.EVENTS_DROPPED),
303+
'ufs': {event.value: self.pop_update_from_sse(event) for event in UpdateFromSSE},
295304
'lS': {'sp': last_synchronization['split'],
296305
'se': last_synchronization['segment'],
297306
'im': last_synchronization['impression'],

splitio/models/telemetry.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ class OperationMode(Enum):
131131
CONSUMER = 'consumer'
132132
PARTIAL_CONSUMER = 'partial_consumer'
133133

134+
class UpdateFromSSE(Enum):
135+
"""Update from sse constants"""
136+
SPLIT_UPDATE = 'sp'
137+
134138
def get_latency_bucket_index(micros):
135139
"""
136140
Find the bucket index for a measured latency.
@@ -482,6 +486,7 @@ def _reset_all(self):
482486
self._auth_rejections = 0
483487
self._token_refreshes = 0
484488
self._session_length = 0
489+
self._update_from_sse = {}
485490

486491
def record_impressions_value(self, resource, value):
487492
"""
@@ -519,17 +524,27 @@ def record_events_value(self, resource, value):
519524
else:
520525
return
521526

527+
def record_update_from_sse(self, event):
528+
"""
529+
Increment the update from sse resource by one.
530+
531+
"""
532+
with self._lock:
533+
if event.value not in self._update_from_sse:
534+
self._update_from_sse[event.value] = 0
535+
self._update_from_sse[event.value] += 1
536+
522537
def record_auth_rejections(self):
523538
"""
524-
Increament the auth rejection resource by one.
539+
Increment the auth rejection resource by one.
525540
526541
"""
527542
with self._lock:
528543
self._auth_rejections += 1
529544

530545
def record_token_refreshes(self):
531546
"""
532-
Increament the token refreshes resource by one.
547+
Increment the token refreshes resource by one.
533548
534549
"""
535550
with self._lock:
@@ -604,6 +619,18 @@ def pop_token_refreshes(self):
604619
self._token_refreshes = 0
605620
return token_refreshes
606621

622+
def pop_update_from_sse(self, event):
623+
"""
624+
Pop update from sse
625+
626+
:return: update from sse value
627+
:rtype: int
628+
"""
629+
with self._lock:
630+
update_from_sse = self._update_from_sse[event.value]
631+
self._update_from_sse[event.value] = 0
632+
return update_from_sse
633+
607634
class StreamingEvent(object):
608635
"""
609636
Streaming event class

splitio/push/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
4444
"""
4545
self._auth_api = auth_api
4646
self._feedback_loop = feedback_loop
47-
self._processor = MessageProcessor(synchronizer)
47+
self._processor = MessageProcessor(synchronizer, telemetry_runtime_producer)
4848
self._status_tracker = PushStatusTracker(telemetry_runtime_producer)
4949
self._event_handlers = {
5050
EventType.MESSAGE: self._handle_message,

splitio/push/processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
class MessageProcessor(object):
1010
"""Message processor class."""
1111

12-
def __init__(self, synchronizer):
12+
def __init__(self, synchronizer, telemetry_runtime_producer):
1313
"""
1414
Class constructor.
1515
@@ -19,7 +19,7 @@ def __init__(self, synchronizer):
1919
self._feature_flag_queue = Queue()
2020
self._segments_queue = Queue()
2121
self._synchronizer = synchronizer
22-
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage)
22+
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, telemetry_runtime_producer)
2323
self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue)
2424
self._handlers = {
2525
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,

splitio/push/splitworker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
from enum import Enum
99

1010
from splitio.models.splits import from_raw, Status
11+
from splitio.models.telemetry import UpdateFromSSE
1112
from splitio.push.parser import UpdateType
1213

14+
1315
_LOGGER = logging.getLogger(__name__)
1416

1517
class CompressionMode(Enum):
@@ -24,7 +26,7 @@ class SplitWorker(object):
2426

2527
_centinel = object()
2628

27-
def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_storage):
29+
def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_storage, telemetry_runtime_producer):
2830
"""
2931
Class constructor.
3032
@@ -44,6 +46,7 @@ def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_st
4446
CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
4547
CompressionMode.ZLIB_COMPRESSION: lambda event: zlib.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
4648
}
49+
self._telemetry_runtime_producer = telemetry_runtime_producer
4750

4851
def is_running(self):
4952
"""Return whether the working is running."""
@@ -75,9 +78,11 @@ def _run(self):
7578
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
7679
if new_split.status == Status.ACTIVE:
7780
self._feature_flag_storage.put(new_split)
81+
_LOGGER.debug('Feature flag %s is updated', new_split.name)
7882
else:
7983
self._feature_flag_storage.remove(new_split.name)
8084
self._feature_flag_storage.set_change_number(event.change_number)
85+
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
8186
continue
8287
except Exception as e:
8388
_LOGGER.error('Exception raised in updating feature flag')

splitio/storage/inmemmory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ def record_session_length(self, session):
563563
"""Record session length."""
564564
self._counters.record_session_length(session)
565565

566+
def record_update_from_sse(self, event):
567+
"""Record update from sse."""
568+
self._counters.record_update_from_sse(event)
569+
566570
def get_bur_time_outs(self):
567571
"""Get block until ready timeout."""
568572
return self._tel_config.get_bur_time_outs()
@@ -632,6 +636,10 @@ def get_session_length(self):
632636
"""Get session length"""
633637
return self._counters.get_session_length()
634638

639+
def pop_update_from_sse(self, event):
640+
"""Get and reset update from sse."""
641+
return self._counters.pop_update_from_sse(event)
642+
635643
class LocalhostTelemetryStorage():
636644
"""Localhost telemetry storage."""
637645
def do_nothing(*_, **__):

splitio/sync/telemetry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from splitio.api.telemetry import TelemetryAPI
55
from splitio.engine.telemetry import TelemetryStorageConsumer
6+
from splitio.models.telemetry import UpdateFromSSE
67

78
class TelemetrySynchronizer(object):
89
"""Telemetry synchronizer class."""

tests/engine/test_telemetry.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ def test_record_token_refreshes(self, mocker):
163163
telemetry_runtime_producer.record_token_refreshes()
164164
assert(mocker.called)
165165

166+
@mock.patch('splitio.storage.inmemmory.InMemoryTelemetryStorage.record_update_from_sse')
167+
def test_record_update_from_sse(self, mocker):
168+
telemetry_storage = InMemoryTelemetryStorage()
169+
telemetry_runtime_producer = TelemetryRuntimeProducer(telemetry_storage)
170+
telemetry_runtime_producer.record_update_from_sse('sp')
171+
assert(mocker.called)
172+
166173
def test_record_streaming_event(self, mocker):
167174
telemetry_storage = mocker.Mock()
168175
telemetry_runtime_producer = TelemetryRuntimeProducer(telemetry_storage)
@@ -290,6 +297,12 @@ def test_pop_auth_rejections(self, mocker):
290297
telemetry_runtime_consumer = TelemetryRuntimeConsumer(telemetry_storage)
291298
telemetry_runtime_consumer.pop_auth_rejections()
292299

300+
@mock.patch('splitio.storage.inmemmory.InMemoryTelemetryStorage.pop_update_from_sse')
301+
def test_pop_auth_rejections(self, mocker):
302+
telemetry_storage = InMemoryTelemetryStorage()
303+
telemetry_runtime_consumer = TelemetryRuntimeConsumer(telemetry_storage)
304+
telemetry_runtime_consumer.pop_update_from_sse('sp')
305+
293306
@mock.patch('splitio.storage.inmemmory.InMemoryTelemetryStorage.pop_token_refreshes')
294307
def test_pop_token_refreshes(self, mocker):
295308
telemetry_storage = InMemoryTelemetryStorage()

tests/models/test_telemetry_model.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from splitio.models.telemetry import StorageType, OperationMode, MethodLatencies, MethodExceptions, \
77
HTTPLatencies, HTTPErrors, LastSynchronization, TelemetryCounters, TelemetryConfig, \
8-
StreamingEvent, StreamingEvents, get_latency_bucket_index
8+
StreamingEvent, StreamingEvents, UpdateFromSSE
99

1010
import splitio.models.telemetry as ModelTelemetry
1111

@@ -195,6 +195,7 @@ def test_telemetry_counters(self):
195195
assert(telemetry_counter._events_queued == 0)
196196
assert(telemetry_counter._auth_rejections == 0)
197197
assert(telemetry_counter._token_refreshes == 0)
198+
assert(telemetry_counter._update_from_sse == {})
198199

199200
telemetry_counter.record_session_length(20)
200201
assert(telemetry_counter.get_session_length() == 20)
@@ -219,6 +220,11 @@ def test_telemetry_counters(self):
219220
assert(telemetry_counter._events_queued == 30)
220221
telemetry_counter.record_events_value(ModelTelemetry.CounterConstants.EVENTS_DROPPED, 1)
221222
assert(telemetry_counter._events_dropped == 1)
223+
telemetry_counter.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
224+
assert(telemetry_counter._update_from_sse[UpdateFromSSE.SPLIT_UPDATE.value] == 1)
225+
updates = telemetry_counter.pop_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
226+
assert(telemetry_counter._update_from_sse[UpdateFromSSE.SPLIT_UPDATE.value] == 0)
227+
assert(updates == 1)
222228

223229
def test_streaming_event(self, mocker):
224230
streaming_event = StreamingEvent((ModelTelemetry.StreamingEventTypes.CONNECTION_ESTABLISHED, 'split', 1234))

tests/push/test_manager.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,13 @@ def test_split_change(self, mocker):
145145
processor_mock = mocker.Mock(spec=MessageProcessor)
146146
mocker.patch('splitio.push.manager.MessageProcessor', new=processor_mock)
147147

148-
telemetry_storage = InMemoryTelemetryStorage()
149-
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
150-
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
151-
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock(), telemetry_runtime_producer)
148+
telemetry_runtime_producer = mocker.Mock()
149+
synchronizer = mocker.Mock()
150+
manager = PushManager(mocker.Mock(), synchronizer, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer)
152151
manager._event_handler(sse_event)
153152
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
154153
assert processor_mock.mock_calls == [
155-
mocker.call(Any()),
154+
mocker.call(synchronizer, telemetry_runtime_producer),
156155
mocker.call().handle(update_message)
157156
]
158157

@@ -167,11 +166,13 @@ def test_split_kill(self, mocker):
167166
processor_mock = mocker.Mock(spec=MessageProcessor)
168167
mocker.patch('splitio.push.manager.MessageProcessor', new=processor_mock)
169168

170-
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock())
169+
telemetry_runtime_producer = mocker.Mock()
170+
synchronizer = mocker.Mock()
171+
manager = PushManager(mocker.Mock(), synchronizer, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer)
171172
manager._event_handler(sse_event)
172173
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
173174
assert processor_mock.mock_calls == [
174-
mocker.call(Any()),
175+
mocker.call(synchronizer, telemetry_runtime_producer),
175176
mocker.call().handle(update_message)
176177
]
177178

@@ -186,11 +187,13 @@ def test_segment_change(self, mocker):
186187
processor_mock = mocker.Mock(spec=MessageProcessor)
187188
mocker.patch('splitio.push.manager.MessageProcessor', new=processor_mock)
188189

189-
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock())
190+
telemetry_runtime_producer = mocker.Mock()
191+
synchronizer = mocker.Mock()
192+
manager = PushManager(mocker.Mock(), synchronizer, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer)
190193
manager._event_handler(sse_event)
191194
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
192195
assert processor_mock.mock_calls == [
193-
mocker.call(Any()),
196+
mocker.call(synchronizer, telemetry_runtime_producer),
194197
mocker.call().handle(update_message)
195198
]
196199

0 commit comments

Comments
 (0)