Skip to content

Commit 8fdef37

Browse files
committed
Ported changes from master and updated tests
1 parent 58e7d4c commit 8fdef37

File tree

17 files changed

+139
-192
lines changed

17 files changed

+139
-192
lines changed

splitio/client/factory.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
_INSTANTIATED_FACTORIES = Counter()
6767
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
6868
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10%
69-
69+
_MAX_RETRY_SYNC_ALL = 3
7070

7171
class Status(Enum):
7272
"""Factory Status."""
@@ -355,9 +355,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
355355
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
356356
}
357357

358-
if not input_validator.validate_apikey_type(apis['segments']):
359-
return None
360-
361358
storages = {
362359
'splits': InMemorySplitStorage(),
363360
'segments': InMemorySegmentStorage(),
@@ -424,7 +421,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
424421
)
425422

426423
if preforked_initialization:
427-
synchronizer.sync_all()
424+
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
428425
synchronizer._split_synchronizers._segment_sync.shutdown()
429426
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
430427
recorder, manager, None, telemetry_producer, telemetry_consumer.get_telemetry_init_consumer(), apis['telemetry'], preforked_initialization=preforked_initialization)

splitio/client/input_validator.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -448,31 +448,6 @@ def filter(self, record):
448448
return record.name not in ('SegmentsAPI', 'HttpClient')
449449

450450

451-
def validate_apikey_type(segment_api):
452-
"""
453-
Try to guess if the apikey is of browser type and let the user know.
454-
455-
:param segment_api: Segments API client.
456-
:type segment_api: splitio.api.segments.SegmentsAPI
457-
"""
458-
api_messages_filter = _ApiLogFilter()
459-
_logger = logging.getLogger('splitio.api.segments')
460-
try:
461-
_logger.addFilter(api_messages_filter) # pylint: disable=protected-access
462-
segment_api.fetch_segment('__SOME_INVALID_SEGMENT__', -1, FetchOptions())
463-
except APIException as exc:
464-
if exc.status_code == 403:
465-
_LOGGER.error('factory instantiation: you passed a browser type '
466-
+ 'api_key, please grab an api key from the Split '
467-
+ 'console that is of type sdk')
468-
return False
469-
finally:
470-
_logger.removeFilter(api_messages_filter) # pylint: disable=protected-access
471-
472-
# True doesn't mean that the APIKEY is right, only that it's not of type "browser"
473-
return True
474-
475-
476451
def validate_factory_instantiation(apikey):
477452
"""
478453
Check if the factory if being instantiated with the appropriate arguments.

splitio/engine/impressions/adapters.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,16 @@ def flush_counters(self, to_send):
9191
:param uniques: unique keys disctionary
9292
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
9393
"""
94-
bulk_counts = self._build_counters(to_send)
9594
try:
96-
inserted = self._redis_client.rpush(self.IMP_COUNT_QUEUE_KEY, *bulk_counts)
97-
self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, inserted, len(bulk_counts))
95+
resulted = 0
96+
counted = 0
97+
pipe = self._redis_client.pipeline()
98+
for pf_count in to_send:
99+
pipe.hincrby(self.IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
100+
counted += pf_count.count
101+
resulted = sum(pipe.execute())
102+
self._expire_keys(self.IMP_COUNT_QUEUE_KEY,
103+
self.IMP_COUNT_KEY_DEFAULT_TTL, resulted, counted)
98104
return True
99105
except RedisAdapterException:
100106
_LOGGER.error('Something went wrong when trying to add counters to redis')
@@ -124,23 +130,3 @@ def _uniques_formatter(self, uniques):
124130
:rtype: json
125131
"""
126132
return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]
127-
128-
def _build_counters(self, counters):
129-
"""
130-
Build an impression bulk formatted as the API expects it.
131-
132-
:param counters: List of impression counters per feature.
133-
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]
134-
135-
:return: dict with list of impression count dtos
136-
:rtype: dict
137-
"""
138-
return json.dumps({
139-
'pf': [
140-
{
141-
'f': pf_count.feature,
142-
'm': pf_count.timeframe,
143-
'rc': pf_count.count
144-
} for pf_count in counters
145-
]
146-
})

splitio/storage/inmemmory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def get(self, segment_name):
205205
with self._lock:
206206
fetched = self._segments.get(segment_name)
207207
if fetched is None:
208-
_LOGGER.warning(
208+
_LOGGER.debug(
209209
"Tried to retrieve nonexistant segment %s. Skipping",
210210
segment_name
211211
)

splitio/sync/manager.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from splitio.util.backoff import Backoff
1111
from splitio.util.time import get_current_epoch_time_ms
1212
from splitio.models.telemetry import SSESyncMode, StreamingEventTypes
13+
from splitio.sync.synchronizer import _SYNC_ALL_NO_RETRIES
1314

1415
_LOGGER = logging.getLogger(__name__)
1516

@@ -61,10 +62,10 @@ def recreate(self):
6162
"""Recreate poolers for forked processes."""
6263
self._synchronizer._split_synchronizers._segment_sync.recreate()
6364

64-
def start(self):
65+
def start(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
6566
"""Start the SDK synchronization tasks."""
6667
try:
67-
self._synchronizer.sync_all()
68+
self._synchronizer.sync_all(max_retry_attempts)
6869
self._ready_flag.set()
6970
self._synchronizer.start_periodic_data_recording()
7071
if self._streaming_enabled:

splitio/sync/synchronizer.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
import abc
44
import logging
55
import threading
6+
import time
67

78
from splitio.api import APIException
8-
9+
from splitio.util.backoff import Backoff
910

1011
_LOGGER = logging.getLogger(__name__)
11-
12+
_SYNC_ALL_NO_RETRIES = -1
1213

1314
class SplitSynchronizers(object):
1415
"""SplitSynchronizers."""
@@ -224,6 +225,9 @@ def shutdown(self, blocking):
224225
class Synchronizer(BaseSynchronizer):
225226
"""Synchronizer."""
226227

228+
_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds
229+
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 30 # don't sleep for more than 1 minute
230+
227231
def __init__(self, split_synchronizers, split_tasks):
228232
"""
229233
Class constructor.
@@ -233,6 +237,9 @@ def __init__(self, split_synchronizers, split_tasks):
233237
:param split_tasks: tasks for starting/stopping tasks
234238
:type split_tasks: splitio.sync.synchronizer.SplitTasks
235239
"""
240+
self._backoff = Backoff(
241+
self._ON_DEMAND_FETCH_BACKOFF_BASE,
242+
self._ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
236243
self._split_synchronizers = split_synchronizers
237244
self._split_tasks = split_tasks
238245
self._periodic_data_recording_tasks = [
@@ -296,14 +303,17 @@ def synchronize_splits(self, till, sync_segments=True):
296303
_LOGGER.debug('Error: ', exc_info=True)
297304
return False
298305

299-
def sync_all(self):
300-
"""Synchronize all split data."""
301-
attempts = 3
302-
while attempts > 0:
306+
def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
307+
"""
308+
Synchronize all splits.
309+
:param max_retry_attempts: apply max attempts if it set to absilute integer.
310+
:type max_retry_attempts: int
311+
"""
312+
retry_attempts = 0
313+
while True:
303314
try:
304315
if not self.synchronize_splits(None, False):
305-
attempts -= 1
306-
continue
316+
raise Exception("split sync failed")
307317

308318
# Only retrying splits, since segments may trigger too many calls.
309319

@@ -313,11 +323,16 @@ def sync_all(self):
313323
# All is good
314324
return
315325
except Exception as exc: # pylint:disable=broad-except
316-
attempts -= 1
317326
_LOGGER.error("Exception caught when trying to sync all data: %s", str(exc))
318327
_LOGGER.debug('Error: ', exc_info=True)
328+
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
329+
retry_attempts += 1
330+
if retry_attempts > max_retry_attempts:
331+
break
332+
how_long = self._backoff.get()
333+
time.sleep(how_long)
319334

320-
_LOGGER.error("Could not correctly synchronize splits and segments after 3 attempts.")
335+
_LOGGER.error("Could not correctly synchronize splits and segments after %d attempts.", retry_attempts)
321336

322337
def shutdown(self, blocking):
323338
"""
@@ -486,8 +501,11 @@ def __init__(self, split_synchronizers, split_tasks):
486501
self._split_synchronizers = split_synchronizers
487502
self._split_tasks = split_tasks
488503

