Skip to content

Commit 467ec53

Browse files
authored
Merge pull request #289 from splitio/development
Patch version 9.2.1
2 parents d5502ba + 6588ebb commit 467ec53

File tree

15 files changed

+118
-145
lines changed

15 files changed

+118
-145
lines changed

CHANGES.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
9.2.1 (Dec 2, 2022)
2+
- Changed redis record type for impressions counts from list using rpush to hashed key using hincrby.
3+
- Apply Timeout Exception when incorrect SDK API Key is used.
4+
- Changed potential initial fetching segment Warning to Debug in logging.
5+
16
9.2.0 (Oct 14, 2022)
27
- Added a new impressions mode for the SDK called NONE , to be used in factory when there is no desire to capture impressions on an SDK factory to feed Split's analytics engine. Running NONE mode, the SDK will only capture unique keys evaluated for a particular feature flag instead of full blown impressions
38

splitio/client/factory.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
_INSTANTIATED_FACTORIES = Counter()
6464
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
6565
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10%
66+
_MAX_RETRY_SYNC_ALL = 3
6667

6768

6869
class Status(Enum):
@@ -316,9 +317,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
316317
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata),
317318
}
318319

319-
if not input_validator.validate_apikey_type(apis['segments']):
320-
return None
321-
322320
storages = {
323321
'splits': InMemorySplitStorage(),
324322
'segments': InMemorySegmentStorage(),
@@ -382,7 +380,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
382380
)
383381

384382
if preforked_initialization:
385-
synchronizer.sync_all()
383+
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
386384
synchronizer._split_synchronizers._segment_sync.shutdown()
387385
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
388386
recorder, manager, preforked_initialization=preforked_initialization)
@@ -394,7 +392,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
394392
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
395393
recorder, manager, sdk_ready_flag)
396394

397-
398395
def _build_redis_factory(api_key, cfg):
399396
"""Build and return a split factory with redis-based storage."""
400397
sdk_metadata = util.get_metadata(cfg)

splitio/client/input_validator.py

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -446,32 +446,7 @@ def validate_attributes(attributes, method_name):
446446
class _ApiLogFilter(logging.Filter): # pylint: disable=too-few-public-methods
447447
def filter(self, record):
448448
return record.name not in ('SegmentsAPI', 'HttpClient')
449-
450-
451-
def validate_apikey_type(segment_api):
452-
"""
453-
Try to guess if the apikey is of browser type and let the user know.
454-
455-
:param segment_api: Segments API client.
456-
:type segment_api: splitio.api.segments.SegmentsAPI
457-
"""
458-
api_messages_filter = _ApiLogFilter()
459-
_logger = logging.getLogger('splitio.api.segments')
460-
try:
461-
_logger.addFilter(api_messages_filter) # pylint: disable=protected-access
462-
segment_api.fetch_segment('__SOME_INVALID_SEGMENT__', -1, FetchOptions())
463-
except APIException as exc:
464-
if exc.status_code == 403:
465-
_LOGGER.error('factory instantiation: you passed a browser type '
466-
+ 'api_key, please grab an api key from the Split '
467-
+ 'console that is of type sdk')
468-
return False
469-
finally:
470-
_logger.removeFilter(api_messages_filter) # pylint: disable=protected-access
471-
472-
# True doesn't mean that the APIKEY is right, only that it's not of type "browser"
473-
return True
474-
449+
475450

476451
def validate_factory_instantiation(apikey):
477452
"""

splitio/engine/impressions/adapters.py

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,18 @@ def flush_counters(self, to_send):
8888
"""
8989
post the impression counters to redis.
9090
91-
:param uniques: unique keys disctionary
92-
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
91+
:param to_send: unique keys disctionary
92+
:type to_send: Dictionary {'feature1': set(), 'feature2': set(), .. }
9393
"""
94-
bulk_counts = self._build_counters(to_send)
9594
try:
96-
inserted = self._redis_client.rpush(self.IMP_COUNT_QUEUE_KEY, *bulk_counts)
97-
self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, inserted, len(bulk_counts))
95+
resulted = 0
96+
counted = 0
97+
pipe = self._redis_client.pipeline()
98+
for pf_count in to_send:
99+
pipe.hincrby(self.IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
100+
counted += pf_count.count
101+
resulted = sum(pipe.execute())
102+
self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, resulted, counted)
98103
return True
99104
except RedisAdapterException:
100105
_LOGGER.error('Something went wrong when trying to add counters to redis')
@@ -124,23 +129,3 @@ def _uniques_formatter(self, uniques):
124129
:rtype: json
125130
"""
126131
return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]
127-
128-
def _build_counters(self, counters):
129-
"""
130-
Build an impression bulk formatted as the API expects it.
131-
132-
:param counters: List of impression counters per feature.
133-
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]
134-
135-
:return: dict with list of impression count dtos
136-
:rtype: dict
137-
"""
138-
return json.dumps({
139-
'pf': [
140-
{
141-
'f': pf_count.feature,
142-
'm': pf_count.timeframe,
143-
'rc': pf_count.count
144-
} for pf_count in counters
145-
]
146-
})

splitio/storage/adapters/redis.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,13 @@ def hget(self, name, key):
241241
except RedisError as exc:
242242
raise RedisAdapterException('Error executing hget operation') from exc
243243

