Skip to content

Commit 523d239

Browse files
authored
Merge pull request #417 from splitio/development
Development
2 parents c85ce86 + 2847d68 commit 523d239

25 files changed

+685
-243
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* @splitio/sdk

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
9.5.0 (Jul 18, 2023)
2+
- Improved streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system.
3+
14
9.4.2 (May 15, 2023)
25
- Updated terminology on the SDKs codebase to be more aligned with current standard without causing a breaking change. The core change is the term split for feature flag on things like logs and code documentation comments.
36
- Added detailed debug logging for redis adapter.

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
'pyyaml>=5.4',
2222
'docopt>=0.6.2',
2323
'enum34;python_version<"3.4"',
24-
'bloom-filter2>=2.0.0',
24+
'bloom-filter2>=2.0.0'
2525
]
2626

2727
with open(path.join(path.abspath(path.dirname(__file__)), 'splitio', 'version.py')) as f:
@@ -44,7 +44,7 @@
4444
'uwsgi': ['uwsgi>=2.0.0'],
4545
'cpphash': ['mmh3cffi==0.2.1'],
4646
},
47-
setup_requires=['pytest-runner'],
47+
setup_requires=['pytest-runner', 'pluggy==1.0.0;python_version<"3.7"'],
4848
classifiers=[
4949
'Environment :: Console',
5050
'Intended Audience :: Developers',

splitio/engine/telemetry.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
_LOGGER = logging.getLogger(__name__)
77

88
from splitio.storage.inmemmory import InMemoryTelemetryStorage
9-
from splitio.models.telemetry import CounterConstants
9+
from splitio.models.telemetry import CounterConstants, UpdateFromSSE
1010

1111
class TelemetryStorageProducer(object):
1212
"""Telemetry storage producer class."""
@@ -139,6 +139,10 @@ def record_session_length(self, session):
139139
"""Record session length."""
140140
self._telemetry_storage.record_session_length(session)
141141

142+
def record_update_from_sse(self, event):
143+
"""Record update from sse."""
144+
self._telemetry_storage.record_update_from_sse(event)
145+
142146
class TelemetryStorageConsumer(object):
143147
"""Telemetry storage consumer class."""
144148

@@ -271,6 +275,10 @@ def pop_streaming_events(self):
271275
"""Get and reset streaming events."""
272276
return self._telemetry_storage.pop_streaming_events()
273277

278+
def pop_update_from_sse(self, event):
279+
"""Get and reset update from sse."""
280+
return self._telemetry_storage.pop_update_from_sse(event)
281+
274282
def get_session_length(self):
275283
"""Get session length"""
276284
return self._telemetry_storage.get_session_length()
@@ -292,6 +300,7 @@ def pop_formatted_stats(self):
292300
'iDr': self.get_impressions_stats(CounterConstants.IMPRESSIONS_DROPPED),
293301
'eQ': self.get_events_stats(CounterConstants.EVENTS_QUEUED),
294302
'eD': self.get_events_stats(CounterConstants.EVENTS_DROPPED),
303+
'ufs': {event.value: self.pop_update_from_sse(event) for event in UpdateFromSSE},
295304
'lS': {'sp': last_synchronization['split'],
296305
'se': last_synchronization['segment'],
297306
'im': last_synchronization['impression'],

splitio/models/telemetry.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ class OperationMode(Enum):
131131
CONSUMER = 'consumer'
132132
PARTIAL_CONSUMER = 'partial_consumer'
133133

134+
class UpdateFromSSE(Enum):
135+
"""Update from sse constants"""
136+
SPLIT_UPDATE = 'sp'
137+
134138
def get_latency_bucket_index(micros):
135139
"""
136140
Find the bucket index for a measured latency.
@@ -482,6 +486,7 @@ def _reset_all(self):
482486
self._auth_rejections = 0
483487
self._token_refreshes = 0
484488
self._session_length = 0
489+
self._update_from_sse = {}
485490

486491
def record_impressions_value(self, resource, value):
487492
"""
@@ -519,17 +524,27 @@ def record_events_value(self, resource, value):
519524
else:
520525
return
521526

527+
def record_update_from_sse(self, event):
528+
"""
529+
Increment the update from sse resource by one.
530+
531+
"""
532+
with self._lock:
533+
if event.value not in self._update_from_sse:
534+
self._update_from_sse[event.value] = 0
535+
self._update_from_sse[event.value] += 1
536+
522537
def record_auth_rejections(self):
523538
"""
524-
Increament the auth rejection resource by one.
539+
Increment the auth rejection resource by one.
525540
526541
"""
527542
with self._lock:
528543
self._auth_rejections += 1
529544

530545
def record_token_refreshes(self):
531546
"""
532-
Increament the token refreshes resource by one.
547+
Increment the token refreshes resource by one.
533548
534549
"""
535550
with self._lock:
@@ -604,6 +619,18 @@ def pop_token_refreshes(self):
604619
self._token_refreshes = 0
605620
return token_refreshes
606621

622+
def pop_update_from_sse(self, event):
623+
"""
624+
Pop update from sse
625+
626+
:return: update from sse value
627+
:rtype: int
628+
"""
629+
with self._lock:
630+
update_from_sse = self._update_from_sse[event.value]
631+
self._update_from_sse[event.value] = 0
632+
return update_from_sse
633+
607634
class StreamingEvent(object):
608635
"""
609636
Streaming event class

splitio/push/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
4444
"""
4545
self._auth_api = auth_api
4646
self._feedback_loop = feedback_loop
47-
self._processor = MessageProcessor(synchronizer)
47+
self._processor = MessageProcessor(synchronizer, telemetry_runtime_producer)
4848
self._status_tracker = PushStatusTracker(telemetry_runtime_producer)
4949
self._event_handlers = {
5050
EventType.MESSAGE: self._handle_message,

splitio/push/parser.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from splitio.util.time import utctime_ms
88
from splitio.push.sse import SSE_EVENT_ERROR, SSE_EVENT_MESSAGE
99

10-
1110
class EventType(Enum):
1211
"""Event type enumeration."""
1312

@@ -277,7 +276,7 @@ def __str__(self):
277276

278277

279278
class BaseUpdate(BaseMessage, metaclass=abc.ABCMeta):
280-
"""Split data update notification."""
279+
"""Feature flag data update notification."""
281280

282281
def __init__(self, channel, timestamp, change_number):
283282
"""
@@ -324,11 +323,14 @@ def change_number(self):
324323

325324

326325
class SplitChangeUpdate(BaseUpdate):
327-
"""Split Change notification."""
326+
"""Feature flag Change notification."""
328327

329-
def __init__(self, channel, timestamp, change_number):
328+
def __init__(self, channel, timestamp, change_number, previous_change_number, feature_flag_definition, compression):
330329
"""Class constructor."""
331330
BaseUpdate.__init__(self, channel, timestamp, change_number)
331+
self._previous_change_number = previous_change_number
332+
self._feature_flag_definition = feature_flag_definition
333+
self._compression = compression
332334

333335
@property
334336
def update_type(self): # pylint:disable=no-self-use
@@ -340,18 +342,48 @@ def update_type(self): # pylint:disable=no-self-use
340342
"""
341343
return UpdateType.SPLIT_UPDATE
342344

345+
@property
346+
def previous_change_number(self): # pylint:disable=no-self-use
347+
"""
348+
Return previous change number
349+
350+
:returns: The previous change number
351+
:rtype: int
352+
"""
353+
return self._previous_change_number
354+
355+
@property
356+
def feature_flag_definition(self): # pylint:disable=no-self-use
357+
"""
358+
Return feature flag definition
359+
360+
:returns: The new feature flag definition
361+
:rtype: str
362+
"""
363+
return self._feature_flag_definition
364+
365+
@property
366+
def compression(self): # pylint:disable=no-self-use
367+
"""
368+
Return previous compression type
369+
370+
:returns: The compression type
371+
:rtype: int
372+
"""
373+
return self._compression
374+
343375
def __str__(self):
344376
"""Return string representation."""
345377
return "SplitChange - changeNumber=%d" % (self.change_number)
346378

347379

348380
class SplitKillUpdate(BaseUpdate):
349-
"""Split Kill notification."""
381+
"""Feature flag Kill notification."""
350382

351-
def __init__(self, channel, timestamp, change_number, split_name, default_treatment): # pylint:disable=too-many-arguments
383+
def __init__(self, channel, timestamp, change_number, feature_flag_name, default_treatment): # pylint:disable=too-many-arguments
352384
"""Class constructor."""
353385
BaseUpdate.__init__(self, channel, timestamp, change_number)
354-
self._split_name = split_name
386+
self._feature_flag_name = feature_flag_name
355387
self._default_treatment = default_treatment
356388

357389
@property
@@ -365,14 +397,14 @@ def update_type(self): # pylint:disable=no-self-use
365397
return UpdateType.SPLIT_KILL
366398

367399
@property
368-
def split_name(self):
400+
def feature_flag_name(self):
369401
"""
370-
Return the name of the killed split.
402+
Return the name of the killed feature flag.
371403
372-
:returns: name of the killed split
404+
:returns: name of the killed feature flag
373405
:rtype: str
374406
"""
375-
return self._split_name
407+
return self._feature_flag_name
376408

377409
@property
378410
def default_treatment(self):
@@ -387,7 +419,7 @@ def default_treatment(self):
387419
def __str__(self):
388420
"""Return string representation."""
389421
return "SplitKill - changeNumber=%d, name=%s, defaultTreatment=%s" % \
390-
(self.change_number, self.split_name, self.default_treatment)
422+
(self.change_number, self.feature_flag_name, self.default_treatment)
391423

392424

393425
class SegmentChangeUpdate(BaseUpdate):
@@ -471,9 +503,9 @@ def _parse_update(channel, timestamp, data):
471503
"""
472504
update_type = UpdateType(data['type'])
473505
change_number = data['changeNumber']
474-
if update_type == UpdateType.SPLIT_UPDATE:
475-
return SplitChangeUpdate(channel, timestamp, change_number)
476-
elif update_type == UpdateType.SPLIT_KILL:
506+
if update_type == UpdateType.SPLIT_UPDATE and change_number is not None:
507+
return SplitChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
508+
elif update_type == UpdateType.SPLIT_KILL and change_number is not None:
477509
return SplitKillUpdate(channel, timestamp, change_number,
478510
data['splitName'], data['defaultTreatment'])
479511
elif update_type == UpdateType.SEGMENT_UPDATE:

splitio/push/processor.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,47 +6,46 @@
66
from splitio.push.splitworker import SplitWorker
77
from splitio.push.segmentworker import SegmentWorker
88

9-
109
class MessageProcessor(object):
1110
"""Message processor class."""
1211

13-
def __init__(self, synchronizer):
12+
def __init__(self, synchronizer, telemetry_runtime_producer):
1413
"""
1514
Class constructor.
1615
1716
:param synchronizer: synchronizer component
1817
:type synchronizer: splitio.sync.synchronizer.Synchronizer
1918
"""
20-
self._split_queue = Queue()
19+
self._feature_flag_queue = Queue()
2120
self._segments_queue = Queue()
2221
self._synchronizer = synchronizer
23-
self._split_worker = SplitWorker(synchronizer.synchronize_splits, self._split_queue)
22+
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer)
2423
self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue)
2524
self._handlers = {
26-
UpdateType.SPLIT_UPDATE: self._handle_split_update,
27-
UpdateType.SPLIT_KILL: self._handle_split_kill,
25+
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,
26+
UpdateType.SPLIT_KILL: self._handle_feature_flag_kill,
2827
UpdateType.SEGMENT_UPDATE: self._handle_segment_change
2928
}
3029

31-
def _handle_split_update(self, event):
30+
def _handle_feature_flag_update(self, event):
3231
"""
33-
Handle incoming split update notification.
32+
Handle incoming feature flag update notification.
3433
35-
:param event: Incoming split change event
34+
:param event: Incoming feature flag change event
3635
:type event: splitio.push.parser.SplitChangeUpdate
3736
"""
38-
self._split_queue.put(event)
37+
self._feature_flag_queue.put(event)
3938

40-
def _handle_split_kill(self, event):
39+
def _handle_feature_flag_kill(self, event):
4140
"""
42-
Handle incoming split kill notification.
41+
Handle incoming feature flag kill notification.
4342
44-
:param event: Incoming split kill event
43+
:param event: Incoming feature flag kill event
4544
:type event: splitio.push.parser.SplitKillUpdate
4645
"""
47-
self._synchronizer.kill_split(event.split_name, event.default_treatment,
46+
self._synchronizer.kill_split(event.feature_flag_name, event.default_treatment,
4847
event.change_number)
49-
self._split_queue.put(event)
48+
self._feature_flag_queue.put(event)
5049

5150
def _handle_segment_change(self, event):
5251
"""
@@ -65,10 +64,10 @@ def update_workers_status(self, enabled):
6564
:type enabled: bool
6665
"""
6766
if enabled:
68-
self._split_worker.start()
67+
self._feature_flag_worker.start()
6968
self._segments_worker.start()
7069
else:
71-
self._split_worker.stop()
70+
self._feature_flag_worker.stop()
7271
self._segments_worker.stop()
7372

7473
def handle(self, event):
@@ -86,6 +85,6 @@ def handle(self, event):
8685
handle(event)
8786

8887
def shutdown(self):
89-
"""Stop splits & segments workers."""
90-
self._split_worker.stop()
88+
"""Stop feature flags & segments workers."""
89+
self._feature_flag_worker.stop()
9190
self._segments_worker.stop()

0 commit comments

Comments
 (0)