Skip to content

Commit c86273a

Browse files
committed
Updated push.splitworker and sync.split
1 parent 3e27c40 commit c86273a

File tree

5 files changed

+163
-52
lines changed

5 files changed

+163
-52
lines changed

splitio/push/splitworker.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from splitio.models.splits import from_raw, Status
1111
from splitio.models.telemetry import UpdateFromSSE
1212
from splitio.push.parser import UpdateType
13-
13+
from splitio.util.storage_helper import update_feature_flag_storage
1414

1515
_LOGGER = logging.getLogger(__name__)
1616

@@ -88,17 +88,20 @@ def _run(self):
8888
try:
8989
if self._check_instant_ff_update(event):
9090
try:
91-
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
91+
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
92+
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
93+
'''
9294
if new_split.status == Status.ACTIVE:
9395
self._feature_flag_storage.put(new_split)
9496
_LOGGER.debug('Feature flag %s is updated', new_split.name)
95-
for segment_name in new_split.get_segment_names():
96-
if self._segment_storage.get(segment_name) is None:
97-
_LOGGER.debug('Fetching new segment %s', segment_name)
98-
self._segment_handler(segment_name, event.change_number)
9997
else:
10098
self._feature_flag_storage.remove(new_split.name)
10199
self._feature_flag_storage.set_change_number(event.change_number)
100+
'''
101+
for segment_name in segment_list:
102+
if self._segment_storage.get(segment_name) is None:
103+
_LOGGER.debug('Fetching new segment %s', segment_name)
104+
self._segment_handler(segment_name, event.change_number)
102105
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
103106
continue
104107
except Exception as e:

splitio/sync/split.py

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from splitio.models import splits
1414
from splitio.util.backoff import Backoff
1515
from splitio.util.time import get_current_epoch_time_ms
16+
from splitio.util.storage_helper import update_feature_flag_storage
1617
from splitio.sync import util
1718

1819
_LEGACY_COMMENT_LINE_RE = re.compile(r'^#.*$')
@@ -79,7 +80,10 @@ def _fetch_until(self, fetch_options, till=None):
7980
_LOGGER.error('Exception raised while fetching feature flags')
8081
_LOGGER.debug('Exception information: ', exc_info=True)
8182
raise exc
82-
83+
fetched_feature_flags = []
84+
[fetched_feature_flags.append(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
85+
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
86+
'''
8387
to_add = []
8488
to_delete = []
8589
for feature_flag in feature_flag_changes.get('splits', []):
@@ -93,25 +97,10 @@ def _fetch_until(self, fetch_options, till=None):
9397
to_delete.append(feature_flag['name'])
9498
9599
self._feature_flag_storage.update(to_add, to_delete, feature_flag_changes['till'])
100+
'''
96101
if feature_flag_changes['till'] == feature_flag_changes['since']:
97102
return feature_flag_changes['till'], segment_list
98103

99-
def _check_flag_sets(self, feature_flag):
100-
"""
101-
Check all flag sets in a feature flag, return True if any of sets exist in storage
102-
103-
:param feature_flag: Flag set to validate.
104-
:type feature_flag: json
105-
106-
:return: True if any of its flag_set exist. False otherwise.
107-
:rtype: bool
108-
"""
109-
for flag_set in feature_flag['sets']:
110-
if self._feature_flag_storage.is_flag_set_exist(flag_set):
111-
return True
112-
return False
113-
114-
115104
def _attempt_feature_flag_sync(self, fetch_options, till=None):
116105
"""
117106
Hit endpoint, update storage and return True if sync is complete.

splitio/util/storage_helper.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""Storage Helper."""
2+
3+
from splitio.models import splits
4+
5+
def update_feature_flag_storage(feature_flag_storage, feature_flags, change_number):
6+
"""
7+
Update feature flag storage from given list of feature flags while checking the flag set logic
8+
9+
:param feature_flag_storage: Feature flag storage instance
10+
:type feature_flag_storage: splitio.storage.inmemory.InMemorySplitStorage
11+
:param feature_flag: Feature flag instance to validate.
12+
:type feature_flag: splitio.models.splits.Split
13+
:param: last change number
14+
:type: int
15+
16+
:return: segments list from feature flags list
17+
:rtype: list(str)
18+
"""
19+
segment_list = set()
20+
to_add = []
21+
to_delete = []
22+
for feature_flag in feature_flags:
23+
if (feature_flag_storage.config_flag_sets_used == 0 and feature_flag.status == splits.Status.ACTIVE) or \
24+
(feature_flag.status == splits.Status.ACTIVE and _check_flag_sets(feature_flag_storage, feature_flag)):
25+
to_add.append(feature_flag)
26+
segment_list.update(set(feature_flag.get_segment_names()))
27+
else:
28+
if feature_flag_storage.get(feature_flag.name) is not None:
29+
to_delete.append(feature_flag.name)
30+
31+
feature_flag_storage.update(to_add, to_delete, change_number)
32+
return segment_list
33+
34+
def _check_flag_sets(feature_flag_storage, feature_flag):
35+
"""
36+
Check all flag sets in a feature flag, return True if any of sets exist in storage
37+
38+
:param feature_flag_storage: Feature flag storage instance
39+
:type feature_flag_storage: splitio.storage.inmemory.InMemorySplitStorage
40+
:param feature_flag: Feature flag instance to validate.
41+
:type feature_flag: splitio.models.splits.Split
42+
43+
:return: True if any of its flag_set exist. False otherwise.
44+
:rtype: bool
45+
"""
46+
for flag_set in feature_flag.sets:
47+
if feature_flag_storage.is_flag_set_exist(flag_set):
48+
return True
49+
return False

tests/push/test_split_worker.py

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,13 @@ def test_handler(self, mocker):
5555

5656
def get_change_number():
5757
return 2345
58-
59-
self._feature_flag = None
60-
def put(feature_flag):
61-
self._feature_flag = feature_flag
58+
split_worker._feature_flag_storage.get_change_number = get_change_number
6259

6360
self.new_change_number = 0
64-
def set_change_number(new_change_number):
65-
self.new_change_number = new_change_number
66-
67-
split_worker._feature_flag_storage.get_change_number = get_change_number
68-
split_worker._feature_flag_storage.set_change_number = set_change_number
69-
split_worker._feature_flag_storage.put = put
61+
def update(to_add, to_delete, change_number):
62+
self.new_change_number = change_number
63+
split_worker._feature_flag_storage.update = update
64+
split_worker._feature_flag_storage.config_flag_sets_used = 0
7065

7166
# should call the handler
7267
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 12345, "{}", 1))
@@ -98,45 +93,45 @@ def test_compression(self, mocker):
9893
split_worker.start()
9994
def get_change_number():
10095
return 2345
101-
102-
def put(feature_flag):
103-
self._feature_flag = feature_flag
104-
105-
def remove(feature_flag):
106-
self._feature_flag_delete = feature_flag
107-
10896
split_worker._feature_flag_storage.get_change_number = get_change_number
109-
split_worker._feature_flag_storage.put = put
110-
split_worker._feature_flag_storage.remove = remove
97+
98+
self._feature_flag_added = None
99+
self._feature_flag_deleted = None
100+
def update(feature_flag_add, feature_flag_delete, change_number):
101+
self._feature_flag_added = feature_flag_add
102+
self._feature_flag_deleted = feature_flag_delete
103+
split_worker._feature_flag_storage.update = update
104+
split_worker._feature_flag_storage.config_flag_sets_used = 0
111105

