Skip to content

Commit 114d02a

Browse files
committed
Added fetch segment with IFF split update
1 parent b553502 commit 114d02a

File tree

4 files changed

+55
-20
lines changed

4 files changed

+55
-20
lines changed

splitio/push/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def __init__(self, synchronizer, telemetry_runtime_producer):
1919
self._feature_flag_queue = Queue()
2020
self._segments_queue = Queue()
2121
self._synchronizer = synchronizer
22-
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, telemetry_runtime_producer)
22+
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer)
2323
self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue)
2424
self._handlers = {
2525
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,

splitio/push/splitworker.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,35 @@ class SplitWorker(object):
2626

2727
_centinel = object()
2828

29-
def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_storage, telemetry_runtime_producer):
29+
def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer):
3030
"""
3131
Class constructor.
3232
3333
:param synchronize_feature_flag: handler to perform feature flag synchronization on incoming event
3434
:type synchronize_feature_flag: callable
3535
36+
:param synchronize_segment: handler to perform segment synchronization on incoming event
37+
:type synchronize_segment: function
38+
3639
:param feature_flag_queue: queue with feature flag updates notifications
3740
:type feature_flag_queue: queue
41+
42+
:param feature_flag_storage: feature flag storage instance
43+
:type feature_flag_storage: splitio.storage.inmemory.InMemorySplitStorage
44+
45+
:param segment_storage: segment storage instance
46+
:type segment_storage: splitio.storage.inmemory.InMemorySegmentStorage
47+
48+
:param telemetry_runtime_producer: Telemetry runtime producer instance
49+
:type telemetry_runtime_producer: splitio.engine.telemetry.TelemetryRuntimeProducer
3850
"""
3951
self._feature_flag_queue = feature_flag_queue
4052
self._handler = synchronize_feature_flag
53+
self._segment_handler = synchronize_segment
4154
self._running = False
4255
self._worker = None
4356
self._feature_flag_storage = feature_flag_storage
57+
self._segment_storage = segment_storage
4458
self._compression_handlers = {
4559
CompressionMode.NO_COMPRESSION: lambda event: base64.b64decode(event.feature_flag_definition),
4660
CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
@@ -62,7 +76,6 @@ def _check_instant_ff_update(self, event):
6276
return True
6377
return False
6478

65-
6679
def _run(self):
6780
"""Run worker handler."""
6881
while self.is_running():
@@ -76,9 +89,14 @@ def _run(self):
7689
if self._check_instant_ff_update(event):
7790
try:
7891
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
92+
_LOGGER.debug(new_split)
7993
if new_split.status == Status.ACTIVE:
8094
self._feature_flag_storage.put(new_split)
8195
_LOGGER.debug('Feature flag %s is updated', new_split.name)
96+
for segment_name in new_split.get_segment_names():
97+
if self._segment_storage.get(segment_name) is None:
98+
_LOGGER.debug('Fetching new segment %s', segment_name)
99+
self._segment_handler(segment_name, event.change_number)
82100
else:
83101
self._feature_flag_storage.remove(new_split.name)
84102
self._feature_flag_storage.set_change_number(event.change_number)

splitio/sync/synchronizer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,10 @@ def __init__(self, split_synchronizers, split_tasks):
256256
def split_sync(self):
257257
return self._split_synchronizers.split_sync
258258

259+
@property
260+
def segment_storage(self):
261+
return self._split_synchronizers.segment_sync._segment_storage
262+
259263
def _synchronize_segments(self):
260264
_LOGGER.debug('Starting segments synchronization')
261265
return self._split_synchronizers.segment_sync.synchronize_segments()

tests/push/test_split_worker.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
from splitio.push.splitworker import SplitWorker
88
from splitio.push.parser import SplitChangeUpdate
99
from splitio.engine.telemetry import TelemetryStorageProducer
10-
from splitio.storage.inmemmory import InMemoryTelemetryStorage
11-
10+
from splitio.storage.inmemmory import InMemoryTelemetryStorage, InMemorySplitStorage, InMemorySegmentStorage
1211
change_number_received = None
1312

1413

@@ -22,14 +21,10 @@ class SplitWorkerTests(object):
2221

2322
def test_on_error(self, mocker):
2423
q = queue.Queue()
25-
telemetry_storage = InMemoryTelemetryStorage()
26-
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
27-
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
28-
2924
def handler_sync(change_number):
3025
raise APIException('some')
3126

32-
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
27+
split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock())
3328
split_worker.start()
3429
assert split_worker.is_running()
3530

@@ -46,10 +41,7 @@ def handler_sync(change_number):
4641

4742
def test_handler(self, mocker):
4843
q = queue.Queue()
49-
telemetry_storage = InMemoryTelemetryStorage()
50-
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
51-
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
52-
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
44+
split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock())
5345

5446
global change_number_received
5547
assert not split_worker.is_running()
@@ -101,7 +93,7 @@ def test_compression(self, mocker):
10193
telemetry_storage = InMemoryTelemetryStorage()
10294
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
10395
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
104-
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
96+
split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer)
10597
global change_number_received
10698
split_worker.start()
10799
def get_change_number():
@@ -148,10 +140,7 @@ def remove(feature_flag):
148140

149141
def test_edge_cases(self, mocker):
150142
q = queue.Queue()
151-
telemetry_storage = InMemoryTelemetryStorage()
152-
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
153-
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
154-
split_worker = SplitWorker(handler_sync, q, mocker.Mock(), telemetry_runtime_producer)
143+
split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock())
155144
global change_number_received
156145
split_worker.start()
157146

