Skip to content

Commit aae892f

Browse files
authored
Merge branch 'Feature/Async' into async-flagsets-sync
2 parents f7f90ac + a75cd87 commit aae892f

30 files changed

+6312
-3315
lines changed

splitio/api/commons.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
class FetchOptions(object):
77
"""Fetch Options object."""
88

9-
def __init__(self, cache_control_headers=False, change_number=None):
9+
def __init__(self, cache_control_headers=False, change_number=None, sets=None):
1010
"""
1111
Class constructor.
1212
@@ -15,9 +15,13 @@ def __init__(self, cache_control_headers=False, change_number=None):
1515
1616
:param change_number: ChangeNumber to use for bypassing CDN in request.
1717
:type change_number: int
18+
19+
:param sets: list of flag sets
20+
:type sets: list
1821
"""
1922
self._cache_control_headers = cache_control_headers
2023
self._change_number = change_number
24+
self._sets = sets
2125

2226
@property
2327
def cache_control_headers(self):
@@ -29,12 +33,19 @@ def change_number(self):
2933
"""Return change number."""
3034
return self._change_number
3135

36+
@property
37+
def sets(self):
38+
"""Return sets."""
39+
return self._sets
40+
3241
def __eq__(self, other):
3342
"""Match between other options."""
3443
if self._cache_control_headers != other._cache_control_headers:
3544
return False
3645
if self._change_number != other._change_number:
3746
return False
47+
if self._sets != other._sets:
48+
return False
3849
return True
3950

4051

@@ -62,4 +73,6 @@ def build_fetch(change_number, fetch_options, metadata):
6273
extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE
6374
if fetch_options.change_number is not None:
6475
query['till'] = fetch_options.change_number
76+
if fetch_options.sets is not None:
77+
query['sets'] = fetch_options.sets
6578
return query, extra_headers

splitio/api/splits.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def fetch_splits(self, change_number, fetch_options):
5757
if 200 <= response.status_code < 300:
5858
return json.loads(response.body)
5959
else:
60+
if response.status_code == 414:
61+
_LOGGER.error('Error fetching feature flags; the amount of flag sets provided are too big, causing uri length error.')
6062
raise APIException(response.body, response.status_code)
6163
except HttpClientException as exc:
6264
_LOGGER.error('Error fetching feature flags because an exception was raised by the HTTPClient')
@@ -109,6 +111,8 @@ async def fetch_splits(self, change_number, fetch_options):
109111
if 200 <= response.status_code < 300:
110112
return json.loads(response.body)
111113
else:
114+
if response.status_code == 414:
115+
_LOGGER.error('Error fetching feature flags; the amount of flag sets provided are too big, causing uri length error.')
112116
raise APIException(response.body, response.status_code)
113117
except HttpClientException as exc:
114118
_LOGGER.error('Error fetching feature flags because an exception was raised by the HTTPClient')

splitio/api/telemetry.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ def record_init(self, configs):
7171
'Error posting init config because an exception was raised by the HTTPClient'
7272
)
7373
_LOGGER.debug('Error: ', exc_info=True)
74-
raise APIException('Init config data not flushed properly.') from exc
7574

7675
def record_stats(self, stats):
7776
"""
@@ -162,7 +161,6 @@ async def record_init(self, configs):
162161
'Error posting init config because an exception was raised by the HTTPClient'
163162
)
164163
_LOGGER.debug('Error: ', exc_info=True)
165-
raise APIException('Init config data not flushed properly.') from exc
166164

167165
async def record_stats(self, stats):
168166
"""

splitio/client/client.py

Lines changed: 216 additions & 10 deletions
Large diffs are not rendered by default.

splitio/client/config.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44

55
from splitio.engine.impressions import ImpressionsMode
6-
6+
from splitio.client.input_validator import validate_flag_sets
77

88
_LOGGER = logging.getLogger(__name__)
99
DEFAULT_DATA_SAMPLING = 1
@@ -58,7 +58,8 @@
5858
'dataSampling': DEFAULT_DATA_SAMPLING,
5959
'storageWrapper': None,
6060
'storagePrefix': None,
61-
'storageType': None
61+
'storageType': None,
62+
'flagSetsFilter': None
6263
}
6364

6465

@@ -143,4 +144,10 @@ def sanitize(sdk_key, config):
143144
_LOGGER.warning('metricRefreshRate parameter minimum value is 60 seconds, defaulting to 3600 seconds.')
144145
processed['metricsRefreshRate'] = 3600
145146

