Skip to content

Commit 92acce6

Browse files
committed
polishing
1 parent 802d4f1 commit 92acce6

File tree

4 files changed

+16
-22
lines changed

4 files changed

+16
-22
lines changed

splitio/client/factory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
_INSTANTIATED_FACTORIES = Counter()
6464
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
6565
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10%
66+
_MAX_RETRY_SYNC_ALL = 3
6667

6768

6869
class Status(Enum):
@@ -379,7 +380,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
379380
)
380381

381382
if preforked_initialization:
382-
synchronizer.sync_all(max_retry_attempts=3)
383+
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
383384
synchronizer._split_synchronizers._segment_sync.shutdown()
384385
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
385386
recorder, manager, preforked_initialization=preforked_initialization)

splitio/engine/impressions/adapters.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ def __init__(self, redis_client):
6666
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
6767
"""
6868
self._redis_client = redis_client
69-
self.pipe = self._redis_client.pipeline()
70-
7169

7270
def record_unique_keys(self, uniques):
7371
"""
@@ -96,10 +94,11 @@ def flush_counters(self, to_send):
9694
try:
9795
resulted = 0
9896
counted = 0
97+
pipe = self._redis_client.pipeline()
9998
for pf_count in to_send:
100-
self.pipe.hincrby(self.IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
99+
pipe.hincrby(self.IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
101100
counted += pf_count.count
102-
resulted = sum(self.pipe.execute())
101+
resulted = sum(pipe.execute())
103102
self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, resulted, counted)
104103
return True
105104
except RedisAdapterException:

splitio/sync/manager.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@
77
from splitio.push.manager import PushManager, Status
88
from splitio.api import APIException
99
from splitio.util.backoff import Backoff
10-
10+
from splitio.sync.synchronizer import _SYNC_ALL_NO_RETRIES
1111

1212
_LOGGER = logging.getLogger(__name__)
1313

1414

1515
class Manager(object): # pylint:disable=too-many-instance-attributes
1616
"""Manager Class."""
1717

18-
_SYNC_ALL_ATTEMPTS = -1
1918
_CENTINEL_EVENT = object()
2019

2120
def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sdk_metadata, sse_url=None, client_key=None): # pylint:disable=too-many-arguments
@@ -59,15 +58,12 @@ def recreate(self):
5958
"""Recreate poolers for forked processes."""
6059
self._synchronizer._split_synchronizers._segment_sync.recreate()
6160

62-
def start(self):
61+
def start(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
6362
"""
6463
Start the SDK synchronization tasks.
65-
66-
:param max_retry_attempts: apply max attempts if it set to absilute integer.
67-
:type max_retry_attempts: int
6864
"""
6965
try:
70-
self._synchronizer.sync_all(self._SYNC_ALL_ATTEMPTS)
66+
self._synchronizer.sync_all(max_retry_attempts)
7167
self._ready_flag.set()
7268
self._synchronizer.start_periodic_data_recording()
7369
if self._streaming_enabled:

splitio/sync/synchronizer.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
_LOGGER = logging.getLogger(__name__)
13-
13+
_SYNC_ALL_NO_RETRIES = 3
1414

1515
class SplitSynchronizers(object):
1616
"""SplitSynchronizers."""
@@ -292,7 +292,7 @@ def synchronize_splits(self, till, sync_segments=True):
292292
_LOGGER.debug('Error: ', exc_info=True)
293293
return False
294294

295-
def sync_all(self, max_retry_attempts=-1):
295+
def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
296296
"""
297297
Synchronize all splits.
298298
@@ -303,10 +303,7 @@ def sync_all(self, max_retry_attempts=-1):
303303
while True:
304304
try:
305305
if not self.synchronize_splits(None, False):
306-
retry_attempts = self._retry_block(max_retry_attempts, retry_attempts)
307-
if max_retry_attempts != -1 and retry_attempts == -1:
308-
break
309-
continue
306+
raise Exception("split sync failed")
310307

311308
# Only retrying splits, since segments may trigger too many calls.
312309
if not self._synchronize_segments():
@@ -317,10 +314,11 @@ def sync_all(self, max_retry_attempts=-1):
317314
except Exception as exc: # pylint:disable=broad-except
318315
_LOGGER.error("Exception caught when trying to sync all data: %s", str(exc))
319316
_LOGGER.debug('Error: ', exc_info=True)
320-
retry_attempts = self._retry_block(max_retry_attempts, retry_attempts)
321-
if max_retry_attempts != -1 and retry_attempts == -1:
322-
break
323-
continue
317+
318+
retry_attempts = self._retry_block(max_retry_attempts, retry_attempts)
319+
if max_retry_attempts != -1 and retry_attempts == -1:
320+
break
321+
continue
324322

325323
_LOGGER.error("Could not correctly synchronize splits and segments after %d attempts.", retry_attempts)
326324

0 commit comments

Comments
 (0)