Skip to content

Commit df30877

Browse files
authored
Merge pull request #286 from splitio/patch-fix-timeout
patch fix for timeout and redis imp count
2 parents 802d4f1 + 0f5e3a2 commit df30877

File tree

7 files changed

+23
-32
lines changed

7 files changed

+23
-32
lines changed

splitio/client/factory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
_INSTANTIATED_FACTORIES = Counter()
6464
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
6565
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10%
66+
_MAX_RETRY_SYNC_ALL = 3
6667

6768

6869
class Status(Enum):
@@ -379,7 +380,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
379380
)
380381

381382
if preforked_initialization:
382-
synchronizer.sync_all(max_retry_attempts=3)
383+
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
383384
synchronizer._split_synchronizers._segment_sync.shutdown()
384385
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
385386
recorder, manager, preforked_initialization=preforked_initialization)

splitio/engine/impressions/adapters.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ def __init__(self, redis_client):
6666
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
6767
"""
6868
self._redis_client = redis_client
69-
self.pipe = self._redis_client.pipeline()
70-
7169

7270
def record_unique_keys(self, uniques):
7371
"""
@@ -96,10 +94,11 @@ def flush_counters(self, to_send):
9694
try:
9795
resulted = 0
9896
counted = 0
97+
pipe = self._redis_client.pipeline()
9998
for pf_count in to_send:
100-
self.pipe.hincrby(self.IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
99+
pipe.hincrby(self.IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
101100
counted += pf_count.count
102-
resulted = sum(self.pipe.execute())
101+
resulted = sum(pipe.execute())
103102
self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, resulted, counted)
104103
return True
105104
except RedisAdapterException:

splitio/sync/manager.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@
77
from splitio.push.manager import PushManager, Status
88
from splitio.api import APIException
99
from splitio.util.backoff import Backoff
10-
10+
from splitio.sync.synchronizer import _SYNC_ALL_NO_RETRIES
1111

1212
_LOGGER = logging.getLogger(__name__)
1313

1414

1515
class Manager(object): # pylint:disable=too-many-instance-attributes
1616
"""Manager Class."""
1717

18-
_SYNC_ALL_ATTEMPTS = -1
1918
_CENTINEL_EVENT = object()
2019

2120
def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sdk_metadata, sse_url=None, client_key=None): # pylint:disable=too-many-arguments
@@ -59,15 +58,12 @@ def recreate(self):
5958
"""Recreate poolers for forked processes."""
6059
self._synchronizer._split_synchronizers._segment_sync.recreate()
6160

62-
def start(self):
61+
def start(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
6362
"""
6463
Start the SDK synchronization tasks.
65-
66-
:param max_retry_attempts: apply max attempts if it set to absilute integer.
67-
:type max_retry_attempts: int
6864
"""
6965
try:
70-
self._synchronizer.sync_all(self._SYNC_ALL_ATTEMPTS)
66+
self._synchronizer.sync_all(max_retry_attempts)
7167
self._ready_flag.set()
7268
self._synchronizer.start_periodic_data_recording()
7369
if self._streaming_enabled:

splitio/sync/synchronizer.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
_LOGGER = logging.getLogger(__name__)
13-
13+
_SYNC_ALL_NO_RETRIES = -1
1414

1515
class SplitSynchronizers(object):
1616
"""SplitSynchronizers."""
@@ -292,7 +292,7 @@ def synchronize_splits(self, till, sync_segments=True):
292292
_LOGGER.debug('Error: ', exc_info=True)
293293
return False
294294

295-
def sync_all(self, max_retry_attempts=-1):
295+
def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
296296
"""
297297
Synchronize all splits.
298298
@@ -303,10 +303,7 @@ def sync_all(self, max_retry_attempts=-1):
303303
while True:
304304
try:
305305
if not self.synchronize_splits(None, False):
306-
retry_attempts = self._retry_block(max_retry_attempts, retry_attempts)
307-
if max_retry_attempts != -1 and retry_attempts == -1:
308-
break
309-
continue
306+
raise Exception("split sync failed")
310307

311308
# Only retrying splits, since segments may trigger too many calls.
312309
if not self._synchronize_segments():
@@ -317,20 +314,16 @@ def sync_all(self, max_retry_attempts=-1):
317314
except Exception as exc: # pylint:disable=broad-except
318315
_LOGGER.error("Exception caught when trying to sync all data: %s", str(exc))
319316
_LOGGER.debug('Error: ', exc_info=True)
320-
retry_attempts = self._retry_block(max_retry_attempts, retry_attempts)
321-
if max_retry_attempts != -1 and retry_attempts == -1:
322-
break
323-
continue
317+
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
318+
retry_attempts += 1
319+
if retry_attempts > max_retry_attempts:
320+
break
321+
how_long = self._backoff.get()
322+
time.sleep(how_long)
324323

325324
_LOGGER.error("Could not correctly synchronize splits and segments after %d attempts.", retry_attempts)
326325

327326
def _retry_block(self, max_retry_attempts, retry_attempts):
328-
if max_retry_attempts != -1:
329-
retry_attempts += 1
330-
if retry_attempts > max_retry_attempts:
331-
return -1
332-
how_long = self._backoff.get()
333-
time.sleep(how_long)
334327
return retry_attempts
335328

336329
def shutdown(self, blocking):

tests/client/test_factory.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def test_redis_client_creation(self, mocker):
142142

143143
def test_uwsgi_forked_client_creation(self):
144144
"""Test client with preforked initialization."""
145-
# pytest.set_trace()
145+
# Invalid API Key with preforked should exit after 3 attempts.
146146
factory = get_factory('some_api_key', config={'preforkedInitialization': True})
147147
assert isinstance(factory._storages['splits'], inmemmory.InMemorySplitStorage)
148148
assert isinstance(factory._storages['segments'], inmemmory.InMemorySegmentStorage)
@@ -228,9 +228,10 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk
228228
mocker.patch('splitio.sync.manager.Manager.__init__', new=_split_synchronizer)
229229

230230
# Start factory and make assertions
231+
# Using invalid key should result in a timeout exception
231232
factory = get_factory('some_api_key')
232233
try:
233-
factory.block_until_ready(1)
234+
factory.block_until_ready(1)
234235
except:
235236
pass
236237
assert factory.ready is False
@@ -316,7 +317,7 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk
316317
# Start factory and make assertions
317318
factory = get_factory('some_api_key')
318319
try:
319-
factory.block_until_ready(1)
320+
factory.block_until_ready(1)
320321
except:
321322
pass
322323
assert factory.ready is False

tests/sync/test_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def run(x):
4646
manager = Manager(threading.Event(), synchronizer, mocker.Mock(), False, SdkMetadata('1.0', 'some', '1.2.3.4'))
4747

4848
manager._SYNC_ALL_ATTEMPTS = 1
49-
manager.start() # should not throw!
49+
manager.start(2) # should not throw!
5050

5151
def test_start_streaming_false(self, mocker):
5252
splits_ready_event = threading.Event()

tests/sync/test_synchronizer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def run(x, c):
3434

3535
sychronizer.synchronize_splits(None) # APIExceptions are handled locally and should not be propagated!
3636

37+
# test forcing to have only one retry attempt and then exit
3738
sychronizer.sync_all(1) # sync_all should not throw!
3839

3940
def test_sync_all_failed_segments(self, mocker):

0 commit comments

Comments
 (0)