112106
# compression 0
113-
self._feature_flag = None
107+
self._feature_flag_added = None
114108
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ==', 0))
115109
time.sleep(0.1)
116-
assert self._feature_flag.name == 'bilal_split'
110+
# pytest.set_trace()
111+
assert self._feature_flag_added[0].name == 'bilal_split'
117112
assert telemetry_storage._counters._update_from_sse['sp'] == 1
118113

119114
# compression 2
120-
self._feature_flag = None
115+
self._feature_flag_added = None
121116
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw==', 2))
122117
time.sleep(0.1)
123-
assert self._feature_flag.name == 'bilal_split'
118+
assert self._feature_flag_added[0].name == 'bilal_split'
124119
assert telemetry_storage._counters._update_from_sse['sp'] == 2
125120

126121
# compression 1
127-
self._feature_flag = None
122+
self._feature_flag_added = None
128123
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA=', 1))
129124
time.sleep(0.1)
130-
assert self._feature_flag.name == 'bilal_split'
125+
assert self._feature_flag_added[0].name == 'bilal_split'
131126
assert telemetry_storage._counters._update_from_sse['sp'] == 3
132127

133128
# should call delete split
134-
self._feature_flag = None
135-
self._feature_flag_delete = None
129+
self._feature_flag_added = None
130+
self._feature_flag_deleted = None
136131
q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 'eyJ0cmFmZmljVHlwZU5hbWUiOiAidXNlciIsICJpZCI6ICIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCAibmFtZSI6ICJiaWxhbF9zcGxpdCIsICJ0cmFmZmljQWxsb2NhdGlvbiI6IDEwMCwgInRyYWZmaWNBbGxvY2F0aW9uU2VlZCI6IC0xMzY0MTE5MjgyLCAic2VlZCI6IC02MDU5Mzg4NDMsICJzdGF0dXMiOiAiQVJDSElWRUQiLCAia2lsbGVkIjogZmFsc2UsICJkZWZhdWx0VHJlYXRtZW50IjogIm9mZiIsICJjaGFuZ2VOdW1iZXIiOiAxNjg0Mjc1ODM5OTUyLCAiYWxnbyI6IDIsICJjb25maWd1cmF0aW9ucyI6IHt9LCAiY29uZGl0aW9ucyI6IFt7ImNvbmRpdGlvblR5cGUiOiAiUk9MTE9VVCIsICJtYXRjaGVyR3JvdXAiOiB7ImNvbWJpbmVyIjogIkFORCIsICJtYXRjaGVycyI6IFt7ImtleVNlbGVjdG9yIjogeyJ0cmFmZmljVHlwZSI6ICJ1c2VyIn0sICJtYXRjaGVyVHlwZSI6ICJJTl9TRUdNRU5UIiwgIm5lZ2F0ZSI6IGZhbHNlLCAidXNlckRlZmluZWRTZWdtZW50TWF0Y2hlckRhdGEiOiB7InNlZ21lbnROYW1lIjogImJpbGFsX3NlZ21lbnQifX1dfSwgInBhcnRpdGlvbnMiOiBbeyJ0cmVhdG1lbnQiOiAib24iLCAic2l6ZSI6IDB9LCB7InRyZWF0bWVudCI6ICJvZmYiLCAic2l6ZSI6IDEwMH1dLCAibGFiZWwiOiAiaW4gc2VnbWVudCBiaWxhbF9zZWdtZW50In0sIHsiY29uZGl0aW9uVHlwZSI6ICJST0xMT1VUIiwgIm1hdGNoZXJHcm91cCI6IHsiY29tYmluZXIiOiAiQU5EIiwgIm1hdGNoZXJzIjogW3sia2V5U2VsZWN0b3IiOiB7InRyYWZmaWNUeXBlIjogInVzZXIifSwgIm1hdGNoZXJUeXBlIjogIkFMTF9LRVlTIiwgIm5lZ2F0ZSI6IGZhbHNlfV19LCAicGFydGl0aW9ucyI6IFt7InRyZWF0bWVudCI6ICJvbiIsICJzaXplIjogMH0sIHsidHJlYXRtZW50IjogIm9mZiIsICJzaXplIjogMTAwfV0sICJsYWJlbCI6ICJkZWZhdWx0IHJ1bGUifV19', 0))
137132
time.sleep(0.1)
138-
assert self._feature_flag_delete == 'bilal_split'
139-
assert self._feature_flag == None
133+
assert self._feature_flag_deleted[0] == 'bilal_split'
134+
self._feature_flag_added = None
140135