147+
if config['operationMode'] == 'consumer' and config.get('flagSetsFilter') is not None:
148+
processed['flagSetsFilter'] = None
149+
_LOGGER.warning('config: FlagSets filter is not applicable for Consumer modes where the SDK does keep rollout data in sync. FlagSet filter was discarded.')
150+
else:
151+
processed['flagSetsFilter'] = sorted(validate_flag_sets(processed['flagSetsFilter'], 'SDK Config')) if processed['flagSetsFilter'] is not None else None
152+
146153
return processed

splitio/client/factory.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ async def block_until_ready(self, timeout=None):
410410
await asyncio.wait_for(asyncio.shield(self._sdk_ready_flag.wait()), timeout)
411411
except asyncio.TimeoutError as e:
412412
_LOGGER.error("Exception initializing SDK")
413-
_LOGGER.error(str(e))
413+
_LOGGER.debug(str(e))
414414
await self._telemetry_init_producer.record_bur_time_out()
415415
raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout)
416416

@@ -498,7 +498,8 @@ def _wrap_impression_listener_async(listener, metadata):
498498
return None
499499

500500
def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
501-
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
501+
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None,
502+
total_flag_sets=0, invalid_flag_sets=0):
502503
"""Build and return a split factory tailored to the supplied config."""
503504
if not input_validator.validate_factory_instantiation(api_key):
504505
return None
@@ -536,7 +537,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
536537
}
537538

538539
storages = {
539-
'splits': InMemorySplitStorage(),
540+
'splits': InMemorySplitStorage(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
540541
'segments': InMemorySegmentStorage(),
541542
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize'], telemetry_runtime_producer),
542543
'events': InMemoryEventStorage(cfg['eventsQueueSize'], telemetry_runtime_producer),
@@ -607,7 +608,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
607608
unique_keys_tracker=unique_keys_tracker
608609
)
609610

610-
telemetry_init_producer.record_config(cfg, extra_cfg)
611+
telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)
611612

612613
if preforked_initialization:
613614
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
@@ -625,7 +626,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
625626
telemetry_submitter)
626627

627628
async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-localsa
628-
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
629+
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None,
630+
total_flag_sets=0, invalid_flag_sets=0):
629631
"""Build and return a split factory tailored to the supplied config in async mode."""
630632
if not input_validator.validate_factory_instantiation(api_key):
631633
return None
@@ -663,7 +665,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
663665
}
664666

665667
storages = {
666-
'splits': InMemorySplitStorageAsync(),
668+
'splits': InMemorySplitStorageAsync(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
667669
'segments': InMemorySegmentStorageAsync(),
668670
'impressions': InMemoryImpressionStorageAsync(cfg['impressionsQueueSize'], telemetry_runtime_producer),
669671
'events': InMemoryEventStorageAsync(cfg['eventsQueueSize'], telemetry_runtime_producer),
@@ -733,7 +735,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
733735
unique_keys_tracker=unique_keys_tracker
734736
)
735737

736-
await telemetry_init_producer.record_config(cfg, extra_cfg)
738+
await telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)
737739

738740
if preforked_initialization:
739741
await synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
@@ -814,7 +816,7 @@ def _build_redis_factory(api_key, cfg):
814816
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
815817
initialization_thread.start()
816818

817-
telemetry_init_producer.record_config(cfg, {})
819+
telemetry_init_producer.record_config(cfg, {}, 0, 0)
818820

819821
split_factory = SplitFactory(
820822
api_key,
@@ -894,7 +896,7 @@ async def _build_redis_factory_async(api_key, cfg):
894896
)
895897

896898
manager = RedisManagerAsync(synchronizer)
897-
await telemetry_init_producer.record_config(cfg, {})
899+
await telemetry_init_producer.record_config(cfg, {}, 0, 0)
898900
manager.start()
899901

