Skip to content

Commit 2049b14

Browse files
author
Matias Melograno
committed
improvements in attempting splits sync
1 parent c2b2801 commit 2049b14

File tree

2 files changed

+63
-37
lines changed

2 files changed

+63
-37
lines changed

splitio/sync/split.py

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,45 @@ def __init__(self, split_api, split_storage):
4141
_ON_DEMAND_FETCH_BACKOFF_BASE,
4242
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
4343

44-
def attempt_split_sync(self, fetch_options, till=None):
44+
def _fetch_until(self, fetch_options, till=None):
45+
"""
46+
Hit endpoint, update storage and return when since==till.
47+
48+
:param fetch_options Fetch options for getting split definitions.
49+
:type fetch_options splitio.api.FetchOptions
50+
51+
:param till: Passed till from Streaming.
52+
:type till: int
53+
54+
:return: last change number
55+
:rtype: int
56+
"""
57+
while True: # Fetch until since==till
58+
change_number = self._split_storage.get_change_number()
59+
if change_number is None:
60+
change_number = -1
61+
if till is not None and till < change_number:
62+
# the passed till is less than change_number, no need to perform updates
63+
return change_number
64+
65+
try:
66+
split_changes = self._api.fetch_splits(change_number, fetch_options)
67+
except APIException as exc:
68+
_LOGGER.error('Exception raised while fetching splits')
69+
_LOGGER.debug('Exception information: ', exc_info=True)
70+
raise exc
71+
72+
for split in split_changes.get('splits', []):
73+
if split['status'] == splits.Status.ACTIVE.value:
74+
self._split_storage.put(splits.from_raw(split))
75+
else:
76+
self._split_storage.remove(split['name'])
77+
78+
self._split_storage.set_change_number(split_changes['till'])
79+
if split_changes['till'] == split_changes['since']:
80+
return split_changes['till']
81+
82+
def _attempt_split_sync(self, fetch_options, till=None):
4583
"""
4684
Hit endpoint, update storage and return True if sync is complete.
4785
@@ -58,31 +96,7 @@ def attempt_split_sync(self, fetch_options, till=None):
5896
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
5997
while True:
6098
remaining_attempts -= 1
61-
while True: # Fetch until since==till
62-
change_number = self._split_storage.get_change_number()
63-
if change_number is None:
64-
change_number = -1
65-
if till is not None and till < change_number:
66-
# the passed till is less than change_number, no need to perform updates
67-
break
68-
69-
try:
70-
split_changes = self._api.fetch_splits(change_number, fetch_options)
71-
except APIException as exc:
72-
_LOGGER.error('Exception raised while fetching splits')
73-
_LOGGER.debug('Exception information: ', exc_info=True)
74-
raise exc
75-
76-
for split in split_changes.get('splits', []):
77-
if split['status'] == splits.Status.ACTIVE.value:
78-
self._split_storage.put(splits.from_raw(split))
79-
else:
80-
self._split_storage.remove(split['name'])
81-
82-
self._split_storage.set_change_number(split_changes['till'])
83-
if split_changes['till'] == split_changes['since']:
84-
break
85-
99+
change_number = self._fetch_until(fetch_options, till)
86100
if till is None or till <= change_number:
87101
return True, remaining_attempts, change_number
88102
elif remaining_attempts <= 0:
@@ -98,14 +112,14 @@ def synchronize_splits(self, till=None):
98112
:type till: int
99113
"""
100114
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
101-
successful_sync, remaining_attempts, change_number = self.attempt_split_sync(fetch_options,
102-
till)
115+
successful_sync, remaining_attempts, change_number = self._attempt_split_sync(fetch_options,
116+
till)
103117
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
104118
if successful_sync: # succedeed sync
105119
_LOGGER.debug('Refresh completed in %s attempts.', attempts)
106120
return
107121
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
108-
without_cdn_successful_sync, remaining_attempts, change_number = self.attempt_split_sync(with_cdn_bypass, till)
122+
without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_split_sync(with_cdn_bypass, till)
109123
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
110124
if without_cdn_successful_sync:
111125
_LOGGER.debug('Refresh completed bypassing the CDN in %s attempts.',

tests/sync/test_splits_synchronizer.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,10 @@ def change_number_mock():
142142
change_number_mock._calls += 1
143143
if change_number_mock._calls == 1:
144144
return -1
145-
elif change_number_mock._calls < 8:
145+
elif change_number_mock._calls >= 2 and change_number_mock._calls <= 3:
146146
return 123
147+
elif change_number_mock._calls <= 7:
148+
return 1234
147149
return 12345 # Return proper cn for CDN Bypass
148150
change_number_mock._calls = 0
149151
storage.get_change_number.side_effect = change_number_mock
@@ -199,18 +201,29 @@ def get_changes(*args, **kwargs):
199201
'since': 123,
200202
'till': 123
201203
}
202-
elif get_changes.called > 2 and get_changes.called < 5: # Simulating condition to equal since and till
204+
elif get_changes.called == 3:
203205
return {
204206
'splits': [],
205207
'since': 123,
206-
'till': 12345
208+
'till': 1234
207209
}
208-
else:
210+
elif get_changes.called >= 4 and get_changes.called <= 6:
211+
return {
212+
'splits': [],
213+
'since': 1234,
214+
'till': 1234
215+
}
216+
elif get_changes.called == 7:
209217
return {
210218
'splits': [],
211-
'since': 12345,
219+
'since': 1234,
212220
'till': 12345
213221
}
222+
return {
223+
'splits': [],
224+
'since': 12345,
225+
'till': 12345
226+
}
214227
get_changes.called = 0
215228

216229
api.fetch_splits.side_effect = get_changes
@@ -223,9 +236,8 @@ def get_changes(*args, **kwargs):
223236

224237
split_synchronizer._backoff = Backoff(1, 0.1)
225238
split_synchronizer.synchronize_splits(12345)
226-
227-
assert mocker.call(12345, FetchOptions(True, 123)) in api.fetch_splits.mock_calls
228-
assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + 2 since == till + 3 backoff + 1 CDN
239+
assert mocker.call(12345, FetchOptions(True, 1234)) in api.fetch_splits.mock_calls
240+
assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till)
229241

230242
inserted_split = storage.put.mock_calls[0][1][0]
231243
assert isinstance(inserted_split, Split)

0 commit comments

Comments
 (0)