Skip to content

Commit 802a2f2

Browse files
authored
Merge pull request #261 from splitio/development
Development
2 parents c5c84e4 + a358613 commit 802a2f2

File tree

9 files changed

+143
-29
lines changed

9 files changed

+143
-29
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
9.1.3 (July 25, 2022)
2+
- Fixed synching missed segment(s) after receiving split update
3+
14
9.1.2 (April 6, 2022)
25
- Updated pyyaml dependency for vulnerability CVE-2020-14343.
36

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
'flake8',
99
'pytest==7.0.1',
1010
'pytest-mock>=3.5.1',
11-
'coverage',
11+
'coverage==6.2',
1212
'pytest-cov',
1313
'importlib-metadata==4.2',
1414
'tomli==1.2.3',

splitio/sync/segment.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -139,32 +139,56 @@ def synchronize_segment(self, segment_name, till=None):
139139
:param till: ChangeNumber received.
140140
:type till: int
141141
142+
:return: True if no error occurs. False otherwise.
143+
:rtype: bool
142144
"""
143145
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
144146
successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, fetch_options, till)
145147
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
146148
if successful_sync: # succedeed sync
147149
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
148-
return
150+
return True
149151
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
150152
without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, with_cdn_bypass, till)
151153
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
152154
if without_cdn_successful_sync:
153155
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
154156
without_cdn_attempts)
155-
return
156-
else:
157-
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
158-
without_cdn_attempts)
157+
return True
158+
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
159+
without_cdn_attempts)
160+
return False
159161

160-
def synchronize_segments(self):
162+
def synchronize_segments(self, segment_names = None, dont_wait = False):
161163
"""
162-
Submit all current segments and wait for them to finish, then set the ready flag.
164+
Submit all current segments and wait for them to finish depend on dont_wait flag, then set the ready flag.
163165
164-
:return: True if no error occurs. False otherwise.
166+
:param segment_names: Optional, array of segment names to update.
167+
:type segment_name: {str}
168+
169+
:param dont_wait: Optional, instruct the function to not wait for task completion
170+
:type segment_name: boolean
171+
172+
:return: True if no error occurs or dont_wait flag is True. False otherwise.
165173
:rtype: bool
166174
"""
167-
segment_names = self._split_storage.get_segment_names()
175+
if segment_names is None:
176+
segment_names = self._split_storage.get_segment_names()
177+
168178
for segment_name in segment_names:
169179
self._worker_pool.submit_work(segment_name)
180+
if (dont_wait):
181+
return True
170182
return not self._worker_pool.wait_for_completion()
183+
184+
def segment_exist_in_storage(self, segment_name):
185+
"""
186+
Check if a segment exists in the storage
187+
188+
:param segment_name: Name of the segment
189+
:type segment_name: str
190+
191+
:return: True if segment exist. False otherwise.
192+
:rtype: bool
193+
"""
194+
return self._segment_storage.get(segment_name) != None

splitio/sync/split.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ def _fetch_until(self, fetch_options, till=None):
5555
:return: last change number
5656
:rtype: int
5757
"""
58+
segment_list = set()
5859
while True: # Fetch until since==till
5960
change_number = self._split_storage.get_change_number()
6061
if change_number is None:
6162
change_number = -1
6263
if till is not None and till < change_number:
6364
# the passed till is less than change_number, no need to perform updates
64-
return change_number
65+
return change_number, segment_list
6566

6667
try:
6768
split_changes = self._api.fetch_splits(change_number, fetch_options)
@@ -72,13 +73,14 @@ def _fetch_until(self, fetch_options, till=None):
7273

7374
for split in split_changes.get('splits', []):
7475
if split['status'] == splits.Status.ACTIVE.value:
75-
self._split_storage.put(splits.from_raw(split))
76+
parsed = splits.from_raw(split)
77+
self._split_storage.put(parsed)
78+
segment_list.update(set(parsed.get_segment_names()))
7679
else:
7780
self._split_storage.remove(split['name'])
78-
7981
self._split_storage.set_change_number(split_changes['till'])
8082
if split_changes['till'] == split_changes['since']:
81-
return split_changes['till']
83+
return split_changes['till'], segment_list
8284

