Skip to content

Commit 397581b

Browse files
committed
Added retry attempts to sync_all
1 parent 534ee79 commit 397581b

File tree

6 files changed

+80
-33
lines changed

6 files changed

+80
-33
lines changed

splitio/client/factory.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
379379
)
380380

381381
if preforked_initialization:
382-
synchronizer.sync_all()
382+
synchronizer.sync_all(max_retry_attempts=3)
383383
synchronizer._split_synchronizers._segment_sync.shutdown()
384384
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
385385
recorder, manager, preforked_initialization=preforked_initialization)
@@ -391,7 +391,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
391391
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
392392
recorder, manager, sdk_ready_flag)
393393

394-
395394
def _build_redis_factory(api_key, cfg):
396395
"""Build and return a split factory with redis-based storage."""
397396
sdk_metadata = util.get_metadata(cfg)

splitio/sync/manager.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,15 @@ def recreate(self):
5858
"""Recreate poolers for forked processes."""
5959
self._synchronizer._split_synchronizers._segment_sync.recreate()
6060

61-
def start(self):
62-
"""Start the SDK synchronization tasks."""
61+
def start(self, retry_attempts=-1):
62+
"""
63+
Start the SDK synchronization tasks.
64+
65+
:param max_retry_attempts: apply max attempts if it set to absilute integer.
66+
:type max_retry_attempts: int
67+
"""
6368
try:
64-
self._synchronizer.sync_all()
69+
self._synchronizer.sync_all(retry_attempts)
6570
self._ready_flag.set()
6671
self._synchronizer.start_periodic_data_recording()
6772
if self._streaming_enabled:

splitio/sync/synchronizer.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -292,15 +292,20 @@ def synchronize_splits(self, till, sync_segments=True):
292292
_LOGGER.debug('Error: ', exc_info=True)
293293
return False
294294

295-
def sync_all(self):
296-
"""Synchronize all split data."""
297-
attempts = 3
298-
while attempts > 0:
295+
def sync_all(self, max_retry_attempts=-1):
296+
"""
297+
Synchronize all splits.
298+
299+
:param max_retry_attempts: apply max attempts if it set to absilute integer.
300+
:type max_retry_attempts: int
301+
"""
302+
retry_attempts = 0
303+
while True:
299304
try:
300305
if not self.synchronize_splits(None, False):
301-
attempts -= 1
302-
how_long = self._backoff.get()
303-
time.sleep(how_long)
306+
retry_attempts = self._retry_block(max_retry_attempts, retry_attempts)
307+
if not max_retry_attempts == -1 and retry_attempts == -1:
308+
break
304309
continue
305310

306311
# Only retrying splits, since segments may trigger too many calls.
@@ -310,11 +315,23 @@ def sync_all(self):
310315
# All is good
311316
return
312317
except Exception as exc: # pylint:disable=broad-except
313-
attempts -= 1
314318
_LOGGER.error("Exception caught when trying to sync all data: %s", str(exc))
315319
_LOGGER.debug('Error: ', exc_info=True)
316-
317-
_LOGGER.error("Could not correctly synchronize splits and segments after 3 attempts.")
320+
retry_attempts = self._retry_block(max_retry_attempts, retry_attempts)
321+
if not max_retry_attempts == -1 and retry_attempts == -1:
322+
break
323+
continue
324+
325+
_LOGGER.error("Could not correctly synchronize splits and segments after %d attempts.", retry_attempts)
326+
327+
def _retry_block(self, max_retry_attempts, retry_attempts):
328+
if not max_retry_attempts == -1:
329+
retry_attempts += 1
330+
if retry_attempts > max_retry_attempts:
331+
return -1
332+
how_long = self._backoff.get()
333+
time.sleep(how_long)
334+
return retry_attempts
318335

