Skip to content

Commit 98e69d0

Browse files
committed
Updated Synching Splits to detect if there is a segment in rules, and sync the segment if it does not exist in storage
1 parent c5c84e4 commit 98e69d0

File tree

4 files changed

+86
-30
lines changed

4 files changed

+86
-30
lines changed

splitio/sync/segment.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,17 +145,18 @@ def synchronize_segment(self, segment_name, till=None):
145145
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
146146
if successful_sync: # succedeed sync
147147
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
148-
return
148+
return True
149149
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
150150
without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, with_cdn_bypass, till)
151151
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
152152
if without_cdn_successful_sync:
153153
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
154154
without_cdn_attempts)
155-
return
155+
return True
156156
else:
157157
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
158158
without_cdn_attempts)
159+
return False
159160

160161
def synchronize_segments(self):
161162
"""
@@ -168,3 +169,18 @@ def synchronize_segments(self):
168169
for segment_name in segment_names:
169170
self._worker_pool.submit_work(segment_name)
170171
return not self._worker_pool.wait_for_completion()
172+
173+
def segment_exist_in_storage(self, segment_name):
174+
"""
175+
Check if a segment exists in the storage
176+
177+
:param segment_name: Name of the segment
178+
:type segment_name: str
179+
180+
:return: True if segment exist. False otherwise.
181+
:rtype: bool
182+
"""
183+
if self._segment_storage.get(segment_name) != None:
184+
return True
185+
else:
186+
return False

splitio/sync/split.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Splits synchronization logic."""
2+
from ast import Not
23
import logging
34
import re
45
import itertools
@@ -41,6 +42,7 @@ def __init__(self, split_api, split_storage):
4142
self._backoff = Backoff(
4243
_ON_DEMAND_FETCH_BACKOFF_BASE,
4344
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
45+
self.segment_list = []
4446

4547
def _fetch_until(self, fetch_options, till=None):
4648
"""
@@ -69,13 +71,15 @@ def _fetch_until(self, fetch_options, till=None):
6971
_LOGGER.error('Exception raised while fetching splits')
7072
_LOGGER.debug('Exception information: ', exc_info=True)
7173
raise exc
72-
74+
7375
for split in split_changes.get('splits', []):
7476
if split['status'] == splits.Status.ACTIVE.value:
77+
# _LOGGER.debug('split details: '+str(split))
7578
self._split_storage.put(splits.from_raw(split))
7679
else:
7780
self._split_storage.remove(split['name'])
78-
81+
self.segment_list = self._split_storage.get_segment_names()
82+
7983
self._split_storage.set_change_number(split_changes['till'])
8084
if split_changes['till'] == split_changes['since']:
8185
return split_changes['till']
@@ -118,14 +122,14 @@ def synchronize_splits(self, till=None):
118122
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
119123
if successful_sync: # succedeed sync
120124
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
121-
return
125+
return self.segment_list
122126
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
123127
without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_split_sync(with_cdn_bypass, till)
124128
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
125129
if without_cdn_successful_sync:
126130
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
127131
without_cdn_attempts)
128-
return
132+
return self.segment_list
129133
else:
130134
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
131135
without_cdn_attempts)

splitio/sync/synchronizer.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ def __init__(self, split_synchronizers, split_tasks):
205205
def _synchronize_segments(self):
206206
_LOGGER.debug('Starting segments synchronization')
207207
return self._split_synchronizers.segment_sync.synchronize_segments()
208+
return
208209

209210
def synchronize_segment(self, segment_name, till):
210211
"""
@@ -232,8 +233,19 @@ def synchronize_splits(self, till):
232233
:rtype: bool
233234
"""
234235
_LOGGER.debug('Starting splits synchronization')
236+
self._split_synchronizers.split_sync.segment_list = []
235237
try:
236238
self._split_synchronizers.split_sync.synchronize_splits(till)
239+
for segment in self._split_synchronizers.split_sync.segment_list:
240+
_LOGGER.debug('Found segment: %s', segment)
241+
if not self._split_synchronizers.segment_sync.segment_exist_in_storage(segment):
242+
_LOGGER.debug('Segment does not exist, syncing now.')
243+
success = self.synchronize_segment(segment, -1)
244+
if not success:
245+
_LOGGER.error('Failed to sync segment.')
246+
else:
247+
_LOGGER.debug('Segment synced.')
248+
237249
return True
238250
except APIException:
239251
_LOGGER.error('Failed syncing splits')
@@ -248,11 +260,6 @@ def sync_all(self):
248260
if not self.synchronize_splits(None):
249261
attempts -= 1
250262
continue
251-
252-
# Only retrying splits, since segments may trigger too many calls.
253-
if not self._synchronize_segments():
254-
_LOGGER.warning('Segments failed to synchronize.')
255-
256263
# All is good
257264
return
258265
except Exception as exc: # pylint:disable=broad-except

tests/sync/test_synchronizer.py

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from splitio.api import APIException
1616
from splitio.models.splits import Split
1717
from splitio.models.segments import Segment
18+
from splitio.storage.inmemmory import InMemorySegmentStorage, InMemorySplitStorage
1819

1920

2021
class SynchronizerTests(object):
@@ -66,40 +67,68 @@ def run(x, y):
6667
'killed': False,
6768
'defaultTreatment': 'off',
6869
'algo': 2,
69-
'conditions': []
70+
'conditions': [{
71+
'conditionType': 'WHITELIST',
72+
'matcherGroup':{
73+
'combiner': 'AND',
74+
'matchers':[{
75+
'matcherType': 'IN_SEGMENT',
76+
'negate': False,
77+
'userDefinedSegmentMatcherData': {
78+
'segmentName': 'segmentA'
79+
}
80+
}]
81+
},
82+
'partitions': [{
83+
'size': 100,
84+
'treatment': 'on'
85+
}]
86+
}]
7087
}]
7188

72-
def test_sync_all(self, mocker):
73-
split_storage = mocker.Mock(spec=SplitStorage)
74-
split_storage.get_change_number.return_value = 123
75-
split_storage.get_segment_names.return_value = ['segmentA']
89+
def test_synchronize_splits(self, mocker):
90+
split_storage = InMemorySplitStorage()
7691
split_api = mocker.Mock()
7792
split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123,
7893
'till': 123}
79-
split_sync = SplitSynchronizer(split_api, split_storage)
80-
81-
segment_storage = mocker.Mock(spec=SegmentStorage)
82-
segment_storage.get_change_number.return_value = 123
94+
split_sync = SplitSynchronizer(split_api, split_storage)
95+
segment_storage = InMemorySegmentStorage()
8396
segment_api = mocker.Mock()
8497
segment_api.fetch_segment.return_value = {'name': 'segmentA', 'added': ['key1', 'key2',
85-
'key3'], 'removed': [], 'since': 123, 'till': 123}
98+
'key3'], 'removed': [], 'since': -1, 'till': 123}
8699
segment_sync = SegmentSynchronizer(segment_api, split_storage, segment_storage)
87-
88100
split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(),
89101
mocker.Mock(), mocker.Mock())
102+
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
103+
synchronizer.synchronize_splits(123)
104+
105+
inserted_split = split_storage.get('some_name')
106+
assert isinstance(inserted_split, Split)
107+
assert inserted_split.name == 'some_name'
108+
109+
inserted_segment = segment_storage.get('segmentA')
110+
assert inserted_segment.name == 'segmentA'
111+
assert inserted_segment.keys == {'key1', 'key2', 'key3'}
90112

113+
def test_sync_all(self, mocker):
114+
split_storage = mocker.Mock(spec=SplitStorage)
115+
split_storage.get_change_number.return_value = 123
116+
split_storage.get_segment_names.return_value = ['segmentA']
117+
split_api = mocker.Mock()
118+
split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123,
119+
'till': 123}
120+
split_sync = SplitSynchronizer(split_api, split_storage)
121+
122+
split_synchronizers = SplitSynchronizers(split_sync, mocker.Mock(), mocker.Mock(),
123+
mocker.Mock(), mocker.Mock())
91124
synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
125+
92126
synchronizer.sync_all()
93127

94128
inserted_split = split_storage.put.mock_calls[0][1][0]
95129
assert isinstance(inserted_split, Split)
96130
assert inserted_split.name == 'some_name'
97-
98-
inserted_segment = segment_storage.update.mock_calls[0][1]
99-
assert inserted_segment[0] == 'segmentA'
100-
assert inserted_segment[1] == ['key1', 'key2', 'key3']
101-
assert inserted_segment[2] == []
102-
131+
103132
def test_start_periodic_fetching(self, mocker):
104133
split_task = mocker.Mock(spec=SplitSynchronizationTask)
105134
segment_task = mocker.Mock(spec=SegmentSynchronizationTask)
@@ -221,7 +250,7 @@ def sync_segments(*_):
221250

222251
synchronizer.sync_all()
223252
assert counts['splits'] == 1
224-
assert counts['segments'] == 1
253+
# assert counts['segments'] == 1
225254

226255
def test_sync_all_split_attempts(self, mocker):
227256
"""Test that 3 attempts are done before failing."""
@@ -254,5 +283,5 @@ def sync_segments(*_):
254283
split_tasks = mocker.Mock(spec=SplitTasks)
255284
synchronizer = Synchronizer(split_synchronizers, split_tasks)
256285

257-
synchronizer.sync_all()
286+
synchronizer._synchronize_segments()
258287
assert counts['segments'] == 1

0 commit comments

Comments
 (0)