Skip to content

Commit 31229ea

Browse files
committed
Updated Synching Splits to detect if there is a segment in rules, and sync the segment if it does not exist in storage
1 parent 98e69d0 commit 31229ea

File tree

6 files changed

+65
-39
lines changed

6 files changed

+65
-39
lines changed

splitio/sync/segment.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,19 @@ def synchronize_segment(self, segment_name, till=None):
158158
without_cdn_attempts)
159159
return False
160160

161-
def synchronize_segments(self):
161+
def synchronize_segments(self, segment_names = None):
162162
"""
163163
Submit all current segments and wait for them to finish, then set the ready flag.
164164
165+
:param segment_names: Optional, array of segment names to update.
166+
:type segment_name: [str]
167+
165168
:return: True if no error occurs. False otherwise.
166169
:rtype: bool
167170
"""
168-
segment_names = self._split_storage.get_segment_names()
171+
if segment_names is None:
172+
segment_names = self._split_storage.get_segment_names()
173+
169174
for segment_name in segment_names:
170175
self._worker_pool.submit_work(segment_name)
171176
return not self._worker_pool.wait_for_completion()

splitio/sync/split.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import logging
44
import re
55
import itertools
6+
from numpy import append
7+
from pyparsing import Each
68
import yaml
79
import time
810

