Skip to content

Commit 18b5166

Browse files
committed
1- Moved segment in split check logic to synchronizer.py
2- Created test to verify segment_sync is called once from split_sync 3- disable waiting for segment sync workerpool job when synching segments from splits. 4- Other general cleanup.
1 parent 5036e3e commit 18b5166

File tree

4 files changed

+67
-38
lines changed

4 files changed

+67
-38
lines changed

splitio/sync/segment.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ def synchronize_segment(self, segment_name, till=None):
139139
:param till: ChangeNumber received.
140140
:type till: int
141141
142+
:return: True if no error occurs. False otherwise.
143+
:rtype: bool
142144
"""
143145
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
144146
successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, fetch_options, till)
@@ -157,21 +159,26 @@ def synchronize_segment(self, segment_name, till=None):
157159
without_cdn_attempts)
158160
return False
159161

160-
def synchronize_segments(self, segment_names = None):
162+
def synchronize_segments(self, segment_names = None, dont_wait = False):
161163
"""
162-
Submit all current segments and wait for them to finish, then set the ready flag.
164+
Submit all current segments and wait for them to finish depend on dont_wait flag, then set the ready flag.
163165
164166
:param segment_names: Optional, array of segment names to update.
165-
:type segment_name: [str]
167+
:type segment_name: {str}
166168
167-
:return: True if no error occurs. False otherwise.
169+
:param dont_wait: Optional, instruct the function to not wait for task completion
170+
:type segment_name: boolean
171+
172+
:return: True if no error occurs or dont_wait flag is True. False otherwise.
168173
:rtype: bool
169174
"""
170175
if segment_names is None:
171176
segment_names = self._split_storage.get_segment_names()
172177

173178
for segment_name in segment_names:
174179
self._worker_pool.submit_work(segment_name)
180+
if (dont_wait):
181+
return True
175182
return not self._worker_pool.wait_for_completion()
176183

177184
def segment_exist_in_storage(self, segment_name):
@@ -184,6 +191,4 @@ def segment_exist_in_storage(self, segment_name):
184191
:return: True if segment exist. False otherwise.
185192
:rtype: bool
186193
"""
187-
if self._segment_storage.get(segment_name) != None:
188-
return True
189-
return False
194+
return self._segment_storage.get(segment_name) != None

splitio/sync/split.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def __init__(self, split_api, split_storage):
4242
_ON_DEMAND_FETCH_BACKOFF_BASE,
4343
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
4444

45-
def _fetch_until(self, fetch_options, till=None, segment_sync=None):
45+
def _fetch_until(self, fetch_options, till=None):
4646
"""
4747
Hit endpoint, update storage and return when since==till.
4848
@@ -55,7 +55,7 @@ def _fetch_until(self, fetch_options, till=None, segment_sync=None):
5555
:return: last change number
5656
:rtype: int
5757
"""
58-
segment_list = []
58+
segment_list = set()
5959
while True: # Fetch until since==till
6060
change_number = self._split_storage.get_change_number()
6161
if change_number is None:
@@ -70,23 +70,19 @@ def _fetch_until(self, fetch_options, till=None, segment_sync=None):
7070
_LOGGER.error('Exception raised while fetching splits')
7171
_LOGGER.debug('Exception information: ', exc_info=True)
7272
raise exc
73-
73+
7474
for split in split_changes.get('splits', []):
7575
if split['status'] == splits.Status.ACTIVE.value:
76-
self._split_storage.put(splits.from_raw(split))
76+
parsed = splits.from_raw(split)
77+
self._split_storage.put(parsed)
78+
segment_list.update(set(parsed.get_segment_names()))
7779
else:
7880
self._split_storage.remove(split['name'])
79-
for segment in self._split_storage.get_segment_names():
80-
_LOGGER.debug('Found segment: %s', segment)
81-
if not segment_sync.segment_exist_in_storage(segment):
82-
_LOGGER.debug('Segment %s does not exist, syncing.', segment)
83-
segment_list.append(segment)
84-
8581
self._split_storage.set_change_number(split_changes['till'])
8682
if split_changes['till'] == split_changes['since']:
8783
return split_changes['till'], segment_list
8884