489-
def sync_all(self):
490-
"""Synchronize all split data."""
504+
def sync_all(self, max_retry_attempts=-1):
505+
"""
506+
Synchronize all splits.
507+
:param max_retry_attempts: Not used, added for compatibility
508+
"""
491509
try:
492510
self._split_synchronizers.split_sync.synchronize_splits(None)
493511
except APIException as exc:

splitio/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '9.1.3'
1+
__version__ = '9.2.1'

tests/client/test_client.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ def test_get_treatment(self, mocker):
3939
mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5)
4040

4141
impmanager = mocker.Mock(spec=ImpressionManager)
42-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
4342
telemetry_storage = InMemoryTelemetryStorage()
4443
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
4544
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
45+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
4646
factory = SplitFactory(mocker.Mock(),
4747
{'splits': split_storage,
4848
'segments': segment_storage,
@@ -108,10 +108,10 @@ def test_get_treatment_with_config(self, mocker):
108108
destroyed_property.return_value = False
109109

110110
impmanager = mocker.Mock(spec=ImpressionManager)
111-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
112111
telemetry_storage = InMemoryTelemetryStorage()
113112
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
114113
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
114+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
115115
factory = SplitFactory(mocker.Mock(),
116116
{'splits': split_storage,
117117
'segments': segment_storage,
@@ -185,10 +185,10 @@ def test_get_treatments(self, mocker):
185185
destroyed_property.return_value = False
186186

187187
impmanager = mocker.Mock(spec=ImpressionManager)
188-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
189188
telemetry_storage = InMemoryTelemetryStorage()
190189
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
191190
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
191+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
192192
factory = SplitFactory(mocker.Mock(),
193193
{'splits': split_storage,
194194
'segments': segment_storage,
@@ -258,10 +258,10 @@ def test_get_treatments_with_config(self, mocker):
258258
destroyed_property = mocker.PropertyMock()
259259
destroyed_property.return_value = False
260260
impmanager = mocker.Mock(spec=ImpressionManager)
261-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
262261
telemetry_storage = InMemoryTelemetryStorage()
263262
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
264263
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
264+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
265265
factory = SplitFactory(mocker.Mock(),
266266
{'splits': split_storage,
267267
'segments': segment_storage,
@@ -335,10 +335,10 @@ def test_destroy(self, mocker):
335335
event_storage = mocker.Mock(spec=EventStorage)
336336

337337
impmanager = mocker.Mock(spec=ImpressionManager)
338-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
339338
telemetry_storage = InMemoryTelemetryStorage()
340339
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
341340
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
341+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
342342
factory = SplitFactory(mocker.Mock(),
343343
{'splits': split_storage,
344344
'segments': segment_storage,
@@ -367,10 +367,10 @@ def test_track(self, mocker):
367367
event_storage.put.return_value = True
368368

369369
impmanager = mocker.Mock(spec=ImpressionManager)
370-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
371370
telemetry_storage = InMemoryTelemetryStorage()
372371
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
373372
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
373+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
374374
factory = SplitFactory(mocker.Mock(),
375375
{'splits': split_storage,
376376
'segments': segment_storage,
@@ -404,10 +404,10 @@ def test_evaluations_before_running_post_fork(self, mocker):
404404
destroyed_property.return_value = False
405405

406406
impmanager = mocker.Mock(spec=ImpressionManager)
407-
recorder = StandardRecorder(impmanager, mocker.Mock(), mocker.Mock())
408407
telemetry_storage = InMemoryTelemetryStorage()
409408
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
410409
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
410+
recorder = StandardRecorder(impmanager, mocker.Mock(), mocker.Mock(), telemetry_producer.get_telemetry_evaluation_producer())
411411
factory = SplitFactory(mocker.Mock(),
412412
{'splits': mocker.Mock(),
413413
'segments': mocker.Mock(),
@@ -454,10 +454,10 @@ def test_evaluations_before_running_post_fork(self, mocker):
454454
@mock.patch('splitio.client.client.Client.ready', side_effect=None)
455455
def test_telemetry_not_ready(self, mocker):
456456
impmanager = mocker.Mock(spec=ImpressionManager)
457-
recorder = StandardRecorder(impmanager, mocker.Mock(), mocker.Mock())
458457
telemetry_storage = InMemoryTelemetryStorage()
459458
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
460459
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
460+
recorder = StandardRecorder(impmanager, mocker.Mock(), mocker.Mock(), telemetry_producer.get_telemetry_evaluation_producer())
461461
factory = SplitFactory('localhost',
462462
{'splits': mocker.Mock(),
463463
'segments': mocker.Mock(),
@@ -492,10 +492,10 @@ def test_telemetry_record_treatment_exception(self, mocker):
492492
mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5)
493493

494494
impmanager = mocker.Mock(spec=ImpressionManager)
495-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
496495
telemetry_storage = InMemoryTelemetryStorage()
497496
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
498497
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
498+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
499499
factory = SplitFactory(mocker.Mock(),
500500
{'splits': split_storage,
501501
'segments': segment_storage,
@@ -536,10 +536,10 @@ def test_telemetry_record_treatments_exception(self, mocker):
536536
mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5)
537537

538538
impmanager = mocker.Mock(spec=ImpressionManager)
539-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
540539
telemetry_storage = InMemoryTelemetryStorage()
541540
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
542541
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
542+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
543543
factory = SplitFactory(mocker.Mock(),
544544
{'splits': split_storage,
545545
'segments': segment_storage,
@@ -579,10 +579,10 @@ def test_telemetry_method_latency(self, mocker):
579579
mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5)
580580

581581
impmanager = mocker.Mock(spec=ImpressionManager)
582-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
583582
telemetry_storage = InMemoryTelemetryStorage()
584583
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
585584
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
585+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
586586
factory = SplitFactory(mocker.Mock(),
587587
{'splits': split_storage,
588588
'segments': segment_storage,
@@ -621,10 +621,10 @@ def test_telemetry_track_exception(self, mocker):
621621
mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5)
622622

623623
impmanager = mocker.Mock(spec=ImpressionManager)
624-
recorder = StandardRecorder(impmanager, event_storage, impression_storage)
625624
telemetry_storage = InMemoryTelemetryStorage()
626625
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
627626
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
627+
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer())
628628
factory = SplitFactory(mocker.Mock(),
629629
{'splits': split_storage,
630630
'segments': segment_storage,

0 commit comments

Comments
 (0)