900902
split_factory = SplitFactoryAsync(
@@ -977,7 +979,7 @@ def _build_pluggable_factory(api_key, cfg):
977979
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
978980
initialization_thread.start()
979981

980-
telemetry_init_producer.record_config(cfg, {})
982+
telemetry_init_producer.record_config(cfg, {}, 0, 0)
981983

982984
split_factory = SplitFactory(
983985
api_key,
@@ -1056,7 +1058,7 @@ async def _build_pluggable_factory_async(api_key, cfg):
10561058
# Using same class as redis for consumer mode only
10571059
manager = RedisManagerAsync(synchronizer)
10581060
manager.start()
1059-
await telemetry_init_producer.record_config(cfg, {})
1061+
await telemetry_init_producer.record_config(cfg, {}, 0, 0)
10601062

10611063
split_factory = SplitFactoryAsync(
10621064
api_key,
@@ -1083,7 +1085,7 @@ def _build_localhost_factory(cfg):
10831085
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
10841086

10851087
storages = {
1086-
'splits': InMemorySplitStorage(),
1088+
'splits': InMemorySplitStorage(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
10871089
'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors.
10881090
'impressions': LocalhostImpressionsStorage(),
10891091
'events': LocalhostEventsStorage(),
@@ -1238,7 +1240,10 @@ def get_factory(api_key, **kwargs):
12381240
_INSTANTIATED_FACTORIES.update([api_key])
12391241
_INSTANTIATED_FACTORIES_LOCK.release()
12401242

1241-
config = sanitize_config(api_key, kwargs.get('config', {}))
1243+
config_raw = kwargs.get('config', {})
1244+
total_flag_sets, invalid_flag_sets = _get_total_and_invalid_flag_sets(config_raw)
1245+
1246+
config = sanitize_config(api_key, config_raw)
12421247

12431248
if config['operationMode'] == 'localhost':
12441249
split_factory = _build_localhost_factory(config)
@@ -1254,7 +1259,9 @@ def get_factory(api_key, **kwargs):
12541259
kwargs.get('events_api_base_url'),
12551260
kwargs.get('auth_api_base_url'),
12561261
kwargs.get('streaming_api_base_url'),
1257-
kwargs.get('telemetry_api_base_url'))
1262+
kwargs.get('telemetry_api_base_url'),
1263+
total_flag_sets,
1264+
invalid_flag_sets)
12581265

12591266
return split_factory
12601267

@@ -1282,8 +1289,10 @@ async def get_factory_async(api_key, **kwargs):
12821289
_INSTANTIATED_FACTORIES.update([api_key])
12831290
_INSTANTIATED_FACTORIES_LOCK.release()
12841291

1285-
config = sanitize_config(api_key, kwargs.get('config', {}))
1292+
config_raw = kwargs.get('config', {})
1293+
total_flag_sets, invalid_flag_sets = _get_total_and_invalid_flag_sets(config_raw)
12861294

1295+
config = sanitize_config(api_key, config_raw)
12871296
if config['operationMode'] == 'localhost':
12881297
split_factory = await _build_localhost_factory_async(config)
12891298
elif config['storageType'] == 'redis':
@@ -1298,8 +1307,9 @@ async def get_factory_async(api_key, **kwargs):
12981307
kwargs.get('events_api_base_url'),
12991308
kwargs.get('auth_api_base_url'),
13001309
kwargs.get('streaming_api_base_url'),
1301-
kwargs.get('telemetry_api_base_url'))
1302-
1310+
kwargs.get('telemetry_api_base_url'),
1311+
total_flag_sets,
1312+
invalid_flag_sets)
13031313
return split_factory
13041314

13051315
def _get_active_and_redundant_count():
@@ -1310,4 +1320,13 @@ def _get_active_and_redundant_count():
13101320
redundant_factory_count += _INSTANTIATED_FACTORIES[item] - 1
13111321
active_factory_count += _INSTANTIATED_FACTORIES[item]
13121322
_INSTANTIATED_FACTORIES_LOCK.release()
1313-
return redundant_factory_count, active_factory_count
1323+
return redundant_factory_count, active_factory_count
1324+
1325+
def _get_total_and_invalid_flag_sets(config_raw):
1326+
total_flag_sets = 0
1327+
invalid_flag_sets = 0
1328+
if config_raw.get('flagSetsFilter') is not None and isinstance(config_raw.get('flagSetsFilter'), list):
1329+
total_flag_sets = len(config_raw.get('flagSetsFilter'))
1330+
invalid_flag_sets = total_flag_sets - len(input_validator.validate_flag_sets(config_raw.get('flagSetsFilter'), 'Telemetry Init'))
1331+
1332+
return total_flag_sets, invalid_flag_sets

0 commit comments

Comments
 (0)