89-
def _attempt_split_sync(self, fetch_options, till=None, segment_sync=None):
85+
def _attempt_split_sync(self, fetch_options, till=None):
9086
"""
9187
Hit endpoint, update storage and return True if sync is complete.
9288
@@ -103,15 +99,15 @@ def _attempt_split_sync(self, fetch_options, till=None, segment_sync=None):
10399
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
104100
while True:
105101
remaining_attempts -= 1
106-
change_number, segment_list = self._fetch_until(fetch_options, till, segment_sync)
102+
change_number, segment_list = self._fetch_until(fetch_options, till)
107103
if till is None or till <= change_number:
108104
return True, remaining_attempts, change_number, segment_list
109105
elif remaining_attempts <= 0:
110106
return False, remaining_attempts, change_number, segment_list
111107
how_long = self._backoff.get()
112108
time.sleep(how_long)
113109

114-
def synchronize_splits(self, till=None, segment_sync=None):
110+
def synchronize_splits(self, till=None):
115111
"""
116112
Hit endpoint, update storage and return True if sync is complete.
117113
@@ -120,13 +116,13 @@ def synchronize_splits(self, till=None, segment_sync=None):
120116
"""
121117
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
122118
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_split_sync(fetch_options,
123-
till, segment_sync)
119+
till)
124120
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
125121
if successful_sync: # succedeed sync
126122
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
127123
return segment_list
128124
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
129-
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_split_sync(with_cdn_bypass, till, segment_sync)
125+
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_split_sync(with_cdn_bypass, till)
130126
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
131127
if without_cdn_successful_sync:
132128
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',

splitio/sync/synchronizer.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ def synchronize_segment(self, segment_name, till):
221221
_LOGGER.error('Failed to sync some segments.')
222222
return success
223223

224-
def synchronize_splits(self, till):
224+
def synchronize_splits(self, till, sync_segments=True):
225225
"""
226226
Synchronize all splits.
227227
@@ -233,13 +233,17 @@ def synchronize_splits(self, till):
233233
"""
234234
_LOGGER.debug('Starting splits synchronization')
235235
try:
236-
segment_list = self._split_synchronizers.split_sync.synchronize_splits(till, self._split_synchronizers.segment_sync)
237-
if len(segment_list) != 0:
238-
success = self._split_synchronizers.segment_sync.synchronize_segments(segment_list)
236+
new_segments = []
237+
for segment in self._split_synchronizers.split_sync.synchronize_splits(till):
238+
if not self._split_synchronizers.segment_sync.segment_exist_in_storage(segment):
239+
new_segments.append(segment)
240+
if sync_segments and len(new_segments) != 0:
241+
success = self._split_synchronizers.segment_sync.synchronize_segments(new_segments, True)
239242
if not success:
240-
_LOGGER.error('Failed to sync segment.')
243+
_LOGGER.error('Failed to schedule sync one or all segment(s) below.')
244+
_LOGGER.error(','.join(new_segments))
241245
else:
242-
_LOGGER.debug('Segment synced.')
246+
_LOGGER.debug('Segment sync scheduled.')
243247
return True
244248
except APIException:
245249
_LOGGER.error('Failed syncing splits')
@@ -251,14 +255,14 @@ def sync_all(self):
251255
attempts = 3
252256
while attempts > 0:
253257
try:
254-
if not self.synchronize_splits(None):
258+
if not self.synchronize_splits(None, False):
255259
attempts -= 1
256260
continue
257-
261+
258262
# Only retrying splits, since segments may trigger too many calls.
259263
if not self._synchronize_segments():
260264
_LOGGER.warning('Segments failed to synchronize.')
261-
265+
262266
# All is good
263267
return
264268
except Exception as exc: # pylint:disable=broad-except

tests/sync/test_synchronizer.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def test_synchronize_splits(self, mocker):
9090
split_api = mocker.Mock()
9191
split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123,
9292
'till': 123}
93-
split_sync = SplitSynchronizer(split_api, split_storage)
93+
split_sync = SplitSynchronizer(split_api, split_storage)
9494
segment_storage = InMemorySegmentStorage()
9595
segment_api = mocker.Mock()
9696
segment_api.fetch_segment.return_value = {'name': 'segmentA', 'added': ['key1', 'key2',
@@ -101,14 +101,38 @@ def test_synchronize_splits(self, mocker):
101101
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
102102

103103
synchronizer.synchronize_splits(123)
104-
104+
105105
inserted_split = split_storage.get('some_name')
106106
assert isinstance(inserted_split, Split)
107107
assert inserted_split.name == 'some_name'
108-
109-
inserted_segment = segment_storage.get('segmentA')
110-
assert inserted_segment.name == 'segmentA'
111-
assert inserted_segment.keys == {'key1', 'key2', 'key3'}
108+
109+
if not segment_sync._worker_pool.wait_for_completion():
110+
inserted_segment = segment_storage.get('segmentA')
111+
assert inserted_segment.name == 'segmentA'
112+
assert inserted_segment.keys == {'key1', 'key2', 'key3'}
113+
114+
def test_synchronize_splits_calling_segment_sync_once(self, mocker):
115+
split_storage = InMemorySplitStorage()
116+
split_api = mocker.Mock()
117+
split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123,
118+
'till': 123}
119+
split_sync = SplitSynchronizer(split_api, split_storage)
120+
counts = {'segments': 0}
121+
122+
def sync_segments(*_):
123+
"""Sync Segments."""
124+
counts['segments'] += 1
125+
return True
126+
127+
segment_sync = mocker.Mock()
128+
segment_sync.synchronize_segments.side_effect = sync_segments
129+
segment_sync.segment_exist_in_storage.return_value = False
130+
split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(),
131+
mocker.Mock(), mocker.Mock())
132+
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
133+
synchronizer.synchronize_splits(123, True)
134+
135+
assert counts['segments'] == 1
112136

113137
def test_sync_all(self, mocker):
114138
split_storage = mocker.Mock(spec=SplitStorage)
@@ -140,7 +164,7 @@ def test_sync_all(self, mocker):
140164
assert inserted_segment[0] == 'segmentA'
141165
assert inserted_segment[1] == ['key1', 'key2', 'key3']
142166
assert inserted_segment[2] == []
143-
167+
144168
def test_start_periodic_fetching(self, mocker):
145169
split_task = mocker.Mock(spec=SplitSynchronizationTask)
146170
segment_task = mocker.Mock(spec=SegmentSynchronizationTask)

0 commit comments

Comments
 (0)