@@ -42,9 +44,8 @@ def __init__(self, split_api, split_storage):
4244
self._backoff = Backoff(
4345
_ON_DEMAND_FETCH_BACKOFF_BASE,
4446
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
45-
self.segment_list = []
4647

47-
def _fetch_until(self, fetch_options, till=None):
48+
def _fetch_until(self, fetch_options, till=None, segment_sync=None):
4849
"""
4950
Hit endpoint, update storage and return when since==till.
5051
@@ -57,13 +58,14 @@ def _fetch_until(self, fetch_options, till=None):
5758
:return: last change number
5859
:rtype: int
5960
"""
61+
segment_list = []
6062
while True: # Fetch until since==till
6163
change_number = self._split_storage.get_change_number()
6264
if change_number is None:
6365
change_number = -1
6466
if till is not None and till < change_number:
6567
# the passed till is less than change_number, no need to perform updates
66-
return change_number
68+
return change_number, segment_list
6769

6870
try:
6971
split_changes = self._api.fetch_splits(change_number, fetch_options)
@@ -74,17 +76,21 @@ def _fetch_until(self, fetch_options, till=None):
7476

7577
for split in split_changes.get('splits', []):
7678
if split['status'] == splits.Status.ACTIVE.value:
77-
# _LOGGER.debug('split details: '+str(split))
79+
_LOGGER.debug('split details: '+str(split))
7880
self._split_storage.put(splits.from_raw(split))
7981
else:
8082
self._split_storage.remove(split['name'])
81-
self.segment_list = self._split_storage.get_segment_names()
83+
for segment in self._split_storage.get_segment_names():
84+
_LOGGER.debug('Found segment: %s', segment)
85+
if not segment_sync.segment_exist_in_storage(segment):
86+
_LOGGER.debug('Segment %s does not exist, syncing.', segment)
87+
segment_list.append(segment)
8288

8389
self._split_storage.set_change_number(split_changes['till'])
8490
if split_changes['till'] == split_changes['since']:
85-
return split_changes['till']
91+
return split_changes['till'], segment_list
8692

87-
def _attempt_split_sync(self, fetch_options, till=None):
93+
def _attempt_split_sync(self, fetch_options, till=None, segment_sync=None):
8894
"""
8995
Hit endpoint, update storage and return True if sync is complete.
9096
@@ -101,35 +107,35 @@ def _attempt_split_sync(self, fetch_options, till=None):
101107
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
102108
while True:
103109
remaining_attempts -= 1
104-
change_number = self._fetch_until(fetch_options, till)
110+
change_number, segment_list = self._fetch_until(fetch_options, till, segment_sync)
105111
if till is None or till <= change_number:
106-
return True, remaining_attempts, change_number
112+
return True, remaining_attempts, change_number, segment_list
107113
elif remaining_attempts <= 0:
108-
return False, remaining_attempts, change_number
114+
return False, remaining_attempts, change_number, segment_list
109115
how_long = self._backoff.get()
110116
time.sleep(how_long)
111117

112-
def synchronize_splits(self, till=None):
118+
def synchronize_splits(self, till=None, segment_sync=None):
113119
"""
114120
Hit endpoint, update storage and return True if sync is complete.
115121
116122
:param till: Passed till from Streaming.
117123
:type till: int
118124
"""
119125
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
120-
successful_sync, remaining_attempts, change_number = self._attempt_split_sync(fetch_options,
121-
till)
126+
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_split_sync(fetch_options,
127+
till, segment_sync)
122128
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
123129
if successful_sync: # succedeed sync
124130
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
125-
return self.segment_list
131+
return segment_list
126132
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
127-
without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_split_sync(with_cdn_bypass, till)
133+
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_split_sync(with_cdn_bypass, till, segment_sync)
128134
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
129135
if without_cdn_successful_sync:
130136
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
131137
without_cdn_attempts)
132-
return self.segment_list
138+
return segment_list
133139
else:
134140
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
135141
without_cdn_attempts)

splitio/sync/synchronizer.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ def __init__(self, split_synchronizers, split_tasks):
205205
def _synchronize_segments(self):
206206
_LOGGER.debug('Starting segments synchronization')
207207
return self._split_synchronizers.segment_sync.synchronize_segments()
208-
return
209208

210209
def synchronize_segment(self, segment_name, till):
211210
"""
@@ -233,19 +232,14 @@ def synchronize_splits(self, till):
233232
:rtype: bool
234233
"""
235234
_LOGGER.debug('Starting splits synchronization')
236-
self._split_synchronizers.split_sync.segment_list = []
237235
try:
238-
self._split_synchronizers.split_sync.synchronize_splits(till)
239-
for segment in self._split_synchronizers.split_sync.segment_list:
240-
_LOGGER.debug('Found segment: %s', segment)
241-
if not self._split_synchronizers.segment_sync.segment_exist_in_storage(segment):
242-
_LOGGER.debug('Segment does not exist, syncing now.')
243-
success = self.synchronize_segment(segment, -1)
244-
if not success:
245-
_LOGGER.error('Failed to sync segment.')
246-
else:
247-
_LOGGER.debug('Segment synced.')
248-
236+
segment_list = self._split_synchronizers.split_sync.synchronize_splits(till, self._split_synchronizers.segment_sync)
237+
if segment_list != []:
238+
success = self._split_synchronizers.segment_sync.synchronize_segments(segment_list)
239+
if not success:
240+
_LOGGER.error('Failed to sync segment.')
241+
else:
242+
_LOGGER.debug('Segment synced.')
249243
return True
250244
except APIException:
251245
_LOGGER.error('Failed syncing splits')
@@ -260,6 +254,11 @@ def sync_all(self):
260254
if not self.synchronize_splits(None):
261255
attempts -= 1
262256
continue
257+
258+
# Only retrying splits, since segments may trigger too many calls.
259+
if not self._synchronize_segments():
260+
_LOGGER.warning('Segments failed to synchronize.')
261+
263262
# All is good
264263
return
265264
except Exception as exc: # pylint:disable=broad-except

tests/sync/test_splits_synchronizer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def change_number_mock():
4040
return 123
4141
change_number_mock._calls = 0
4242
storage.get_change_number.side_effect = change_number_mock
43+
storage.get_segment_names.return_value = []
4344

4445
api = mocker.Mock()
4546
splits = [{
@@ -113,6 +114,7 @@ def test_not_called_on_till(self, mocker):
113114
def change_number_mock():
114115
return 2
115116
storage.get_change_number.side_effect = change_number_mock
117+
storage.get_segment_names.return_value = []
116118

117119
def get_changes(*args, **kwargs):
118120
get_changes.called += 1
@@ -147,6 +149,7 @@ def change_number_mock():
147149
return 12345 # Return proper cn for CDN Bypass
148150
change_number_mock._calls = 0
149151
storage.get_change_number.side_effect = change_number_mock
152+
storage.get_segment_names.return_value = []
150153

151154
api = mocker.Mock()
152155
splits = [{

tests/sync/test_synchronizer.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from splitio.models.segments import Segment
1818
from splitio.storage.inmemmory import InMemorySegmentStorage, InMemorySplitStorage
1919

20-
2120
class SynchronizerTests(object):
2221
def test_sync_all_failed_splits(self, mocker):
2322
api = mocker.Mock()
@@ -95,13 +94,14 @@ def test_synchronize_splits(self, mocker):
9594
segment_storage = InMemorySegmentStorage()
9695
segment_api = mocker.Mock()
9796
segment_api.fetch_segment.return_value = {'name': 'segmentA', 'added': ['key1', 'key2',
98-
'key3'], 'removed': [], 'since': -1, 'till': 123}
97+
'key3'], 'removed': [], 'since': 123, 'till': 123}
9998
segment_sync = SegmentSynchronizer(segment_api, split_storage, segment_storage)
10099
split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(),
101100
mocker.Mock(), mocker.Mock())
102101
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
103-
synchronizer.synchronize_splits(123)
104102

103+
synchronizer.synchronize_splits(123)
104+
105105
inserted_split = split_storage.get('some_name')
106106
assert isinstance(inserted_split, Split)
107107
assert inserted_split.name == 'some_name'
@@ -118,17 +118,29 @@ def test_sync_all(self, mocker):
118118
split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123,
119119
'till': 123}
120120
split_sync = SplitSynchronizer(split_api, split_storage)
121-
122-
split_synchronizers = SplitSynchronizers(split_sync, mocker.Mock(), mocker.Mock(),
121+
122+
segment_storage = mocker.Mock(spec=SegmentStorage)
123+
segment_storage.get_change_number.return_value = 123
124+
segment_api = mocker.Mock()
125+
segment_api.fetch_segment.return_value = {'name': 'segmentA', 'added': ['key1', 'key2',
126+
'key3'], 'removed': [], 'since': 123, 'till': 123}
127+
segment_sync = SegmentSynchronizer(segment_api, split_storage, segment_storage)
128+
129+
split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(),
123130
mocker.Mock(), mocker.Mock())
124-
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
125131

132+
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
126133
synchronizer.sync_all()
127134

128135
inserted_split = split_storage.put.mock_calls[0][1][0]
129136
assert isinstance(inserted_split, Split)
130137
assert inserted_split.name == 'some_name'
131-
138+
139+
inserted_segment = segment_storage.update.mock_calls[0][1]
140+
assert inserted_segment[0] == 'segmentA'
141+
assert inserted_segment[1] == ['key1', 'key2', 'key3']
142+
assert inserted_segment[2] == []
143+
132144
def test_start_periodic_fetching(self, mocker):
133145
split_task = mocker.Mock(spec=SplitSynchronizationTask)
134146
segment_task = mocker.Mock(spec=SegmentSynchronizationTask)
@@ -250,7 +262,7 @@ def sync_segments(*_):
250262

251263
synchronizer.sync_all()
252264
assert counts['splits'] == 1
253-
# assert counts['segments'] == 1
265+
assert counts['segments'] == 2
254266

255267
def test_sync_all_split_attempts(self, mocker):
256268
"""Test that 3 attempts are done before failing."""

tests/tasks/test_split_sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def change_number_mock():
2424
return 123
2525
change_number_mock._calls = 0
2626
storage.get_change_number.side_effect = change_number_mock
27+
storage.get_segment_names.return_value = []
2728

2829
api = mocker.Mock()
2930
splits = [{

0 commit comments

Comments
 (0)