Skip to content

Commit 72300f7

Browse files
authored
Merge pull request #379 from splitio/iff-change-number-check
Added discard split event if change number is null
2 parents b9cd98f + 1d5ced7 commit 72300f7

File tree

3 files changed

+81
-26
lines changed

3 files changed

+81
-26
lines changed

splitio/push/parser.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,9 @@ def _parse_update(channel, timestamp, data):
503503
"""
504504
update_type = UpdateType(data['type'])
505505
change_number = data['changeNumber']
506-
if update_type == UpdateType.SPLIT_UPDATE:
506+
if update_type == UpdateType.SPLIT_UPDATE and change_number is not None:
507507
return SplitChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
508-
elif update_type == UpdateType.SPLIT_KILL:
508+
elif update_type == UpdateType.SPLIT_KILL and change_number is not None:
509509
return SplitKillUpdate(channel, timestamp, change_number,
510510
data['splitName'], data['defaultTreatment'])
511511
elif update_type == UpdateType.SEGMENT_UPDATE:

splitio/push/splitworker.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ def _get_feature_flag_definition(self, event):
5454
cm = CompressionMode(event.compression) # will throw if the number is not defined in compression mode
5555
return self._compression_handlers[cm](event)
5656

57+
def _check_instant_ff_update(self, event):
58+
if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number():
59+
return True
60+
return False
61+
62+
5763
def _run(self):
5864
"""Run worker handler."""
5965
while self.is_running():
@@ -64,17 +70,16 @@ def _run(self):
6470
continue
6571
_LOGGER.debug('Processing feature flag update %d', event.change_number)
6672
try:
67-
if event.update_type == UpdateType.SPLIT_UPDATE:
68-
if event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number():
69-
try:
70-
self._feature_flag_storage.put(from_raw(json.loads(self._get_feature_flag_definition(event))))
71-
self._feature_flag_storage.set_change_number(event.change_number)
72-
continue
73-
except Exception as e:
74-
_LOGGER.error('Exception raised in updating feature flag')
75-
_LOGGER.debug(str(e))
76-
_LOGGER.debug('Exception information: ', exc_info=True)
77-
pass
73+
if self._check_instant_ff_update(event):
74+
try:
75+
self._feature_flag_storage.put(from_raw(json.loads(self._get_feature_flag_definition(event))))
76+
self._feature_flag_storage.set_change_number(event.change_number)
77+
continue
78+
except Exception as e:
79+
_LOGGER.error('Exception raised in updating feature flag')
80+
_LOGGER.debug(str(e))
81+
_LOGGER.debug('Exception information: ', exc_info=True)
82+
pass
7883
self._handler(event.change_number)
7984
except Exception as e: # pylint: disable=broad-except
8085
_LOGGER.error('Exception raised in feature flag synchronization')

tests/integration/test_streaming_e2e.py

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,10 @@ def test_happiness(self):
109109
assert factory.client().get_treatment('pindon', 'split2') == 'off'
110110
assert factory.client().get_treatment('maldo', 'split2') == 'on'
111111

112-
# test if changeNumber is missing
113-
# split_changes = make_split_fast_change_event(4)
114-
# data = json.loads(split_changes['data'])
115-
# inner_data = json.loads(data['data'])
116-
# inner_data['changeNumber'] = None
117-
# data['data'] = json.dumps(inner_data)
118-
# split_changes['data'] = json.dumps(data)
119-
# sse_server.publish(split_changes)
120-
# time.sleep(1)
121-
# assert factory.client().get_treatment('maldo', 'split1') == 'off'
122-
123112
sse_server.publish(make_split_fast_change_event(4))
124113
time.sleep(1)
125-
assert factory.client().get_treatment('maldo', 'split1') == 'on'
114+
assert factory.client().get_treatment('maldo', 'split5') == 'on'
115+
126116

127117
# Validate the SSE request
128118
sse_request = sse_requests.get()
@@ -1233,6 +1223,66 @@ def test_ably_errors_handling(self):
12331223
sse_server.stop()
12341224
split_backend.stop()
12351225

1226+
def test_change_number(mocker):
1227+
# test if changeNumber is missing
1228+
auth_server_response = {
1229+
'pushEnabled': True,
1230+
'token': ('eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.'
1231+
'eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk1UWXlNVGN4T1RRNE13PT1fTWpBNE16Y3pO'
1232+
'RFUxTWc9PV9zZWdtZW50c1wiOltcInN1YnNjcmliZVwiXSxcIk1UWXlNVGN4T1RRNE13P'
1233+
'T1fTWpBNE16Y3pORFUxTWc9PV9zcGxpdHNcIjpbXCJzdWJzY3JpYmVcIl0sXCJjb250cm'
1234+
'9sX3ByaVwiOltcInN1YnNjcmliZVwiLFwiY2hhbm5lbC1tZXRhZGF0YTpwdWJsaXNoZXJ'
1235+
'zXCJdLFwiY29udHJvbF9zZWNcIjpbXCJzdWJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRh'
1236+
'dGE6cHVibGlzaGVyc1wiXX0iLCJ4LWFibHktY2xpZW50SWQiOiJjbGllbnRJZCIsImV4c'
1237+
'CI6MTYwNDEwMDU5MSwiaWF0IjoxNjA0MDk2OTkxfQ.aP9BfR534K6J9h8gfDWg_CQgpz5E'
1238+
'vJh17WlOlAKhcD0')
1239+
}
1240+
1241+
split_changes = {
1242+
-1: {
1243+
'since': -1,
1244+
'till': 1,
1245+
'splits': [make_simple_split('split1', 1, True, False, 'off', 'user', True)]
1246+
},
1247+
1: {'since': 1, 'till': 1, 'splits': []}
1248+
}
1249+
1250+
segment_changes = {}
1251+
split_backend_requests = Queue()
1252+
split_backend = SplitMockServer(split_changes, segment_changes, split_backend_requests,
1253+
auth_server_response)
1254+
sse_requests = Queue()
1255+
sse_server = SSEMockServer(sse_requests)
1256+
1257+
split_backend.start()
1258+
sse_server.start()
1259+
sse_server.publish(make_initial_event())
1260+
sse_server.publish(make_occupancy('control_pri', 2))
1261+
sse_server.publish(make_occupancy('control_sec', 2))
1262+
1263+
kwargs = {
1264+
'sdk_api_base_url': 'http://localhost:%d/api' % split_backend.port(),
1265+
'events_api_base_url': 'http://localhost:%d/api' % split_backend.port(),
1266+
'auth_api_base_url': 'http://localhost:%d/api' % split_backend.port(),
1267+
'streaming_api_base_url': 'http://localhost:%d' % sse_server.port(),
1268+
'config': {'connectTimeout': 10000, 'featuresRefreshRate': 10}
1269+
}
1270+
1271+
factory = get_factory('some_apikey', **kwargs)
1272+
factory.block_until_ready(1)
1273+
assert factory.ready
1274+
time.sleep(2)
1275+
1276+
split_changes = make_split_fast_change_event(5).copy()
1277+
data = json.loads(split_changes['data'])
1278+
inner_data = json.loads(data['data'])
1279+
inner_data['changeNumber'] = None
1280+
data['data'] = json.dumps(inner_data)
1281+
split_changes['data'] = json.dumps(data)
1282+
sse_server.publish(split_changes)
1283+
time.sleep(1)
1284+
assert factory._storages['splits'].get_change_number() == 1
1285+
12361286

12371287
def make_split_change_event(change_number):
12381288
"""Make a split change event."""
@@ -1253,7 +1303,7 @@ def make_split_change_event(change_number):
12531303

12541304
def make_split_fast_change_event(change_number):
12551305
"""Make a split change event."""
1256-
json1 = make_simple_split('split1', 1, True, False, 'off', 'user', True)
1306+
json1 = make_simple_split('split5', 1, True, False, 'off', 'user', True)
12571307
str1 = json.dumps(json1)
12581308
byt1 = bytes(str1, encoding='utf-8')
12591309
compressed = base64.b64encode(byt1)

0 commit comments

Comments
 (0)