@@ -190,4 +179,28 @@ def put(feature_flag):
190179
change_number_received = 0
191180
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456, 2345, None, 1))
192181
time.sleep(0.1)
193-
assert self._feature_flag == None
182+
assert self._feature_flag == None
183+
184+
def test_fetch_segment(self, mocker):
185+
q = queue.Queue()
186+
split_storage = InMemorySplitStorage()
187+
segment_storage = InMemorySegmentStorage()
188+
189+
self.segment_name = None
190+
def segment_handler_sync(segment_name, change_number):
191+
self.segment_name = segment_name
192+
return
193+
split_worker = SplitWorker(handler_sync, segment_handler_sync, q, split_storage, segment_storage, mocker.Mock())
194+
split_worker.start()
195+
196+
def get_change_number():
197+
return 2345
198+
split_worker._feature_flag_storage.get_change_number = get_change_number
199+
200+
def check_instant_ff_update(event):
201+
return True
202+
split_worker._check_instant_ff_update = check_instant_ff_update
203+
204+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 1675095324253, 2345, 'eyJjaGFuZ2VOdW1iZXIiOiAxNjc1MDk1MzI0MjUzLCAidHJhZmZpY1R5cGVOYW1lIjogInVzZXIiLCAibmFtZSI6ICJiaWxhbF9zcGxpdCIsICJ0cmFmZmljQWxsb2NhdGlvbiI6IDEwMCwgInRyYWZmaWNBbGxvY2F0aW9uU2VlZCI6IC0xMzY0MTE5MjgyLCAic2VlZCI6IC02MDU5Mzg4NDMsICJzdGF0dXMiOiAiQUNUSVZFIiwgImtpbGxlZCI6IGZhbHNlLCAiZGVmYXVsdFRyZWF0bWVudCI6ICJvZmYiLCAiYWxnbyI6IDIsICJjb25kaXRpb25zIjogW3siY29uZGl0aW9uVHlwZSI6ICJST0xMT1VUIiwgIm1hdGNoZXJHcm91cCI6IHsiY29tYmluZXIiOiAiQU5EIiwgIm1hdGNoZXJzIjogW3sia2V5U2VsZWN0b3IiOiB7InRyYWZmaWNUeXBlIjogInVzZXIiLCAiYXR0cmlidXRlIjogbnVsbH0sICJtYXRjaGVyVHlwZSI6ICJJTl9TRUdNRU5UIiwgIm5lZ2F0ZSI6IGZhbHNlLCAidXNlckRlZmluZWRTZWdtZW50TWF0Y2hlckRhdGEiOiB7InNlZ21lbnROYW1lIjogImJpbGFsX3NlZ21lbnQifSwgIndoaXRlbGlzdE1hdGNoZXJEYXRhIjogbnVsbCwgInVuYXJ5TnVtZXJpY01hdGNoZXJEYXRhIjogbnVsbCwgImJldHdlZW5NYXRjaGVyRGF0YSI6IG51bGwsICJkZXBlbmRlbmN5TWF0Y2hlckRhdGEiOiBudWxsLCAiYm9vbGVhbk1hdGNoZXJEYXRhIjogbnVsbCwgInN0cmluZ01hdGNoZXJEYXRhIjogbnVsbH1dfSwgInBhcnRpdGlvbnMiOiBbeyJ0cmVhdG1lbnQiOiAib24iLCAic2l6ZSI6IDB9LCB7InRyZWF0bWVudCI6ICJvZmYiLCAic2l6ZSI6IDEwMH1dLCAibGFiZWwiOiAiaW4gc2VnbWVudCBiaWxhbF9zZWdtZW50In0sIHsiY29uZGl0aW9uVHlwZSI6ICJST0xMT1VUIiwgIm1hdGNoZXJHcm91cCI6IHsiY29tYmluZXIiOiAiQU5EIiwgIm1hdGNoZXJzIjogW3sia2V5U2VsZWN0b3IiOiB7InRyYWZmaWNUeXBlIjogInVzZXIiLCAiYXR0cmlidXRlIjogbnVsbH0sICJtYXRjaGVyVHlwZSI6ICJBTExfS0VZUyIsICJuZWdhdGUiOiBmYWxzZSwgInVzZXJEZWZpbmVkU2VnbWVudE1hdGNoZXJEYXRhIjogbnVsbCwgIndoaXRlbGlzdE1hdGNoZXJEYXRhIjogbnVsbCwgInVuYXJ5TnVtZXJpY01hdGNoZXJEYXRhIjogbnVsbCwgImJldHdlZW5NYXRjaGVyRGF0YSI6IG51bGwsICJkZXBlbmRlbmN5TWF0Y2hlckRhdGEiOiBudWxsLCAiYm9vbGVhbk1hdGNoZXJEYXRhIjogbnVsbCwgInN0cmluZ01hdGNoZXJEYXRhIjogbnVsbH1dfSwgInBhcnRpdGlvbnMiOiBbeyJ0cmVhdG1lbnQiOiAib24iLCAic2l6ZSI6IDUwfSwgeyJ0cmVhdG1lbnQiOiAib2ZmIiwgInNpemUiOiA1MH1dLCAibGFiZWwiOiAiZGVmYXVsdCBydWxlIn1dLCAiY29uZmlndXJhdGlvbnMiOiB7fX0=', 0))
205+
time.sleep(0.1)
206+
assert self.segment_name == "bilal_segment"

0 commit comments

Comments
 (0)