319336
def shutdown(self, blocking):
320337
"""
@@ -478,8 +495,12 @@ def __init__(self, split_synchronizers, split_tasks):
478495
self._split_synchronizers = split_synchronizers
479496
self._split_tasks = split_tasks
480497

481-
def sync_all(self):
482-
"""Synchronize all split data."""
498+
def sync_all(self, max_retry_attempts=-1):
499+
"""
500+
Synchronize all splits.
501+
502+
:param max_retry_attempts: Not used, added for compatibility
503+
"""
483504
try:
484505
self._split_synchronizers.split_sync.synchronize_splits(None)
485506
except APIException as exc:

tests/client/test_factory.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import time
77
import threading
8+
import pytest
89
from splitio.client.factory import get_factory, SplitFactory, _INSTANTIATED_FACTORIES, Status,\
910
_LOGGER as _logger
1011
from splitio.client.config import DEFAULT_CONFIG
@@ -56,7 +57,10 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk
5657
assert isinstance(factory._recorder._impression_storage, inmemmory.ImpressionStorage)
5758

5859
assert factory._labels_enabled is True
59-
factory.block_until_ready()
60+
try:
61+
factory.block_until_ready(1)
62+
except:
63+
pass
6064
assert factory.ready
6165
factory.destroy()
6266

@@ -129,12 +133,16 @@ def test_redis_client_creation(self, mocker):
129133
assert isinstance(factory._recorder._make_pipe(), RedisPipelineAdapter)
130134
assert isinstance(factory._recorder._event_sotrage, redis.RedisEventsStorage)
131135
assert isinstance(factory._recorder._impression_storage, redis.RedisImpressionsStorage)
132-
factory.block_until_ready()
136+
try:
137+
factory.block_until_ready(1)
138+
except:
139+
pass
133140
assert factory.ready
134141
factory.destroy()
135142

136143
def test_uwsgi_forked_client_creation(self):
137144
"""Test client with preforked initialization."""
145+
# pytest.set_trace()
138146
factory = get_factory('some_api_key', config={'preforkedInitialization': True})
139147
assert isinstance(factory._storages['splits'], inmemmory.InMemorySplitStorage)
140148
assert isinstance(factory._storages['segments'], inmemmory.InMemorySegmentStorage)
@@ -221,8 +229,11 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk
221229

222230
# Start factory and make assertions
223231
factory = get_factory('some_api_key')
224-
factory.block_until_ready()
225-
assert factory.ready
232+
try:
233+
factory.block_until_ready(1)
234+
except:
235+
pass
236+
assert factory.ready is False
226237
assert factory.destroyed is False
227238

228239
factory.destroy()
@@ -304,8 +315,11 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk
304315

305316
# Start factory and make assertions
306317
factory = get_factory('some_api_key')
307-
factory.block_until_ready()
308-
assert factory.ready
318+
try:
319+
factory.block_until_ready(1)
320+
except:
321+
pass
322+
assert factory.ready is False
309323
assert factory.destroyed is False
310324

311325
event = threading.Event()
@@ -470,7 +484,10 @@ def _get_storage_mock(self, name):
470484
'preforkedInitialization': True,
471485
}
472486
factory = get_factory("none", config=config)
473-
factory.block_until_ready(10)
487+
try:
488+
factory.block_until_ready(10)
489+
except:
490+
pass
474491
assert factory._status == Status.WAITING_FORK
475492
assert len(sync_all_mock.mock_calls) == 1
476493
assert len(start_mock.mock_calls) == 0
@@ -481,6 +498,7 @@ def _get_storage_mock(self, name):
481498

482499
assert clear_impressions._called == 1
483500
assert clear_events._called == 1
501+
factory.destroy()
484502

485503
def test_error_prefork(self, mocker):
486504
"""Test not handling fork."""
@@ -490,9 +508,12 @@ def test_error_prefork(self, mocker):
490508

491509
filename = os.path.join(os.path.dirname(__file__), '../integration/files', 'file2.yaml')
492510
factory = get_factory('localhost', config={'splitFile': filename})
493-
factory.block_until_ready(1)
494-
511+
try:
512+
factory.block_until_ready(1)
513+
except:
514+
pass
495515
_logger = mocker.Mock()
496516
mocker.patch('splitio.client.factory._LOGGER', new=_logger)
497517
factory.resume()
498518
assert _logger.warning.mock_calls == expected_msg
519+
factory.destroy()

tests/sync/test_manager.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import threading
44
import unittest.mock as mock
5-
65
from splitio.tasks.split_sync import SplitSynchronizationTask
76
from splitio.tasks.segment_sync import SegmentSynchronizationTask
87
from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask
@@ -46,14 +45,16 @@ def run(x):
4645
synchronizer = Synchronizer(synchronizers, split_tasks)
4746
manager = Manager(threading.Event(), synchronizer, mocker.Mock(), False, SdkMetadata('1.0', 'some', '1.2.3.4'))
4847

49-
manager.start() # should not throw!
48+
manager.start(1) # should not throw!
5049

5150
def test_start_streaming_false(self, mocker):
5251
splits_ready_event = threading.Event()
5352
synchronizer = mocker.Mock(spec=Synchronizer)
5453
manager = Manager(splits_ready_event, synchronizer, mocker.Mock(), False, SdkMetadata('1.0', 'some', '1.2.3.4'))
55-
manager.start()
56-
54+
try:
55+
manager.start()
56+
except:
57+
pass
5758
splits_ready_event.wait(2)
5859
assert splits_ready_event.is_set()
5960
assert len(synchronizer.sync_all.mock_calls) == 1

tests/sync/test_synchronizer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def run(x, c):
3434

3535
sychronizer.synchronize_splits(None) # APIExceptions are handled locally and should not be propagated!
3636

37-
sychronizer.sync_all() # sync_all should not throw!
37+
sychronizer.sync_all(1) # sync_all should not throw!
3838

3939
def test_sync_all_failed_segments(self, mocker):
4040
api = mocker.Mock()
@@ -53,7 +53,7 @@ def run(x, y):
5353
mocker.Mock(), mocker.Mock())
5454
sychronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks))
5555

56-
sychronizer.sync_all() # SyncAll should not throw!
56+
sychronizer.sync_all(1) # SyncAll should not throw!
5757
assert not sychronizer._synchronize_segments()
5858

5959
splits = [{
@@ -319,7 +319,7 @@ def sync_splits(*_):
319319
split_tasks = mocker.Mock(spec=SplitTasks)
320320
synchronizer = Synchronizer(split_synchronizers, split_tasks)
321321

322-
synchronizer.sync_all()
322+
synchronizer.sync_all(2)
323323
assert counts['splits'] == 3
324324

325325
def test_sync_all_segment_attempts(self, mocker):

0 commit comments

Comments
 (0)