141136
def test_edge_cases(self, mocker):
142137
q = queue.Queue()

tests/util/test_storage_helper.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""Storage Helper tests."""
2+
3+
from splitio.util.storage_helper import update_feature_flag_storage
4+
from splitio.storage.inmemmory import InMemorySplitStorage
5+
from splitio.models import splits
6+
from tests.sync.test_splits_synchronizer import splits as split_sample
7+
8+
class StorageHelperTests(object):
9+
10+
def test_helper_scenarios(self, mocker):
11+
storage = mocker.Mock(spec=InMemorySplitStorage)
12+
split = splits.from_raw(split_sample[0])
13+
14+
self.added = []
15+
self.deleted = []
16+
self.change_number = 0
17+
def update(to_add, to_delete, change_number):
18+
self.added = to_add
19+
self.deleted = to_delete
20+
self.change_number = change_number
21+
storage.update = update
22+
23+
def is_flag_set_exist(flag_set):
24+
return False
25+
storage.is_flag_set_exist = is_flag_set_exist
26+
27+
storage.config_flag_sets_used = 0
28+
update_feature_flag_storage(storage, [split], 123)
29+
assert self.added[0] == split
30+
assert self.deleted == []
31+
assert self.change_number == 123
32+
33+
storage.config_flag_sets_used = 2
34+
update_feature_flag_storage(storage, [split], 123)
35+
assert self.added == []
36+
assert self.deleted[0] == split.name
37+
38+
def is_flag_set_exist2(flag_set):
39+
return True
40+
storage.is_flag_set_exist = is_flag_set_exist2
41+
update_feature_flag_storage(storage, [split], 123)
42+
assert self.added[0] == split
43+
assert self.deleted == []
44+
45+
split_json = split_sample[0]
46+
split_json['conditions'].append({
47+
"matcherGroup": {
48+
"combiner": "AND",
49+
"matchers": [
50+
{
51+
"matcherType": "IN_SEGMENT",
52+
"negate": False,
53+
"userDefinedSegmentMatcherData": {
54+
"segmentName": "segment1"
55+
},
56+
"whitelistMatcherData": None
57+
}
58+
]
59+
},
60+
"partitions": [
61+
{
62+
"treatment": "on",
63+
"size": 30
64+
},
65+
{
66+
"treatment": "off",
67+
"size": 70
68+
}
69+
]
70+
}
71+
)
72+
73+
split = splits.from_raw(split_json)
74+
storage.config_flag_sets_used = 0
75+
assert update_feature_flag_storage(storage, [split], 123) == {'segment1'}

0 commit comments

Comments
 (0)