244+
def hincrby(self, name, key, amount=1):
245+
"""Mimic original redis function but using user custom prefix."""
246+
try:
247+
return self._decorated.hincrby(self._prefix_helper.add_prefix(name), key, amount)
248+
except RedisError as exc:
249+
raise RedisAdapterException('Error executing hincrby operation') from exc
250+
244251
def incr(self, name, amount=1):
245252
"""Mimic original redis function but using user custom prefix."""
246253
try:
@@ -323,6 +330,10 @@ def incr(self, name, amount=1):
323330
"""Mimic original redis function but using user custom prefix."""
324331
self._pipe.incr(self._prefix_helper.add_prefix(name), amount)
325332

333+
def hincrby(self, name, key, amount=1):
334+
"""Mimic original redis function but using user custom prefix."""
335+
self._pipe.hincrby(self._prefix_helper.add_prefix(name), key, amount)
336+
326337
def execute(self):
327338
"""Mimic original redis function but using user custom prefix."""
328339
try:

splitio/storage/inmemmory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def get(self, segment_name):
193193
with self._lock:
194194
fetched = self._segments.get(segment_name)
195195
if fetched is None:
196-
_LOGGER.warning(
196+
_LOGGER.debug(
197197
"Tried to retrieve nonexistant segment %s. Skipping",
198198
segment_name
199199
)

splitio/sync/manager.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from splitio.push.manager import PushManager, Status
88
from splitio.api import APIException
99
from splitio.util.backoff import Backoff
10-
10+
from splitio.sync.synchronizer import _SYNC_ALL_NO_RETRIES
1111

1212
_LOGGER = logging.getLogger(__name__)
1313

@@ -58,10 +58,12 @@ def recreate(self):
5858
"""Recreate poolers for forked processes."""
5959
self._synchronizer._split_synchronizers._segment_sync.recreate()
6060

61-
def start(self):
62-
"""Start the SDK synchronization tasks."""
61+
def start(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
62+
"""
63+
Start the SDK synchronization tasks.
64+
"""
6365
try:
64-
self._synchronizer.sync_all()
66+
self._synchronizer.sync_all(max_retry_attempts)
6567
self._ready_flag.set()
6668
self._synchronizer.start_periodic_data_recording()
6769
if self._streaming_enabled:

splitio/sync/synchronizer.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import abc
44
import logging
55
import threading
6+
import time
67

78
from splitio.api import APIException
9+
from splitio.util.backoff import Backoff
810

911

1012
_LOGGER = logging.getLogger(__name__)
11-
13+
_SYNC_ALL_NO_RETRIES = -1
1214

1315
class SplitSynchronizers(object):
1416
"""SplitSynchronizers."""
@@ -212,6 +214,9 @@ def shutdown(self, blocking):
212214
class Synchronizer(BaseSynchronizer):
213215
"""Synchronizer."""
214216

217+
_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds
218+
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 30 # don't sleep for more than 1 minute
219+
215220
def __init__(self, split_synchronizers, split_tasks):
216221
"""
217222
Class constructor.
@@ -221,6 +226,9 @@ def __init__(self, split_synchronizers, split_tasks):
221226
:param split_tasks: tasks for starting/stopping tasks
222227
:type split_tasks: splitio.sync.synchronizer.SplitTasks
223228
"""
229+
self._backoff = Backoff(
230+
self._ON_DEMAND_FETCH_BACKOFF_BASE,
231+
self._ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
224232
self._split_synchronizers = split_synchronizers
225233
self._split_tasks = split_tasks
226234
self._periodic_data_recording_tasks = [
@@ -284,14 +292,18 @@ def synchronize_splits(self, till, sync_segments=True):
284292
_LOGGER.debug('Error: ', exc_info=True)
285293
return False
286294

287-
def sync_all(self):
288-
"""Synchronize all split data."""
289-
attempts = 3
290-
while attempts > 0:
295+
def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
296+
"""
297+
Synchronize all splits.
298+
299+
:param max_retry_attempts: apply max attempts if it set to absilute integer.
300+
:type max_retry_attempts: int
301+
"""
302+
retry_attempts = 0
303+
while True:
291304
try:
292305
if not self.synchronize_splits(None, False):
293-
attempts -= 1
294-
continue
306+
raise Exception("split sync failed")
295307

296308
# Only retrying splits, since segments may trigger too many calls.
297309
if not self._synchronize_segments():
@@ -300,11 +312,19 @@ def sync_all(self):
300312
# All is good
301313
return
302314
except Exception as exc: # pylint:disable=broad-except
303-
attempts -= 1
304315
_LOGGER.error("Exception caught when trying to sync all data: %s", str(exc))
305316
_LOGGER.debug('Error: ', exc_info=True)
317+
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
318+
retry_attempts += 1
319+
if retry_attempts > max_retry_attempts:
320+
break
321+
how_long = self._backoff.get()
322+
time.sleep(how_long)
306323

307-
_LOGGER.error("Could not correctly synchronize splits and segments after 3 attempts.")
324+
_LOGGER.error("Could not correctly synchronize splits and segments after %d attempts.", retry_attempts)
325+
326+
def _retry_block(self, max_retry_attempts, retry_attempts):
327+
return retry_attempts
308328

309329
def shutdown(self, blocking):
310330
"""
@@ -468,8 +488,12 @@ def __init__(self, split_synchronizers, split_tasks):
468488
self._split_synchronizers = split_synchronizers
469489
self._split_tasks = split_tasks
470490

471-
def sync_all(self):
472-
"""Synchronize all split data."""
491+
def sync_all(self, max_retry_attempts=-1):
492+
"""
493+
Synchronize all splits.
494+
495+
:param max_retry_attempts: Not used, added for compatibility
496+
"""
473497
try:
474498
self._split_synchronizers.split_sync.synchronize_splits(None)
475499
except APIException as exc:

splitio/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '9.2.0'
1+
__version__ = '9.2.1'

0 commit comments

Comments
 (0)