Skip to content

Commit 8499daa

Browse files
authored
Merge pull request #375 from splitio/iff-split-worker
Added IFF support to split_worker class
2 parents 1725e90 + 7b6e333 commit 8499daa

File tree

2 files changed

+108
-10
lines changed

2 files changed

+108
-10
lines changed

splitio/push/splitworker.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,29 @@
11
"""Feature Flag changes processing worker."""
22
import logging
33
import threading
4+
import gzip
5+
import zlib
6+
import base64
7+
import json
8+
from enum import Enum
49

10+
from splitio.models.splits import from_raw
511

612
_LOGGER = logging.getLogger(__name__)
713

14+
class CompressionMode(Enum):
15+
"""Compression modes """
16+
17+
NO_COMPRESSION = 0
18+
GZIP_COMPRESSION = 1
19+
ZLIB_COMPRESSION = 2
820

921
class SplitWorker(object):
1022
"""Feature Flag Worker for processing updates."""
1123

1224
_centinel = object()
1325

14-
def __init__(self, synchronize_feature_flag, feature_flag_queue):
26+
def __init__(self, synchronize_feature_flag, feature_flag_queue, feature_flag_storage):
1527
"""
1628
Class constructor.
1729
@@ -25,11 +37,22 @@ def __init__(self, synchronize_feature_flag, feature_flag_queue):
2537
self._handler = synchronize_feature_flag
2638
self._running = False
2739
self._worker = None
40+
self._feature_flag_storage = feature_flag_storage
41+
self._compression_handlers = {
42+
CompressionMode.NO_COMPRESSION: lambda event: base64.b64decode(event.feature_flag_definition),
43+
CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
44+
CompressionMode.ZLIB_COMPRESSION: lambda event: zlib.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
45+
}
2846

2947
def is_running(self):
3048
"""Return whether the working is running."""
3149
return self._running
3250

51+
def _get_feature_flag_definition(self, event):
52+
"""return feature flag definition in event."""
53+
cm = CompressionMode(event.compression) # will throw if the number is not defined in compression mode
54+
return self._compression_handlers[cm](event)
55+
3356
def _run(self):
3457
"""Run worker handler."""
3558
while self.is_running():
@@ -40,9 +63,20 @@ def _run(self):
4063
continue
4164
_LOGGER.debug('Processing feature flag update %d', event.change_number)
4265
try:
66+
if event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number():
67+
try:
68+
self._feature_flag_storage.put(from_raw(json.loads(self._get_feature_flag_definition(event))))
69+
self._feature_flag_storage.set_change_number(event.change_number)
70+
continue
71+
except Exception as e:
72+
_LOGGER.error('Exception raised in updating feature flag')
73+
_LOGGER.debug(str(e))
74+
_LOGGER.debug('Exception information: ', exc_info=True)
75+
pass
4376
self._handler(event.change_number)
44-
except Exception: # pylint: disable=broad-except
77+
except Exception as e: # pylint: disable=broad-except
4578
_LOGGER.error('Exception raised in feature flag synchronization')
79+
_LOGGER.debug(str(e))
4680
_LOGGER.debug('Exception information: ', exc_info=True)
4781

4882
def start(self):

tests/push/test_split_worker.py

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from splitio.api import APIException
77
from splitio.push.splitworker import SplitWorker
8-
from splitio.models.notification import SplitChangeNotification
8+
from splitio.push.parser import SplitChangeUpdate
99

1010
change_number_received = None
1111

@@ -18,17 +18,17 @@ def handler_sync(change_number):
1818

1919
class SplitWorkerTests(object):
2020

21-
def test_on_error(self):
21+
def test_on_error(self, mocker):
2222
q = queue.Queue()
2323

2424
def handler_sync(change_number):
2525
raise APIException('some')
2626

27-
split_worker = SplitWorker(handler_sync, q)
27+
split_worker = SplitWorker(handler_sync, q, mocker.Mock())
2828
split_worker.start()
2929
assert split_worker.is_running()
3030

31-
q.put(SplitChangeNotification('some', 'SPLIT_UPDATE', 123456789))
31+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456789, None, None, None))
3232
with pytest.raises(Exception):
3333
split_worker._handler()
3434

@@ -39,19 +39,83 @@ def handler_sync(change_number):
3939
assert not split_worker.is_running()
4040
assert not split_worker._worker.is_alive()
4141

42-
def test_handler(self):
42+
def test_handler(self, mocker):
4343
q = queue.Queue()
44-
split_worker = SplitWorker(handler_sync, q)
44+
split_worker = SplitWorker(handler_sync, q, mocker.Mock())
4545

4646
global change_number_received
4747
assert not split_worker.is_running()
4848
split_worker.start()
4949
assert split_worker.is_running()
5050

51-
q.put(SplitChangeNotification('some', 'SPLIT_UPDATE', 123456789))
52-
51+
# should call the handler
52+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456789, None, None, None))
5353
time.sleep(0.1)
5454
assert change_number_received == 123456789
5555

56+
def get_change_number():
57+
return 2345
58+
59+
self._feature_flag = None
60+
def put(feature_flag):
61+
self._feature_flag = feature_flag
62+
63+
self.new_change_number = 0
64+
def set_change_number(new_change_number):
65+
self.new_change_number = new_change_number
66+
67+
split_worker._feature_flag_storage.get_change_number = get_change_number
68+
split_worker._feature_flag_storage.set_change_number = set_change_number
69+
split_worker._feature_flag_storage.put = put
70+
71+
# should call the handler
72+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 12345, "{}", 1))
73+
time.sleep(0.1)
74+
assert change_number_received == 123456790
75+
76+
# should call the handler
77+
change_number_received = 0
78+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 12345, "{}", 3))
79+
time.sleep(0.1)
80+
assert change_number_received == 123456790
81+
82+
# should Not call the handler
83+
change_number_received = 0
84+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456, 2345, "eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw==", 2))
85+
time.sleep(0.1)
86+
assert change_number_received == 0
87+
5688
split_worker.stop()
5789
assert not split_worker.is_running()
90+
91+
def test_compression(self, mocker):
92+
q = queue.Queue()
93+
split_worker = SplitWorker(handler_sync, q, mocker.Mock())
94+
global change_number_received
95+
split_worker.start()
96+
def get_change_number():
97+
return 2345
98+
99+
def put(feature_flag):
100+
self._feature_flag = feature_flag
101+
102+
split_worker._feature_flag_storage.get_change_number = get_change_number
103+
split_worker._feature_flag_storage.put = put
104+
105+
# compression 0
106+
self._feature_flag = None
107+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ==', 0))
108+
time.sleep(0.1)
109+
assert self._feature_flag.name == 'bilal_split'
110+
111+
# compression 2
112+
self._feature_flag = None
113+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw==', 2))
114+
time.sleep(0.1)
115+
assert self._feature_flag.name == 'bilal_split'
116+
117+
# compression 1
118+
self._feature_flag = None
119+
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA=', 1))
120+
time.sleep(0.1)
121+
assert self._feature_flag.name == 'bilal_split'

0 commit comments

Comments
 (0)