8385
def _attempt_split_sync(self, fetch_options, till=None):
8486
"""
@@ -94,14 +96,16 @@ def _attempt_split_sync(self, fetch_options, till=None):
9496
:rtype: bool, int, int
9597
"""
9698
self._backoff.reset()
99+
final_segment_list = set()
97100
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
98101
while True:
99102
remaining_attempts -= 1
100-
change_number = self._fetch_until(fetch_options, till)
103+
change_number, segment_list = self._fetch_until(fetch_options, till)
104+
final_segment_list.update(segment_list)
101105
if till is None or till <= change_number:
102-
return True, remaining_attempts, change_number
106+
return True, remaining_attempts, change_number, final_segment_list
103107
elif remaining_attempts <= 0:
104-
return False, remaining_attempts, change_number
108+
return False, remaining_attempts, change_number, final_segment_list
105109
how_long = self._backoff.get()
106110
time.sleep(how_long)
107111

@@ -112,20 +116,23 @@ def synchronize_splits(self, till=None):
112116
:param till: Passed till from Streaming.
113117
:type till: int
114118
"""
119+
final_segment_list = set()
115120
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
116-
successful_sync, remaining_attempts, change_number = self._attempt_split_sync(fetch_options,
121+
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_split_sync(fetch_options,
117122
till)
123+
final_segment_list.update(segment_list)
118124
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
119125
if successful_sync: # succedeed sync
120126
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
121-
return
127+
return final_segment_list
122128
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
123-
without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_split_sync(with_cdn_bypass, till)
129+
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_split_sync(with_cdn_bypass, till)
130+
final_segment_list.update(segment_list)
124131
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
125132
if without_cdn_successful_sync:
126133
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
127134
without_cdn_attempts)
128-
return
135+
return final_segment_list
129136
else:
130137
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
131138
without_cdn_attempts)

splitio/sync/synchronizer.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ def synchronize_segment(self, segment_name, till):
221221
_LOGGER.error('Failed to sync some segments.')
222222
return success
223223

224-
def synchronize_splits(self, till):
224+
def synchronize_splits(self, till, sync_segments=True):
225225
"""
226226
Synchronize all splits.
227227
@@ -233,7 +233,18 @@ def synchronize_splits(self, till):
233233
"""
234234
_LOGGER.debug('Starting splits synchronization')
235235
try:
236-
self._split_synchronizers.split_sync.synchronize_splits(till)
236+
new_segments = []
237+
for segment in self._split_synchronizers.split_sync.synchronize_splits(till):
238+
if not self._split_synchronizers.segment_sync.segment_exist_in_storage(segment):
239+
new_segments.append(segment)
240+
if sync_segments and len(new_segments) != 0:
241+
_LOGGER.debug('Synching Segments: %s', ','.join(new_segments))
242+
success = self._split_synchronizers.segment_sync.synchronize_segments(new_segments, True)
243+
if not success:
244+
_LOGGER.error('Failed to schedule sync one or all segment(s) below.')
245+
_LOGGER.error(','.join(new_segments))
246+
else:
247+
_LOGGER.debug('Segment sync scheduled.')
237248
return True
238249
except APIException:
239250
_LOGGER.error('Failed syncing splits')
@@ -245,7 +256,7 @@ def sync_all(self):
245256
attempts = 3
246257
while attempts > 0:
247258
try:
248-
if not self.synchronize_splits(None):
259+
if not self.synchronize_splits(None, False):
249260
attempts -= 1
250261
continue
251262

splitio/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '9.1.2'
1+
__version__ = '9.1.3'

tests/sync/test_splits_synchronizer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def change_number_mock():
4040
return 123
4141
change_number_mock._calls = 0
4242
storage.get_change_number.side_effect = change_number_mock
43+
storage.get_segment_names.return_value = []
4344

4445
api = mocker.Mock()
4546
splits = [{
@@ -113,6 +114,7 @@ def test_not_called_on_till(self, mocker):
113114
def change_number_mock():
114115
return 2
115116
storage.get_change_number.side_effect = change_number_mock
117+
storage.get_segment_names.return_value = []
116118

117119
def get_changes(*args, **kwargs):
118120
get_changes.called += 1
@@ -147,6 +149,7 @@ def change_number_mock():
147149
return 12345 # Return proper cn for CDN Bypass
148150
change_number_mock._calls = 0
149151
storage.get_change_number.side_effect = change_number_mock
152+
storage.get_segment_names.return_value = []
150153

151154
api = mocker.Mock()
152155
splits = [{

tests/sync/test_synchronizer.py

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from splitio.api import APIException
1616
from splitio.models.splits import Split
1717
from splitio.models.segments import Segment
18-
18+
from splitio.storage.inmemmory import InMemorySegmentStorage, InMemorySplitStorage
1919

2020
class SynchronizerTests(object):
2121
def test_sync_all_failed_splits(self, mocker):
@@ -66,9 +66,74 @@ def run(x, y):
6666
'killed': False,
6767
'defaultTreatment': 'off',
6868
'algo': 2,
69-
'conditions': []
69+
'conditions': [{
70+
'conditionType': 'WHITELIST',
71+
'matcherGroup':{
72+
'combiner': 'AND',
73+
'matchers':[{
74+
'matcherType': 'IN_SEGMENT',
75+
'negate': False,
76+
'userDefinedSegmentMatcherData': {
77+
'segmentName': 'segmentA'
78+
}
79+
}]
80+
},
81+
'partitions': [{
82+
'size': 100,
83+
'treatment': 'on'
84+
}]
85+
}]
7086
}]
7187

88+
def test_synchronize_splits(self, mocker):
89+
split_storage = InMemorySplitStorage()
90+
split_api = mocker.Mock()
91+
split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123,
92+
'till': 123}
93+
split_sync = SplitSynchronizer(split_api, split_storage)
94+
segment_storage = InMemorySegmentStorage()
95+
segment_api = mocker.Mock()
96+
segment_api.fetch_segment.return_value = {'name': 'segmentA', 'added': ['key1', 'key2',
97+
'key3'], 'removed': [], 'since': 123, 'till': 123}
98+
segment_sync = SegmentSynchronizer(segment_api, split_storage, segment_storage)
99+
split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(),
100+
mocker.Mock(), mocker.Mock())
101+
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
102+
103+
synchronizer.synchronize_splits(123)
104+
105+
inserted_split = split_storage.get('some_name')
106+
assert isinstance(inserted_split, Split)
107+
assert inserted_split.name == 'some_name'
108+
109+
if not segment_sync._worker_pool.wait_for_completion():
110+
inserted_segment = segment_storage.get('segmentA')
111+
assert inserted_segment.name == 'segmentA'
112+
assert inserted_segment.keys == {'key1', 'key2', 'key3'}
113+
114+
def test_synchronize_splits_calling_segment_sync_once(self, mocker):
115+
split_storage = InMemorySplitStorage()
116+
split_api = mocker.Mock()
117+
split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123,
118+
'till': 123}
119+
split_sync = SplitSynchronizer(split_api, split_storage)
120+
counts = {'segments': 0}
121+
122+
def sync_segments(*_):
123+
"""Sync Segments."""
124+
counts['segments'] += 1
125+
return True
126+
127+
segment_sync = mocker.Mock()
128+
segment_sync.synchronize_segments.side_effect = sync_segments
129+
segment_sync.segment_exist_in_storage.return_value = False
130+
split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(),
131+
mocker.Mock(), mocker.Mock())
132+
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
133+
synchronizer.synchronize_splits(123, True)
134+
135+
assert counts['segments'] == 1
136+
72137
def test_sync_all(self, mocker):
73138
split_storage = mocker.Mock(spec=SplitStorage)
74139
split_storage.get_change_number.return_value = 123
@@ -207,7 +272,7 @@ def test_sync_all_ok(self, mocker):
207272
def sync_splits(*_):
208273
"""Sync Splits."""
209274
counts['splits'] += 1
210-
return True
275+
return []
211276

212277
def sync_segments(*_):
213278
"""Sync Segments."""
@@ -254,5 +319,5 @@ def sync_segments(*_):
254319
split_tasks = mocker.Mock(spec=SplitTasks)
255320
synchronizer = Synchronizer(split_synchronizers, split_tasks)
256321

257-
synchronizer.sync_all()
322+
synchronizer._synchronize_segments()
258323
assert counts['segments'] == 1

tests/tasks/test_split_sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def change_number_mock():
2424
return 123
2525
change_number_mock._calls = 0
2626
storage.get_change_number.side_effect = change_number_mock
27+
storage.get_segment_names.return_value = []
2728

2829
api = mocker.Mock()
2930
splits = [{

0 commit comments

Comments
 (0)