Skip to content

Commit 3fac14b

Browse files
authored
Merge branch 'Feature/FlagSets' into flagsets-storage-redis
2 parents 474c72f + a69f8f6 commit 3fac14b

File tree

8 files changed

+95
-46
lines changed

8 files changed

+95
-46
lines changed

splitio/client/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ def _get_feature_flag_names_by_flag_sets(self, flag_sets, method_name):
435435
return []
436436
return feature_flags_by_set
437437

438+
438439
def _build_impression( # pylint: disable=too-many-arguments
439440
self,
440441
matching_key,

splitio/client/factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ def _build_pluggable_factory(api_key, cfg):
523523
pluggable_adapter = cfg.get('storageWrapper')
524524
storage_prefix = cfg.get('storagePrefix')
525525
storages = {
526-
'splits': PluggableSplitStorage(pluggable_adapter, storage_prefix),
526+
'splits': PluggableSplitStorage(pluggable_adapter, storage_prefix, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
527527
'segments': PluggableSegmentStorage(pluggable_adapter, storage_prefix),
528528
'impressions': PluggableImpressionsStorage(pluggable_adapter, sdk_metadata, storage_prefix),
529529
'events': PluggableEventsStorage(pluggable_adapter, sdk_metadata, storage_prefix),

splitio/models/telemetry.py

Lines changed: 43 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class CounterConstants(Enum):
2727
EVENTS_QUEUED = 'eventsQueued'
2828
EVENTS_DROPPED = 'eventsDropped'
2929

30-
class ConfigParams(Enum):
30+
class _ConfigParams(Enum):
3131
"""Config parameters constants"""
3232
SPLITS_REFRESH_RATE = 'featuresRefreshRate'
3333
SEGMENTS_REFRESH_RATE = 'segmentsRefreshRate'
@@ -41,8 +41,9 @@ class ConfigParams(Enum):
4141
EVENTS_QUEUE_SIZE = 'eventsQueueSize'
4242
IMPRESSIONS_MODE = 'impressionsMode'
4343
IMPRESSIONS_LISTENER = 'impressionListener'
44+
FLAG_SETS = 'flagSetsFilter'
4445

45-
class ExtraConfig(Enum):
46+
class _ExtraConfig(Enum):
4647
"""Extra config constants"""
4748
ACTIVE_FACTORY_COUNT = 'activeFactoryCount'
4849
REDUNDANT_FACTORY_COUNT = 'redundantFactoryCount'
@@ -53,7 +54,7 @@ class ExtraConfig(Enum):
5354
HTTP_PROXY = 'httpProxy'
5455
HTTPS_PROXY_ENV = 'HTTPS_PROXY'
5556

56-
class ApiURLs(Enum):
57+
class _ApiURLs(Enum):
5758
"""Api URL constants"""
5859
SDK_URL = 'sdk_url'
5960
EVENTS_URL = 'events_url'
@@ -88,7 +89,7 @@ class MethodExceptionsAndLatencies(Enum):
8889
TREATMENTS_WITH_CONFIG_BY_FLAG_SETS = 'treatments_with_config_by_flag_sets'
8990
TRACK = 'track'
9091

91-
class LastSynchronizationConstants(Enum):
92+
class _LastSynchronizationConstants(Enum):
9293
"""Last sync constants"""
9394
LAST_SYNCHRONIZATIONS = 'lastSynchronizations'
9495

@@ -108,7 +109,7 @@ class SSESyncMode(Enum):
108109
STREAMING = 0
109110
POLLING = 1
110111

111-
class StreamingEventsConstant(Enum):
112+
class _StreamingEventsConstant(Enum):
112113
"""Storage types constant"""
113114
STREAMING_EVENTS = 'streamingEvents'
114115

@@ -426,7 +427,7 @@ def get_all(self):
426427
:rtype: dict
427428
"""
428429
with self._lock:
429-
return {LastSynchronizationConstants.LAST_SYNCHRONIZATIONS.value: {HTTPExceptionsAndLatencies.SPLIT.value: self._split, HTTPExceptionsAndLatencies.SEGMENT.value: self._segment, HTTPExceptionsAndLatencies.IMPRESSION.value: self._impression,
430+
return {_LastSynchronizationConstants.LAST_SYNCHRONIZATIONS.value: {HTTPExceptionsAndLatencies.SPLIT.value: self._split, HTTPExceptionsAndLatencies.SEGMENT.value: self._segment, HTTPExceptionsAndLatencies.IMPRESSION.value: self._impression,
430431
HTTPExceptionsAndLatencies.IMPRESSION_COUNT.value: self._impression_count, HTTPExceptionsAndLatencies.EVENT.value: self._event,
431432
HTTPExceptionsAndLatencies.TELEMETRY.value: self._telemetry, HTTPExceptionsAndLatencies.TOKEN.value: self._token}
432433
}
@@ -671,6 +672,8 @@ def pop_update_from_sse(self, event):
671672
:rtype: int
672673
"""
673674
with self._lock:
675+
if self._update_from_sse.get(event.value) is None:
676+
return 0
674677
update_from_sse = self._update_from_sse[event.value]
675678
self._update_from_sse[event.value] = 0
676679
return update_from_sse
@@ -758,7 +761,7 @@ def pop_streaming_events(self):
758761
with self._lock:
759762
streaming_events = self._streaming_events
760763
self._streaming_events = []
761-
return {StreamingEventsConstant.STREAMING_EVENTS.value: [{'e': streaming_event.type, 'd': streaming_event.data,
764+
return {_StreamingEventsConstant.STREAMING_EVENTS.value: [{'e': streaming_event.type, 'd': streaming_event.data,
762765
't': streaming_event.time} for streaming_event in streaming_events]}
763766

764767
class TelemetryConfig(object):
@@ -780,10 +783,10 @@ def _reset_all(self):
780783
self._operation_mode = None
781784
self._storage_type = None
782785
self._streaming_enabled = None
783-
self._refresh_rate = {ConfigParams.SPLITS_REFRESH_RATE.value: 0, ConfigParams.SEGMENTS_REFRESH_RATE.value: 0,
784-
ConfigParams.IMPRESSIONS_REFRESH_RATE.value: 0, ConfigParams.EVENTS_REFRESH_RATE.value: 0, ConfigParams.TELEMETRY_REFRESH_RATE.value: 0}
785-
self._url_override = {ApiURLs.SDK_URL.value: False, ApiURLs.EVENTS_URL.value: False, ApiURLs.AUTH_URL.value: False,
786-
ApiURLs.STREAMING_URL.value: False, ApiURLs.TELEMETRY_URL.value: False}
786+
self._refresh_rate = {_ConfigParams.SPLITS_REFRESH_RATE.value: 0, _ConfigParams.SEGMENTS_REFRESH_RATE.value: 0,
787+
_ConfigParams.IMPRESSIONS_REFRESH_RATE.value: 0, _ConfigParams.EVENTS_REFRESH_RATE.value: 0, _ConfigParams.TELEMETRY_REFRESH_RATE.value: 0}
788+
self._url_override = {_ApiURLs.SDK_URL.value: False, _ApiURLs.EVENTS_URL.value: False, _ApiURLs.AUTH_URL.value: False,
789+
_ApiURLs.STREAMING_URL.value: False, _ApiURLs.TELEMETRY_URL.value: False}
787790
self._impressions_queue_size = 0
788791
self._events_queue_size = 0
789792
self._impressions_mode = None
@@ -816,16 +819,17 @@ def record_config(self, config, extra_config):
816819
:type config: dict
817820
"""
818821
with self._lock:
819-
self._operation_mode = self._get_operation_mode(config[ConfigParams.OPERATION_MODE.value])
820-
self._storage_type = self._get_storage_type(config[ConfigParams.OPERATION_MODE.value], config[ConfigParams.STORAGE_TYPE.value])
821-
self._streaming_enabled = config[ConfigParams.STREAMING_ENABLED.value]
822+
self._operation_mode = self._get_operation_mode(config[_ConfigParams.OPERATION_MODE.value])
823+
self._storage_type = self._get_storage_type(config[_ConfigParams.OPERATION_MODE.value], config[_ConfigParams.STORAGE_TYPE.value])
824+
self._streaming_enabled = config[_ConfigParams.STREAMING_ENABLED.value]
822825
self._refresh_rate = self._get_refresh_rates(config)
823826
self._url_override = self._get_url_overrides(extra_config)
824-
self._impressions_queue_size = config[ConfigParams.IMPRESSIONS_QUEUE_SIZE.value]
825-
self._events_queue_size = config[ConfigParams.EVENTS_QUEUE_SIZE.value]
826-
self._impressions_mode = self._get_impressions_mode(config[ConfigParams.IMPRESSIONS_MODE.value])
827-
self._impression_listener = True if config[ConfigParams.IMPRESSIONS_LISTENER.value] is not None else False
827+
self._impressions_queue_size = config[_ConfigParams.IMPRESSIONS_QUEUE_SIZE.value]
828+
self._events_queue_size = config[_ConfigParams.EVENTS_QUEUE_SIZE.value]
829+
self._impressions_mode = self._get_impressions_mode(config[_ConfigParams.IMPRESSIONS_MODE.value])
830+
self._impression_listener = True if config[_ConfigParams.IMPRESSIONS_LISTENER.value] is not None else False
828831
self._http_proxy = self._check_if_proxy_detected()
832+
self._flag_sets = len(config[_ConfigParams.FLAG_SETS.value]) if config[_ConfigParams.FLAG_SETS.value] is not None else 0
829833

830834
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
831835
with self._lock:
@@ -911,16 +915,16 @@ def get_stats(self):
911915
'oM': self._operation_mode,
912916
'sT': self._storage_type,
913917
'sE': self._streaming_enabled,
914-
'rR': {'sp': self._refresh_rate[ConfigParams.SPLITS_REFRESH_RATE.value],
915-
'se': self._refresh_rate[ConfigParams.SEGMENTS_REFRESH_RATE.value],
916-
'im': self._refresh_rate[ConfigParams.IMPRESSIONS_REFRESH_RATE.value],
917-
'ev': self._refresh_rate[ConfigParams.EVENTS_REFRESH_RATE.value],
918-
'te': self._refresh_rate[ConfigParams.TELEMETRY_REFRESH_RATE.value]},
919-
'uO': {'s': self._url_override[ApiURLs.SDK_URL.value],
920-
'e': self._url_override[ApiURLs.EVENTS_URL.value],
921-
'a': self._url_override[ApiURLs.AUTH_URL.value],
922-
'st': self._url_override[ApiURLs.STREAMING_URL.value],
923-
't': self._url_override[ApiURLs.TELEMETRY_URL.value]},
918+
'rR': {'sp': self._refresh_rate[_ConfigParams.SPLITS_REFRESH_RATE.value],
919+
'se': self._refresh_rate[_ConfigParams.SEGMENTS_REFRESH_RATE.value],
920+
'im': self._refresh_rate[_ConfigParams.IMPRESSIONS_REFRESH_RATE.value],
921+
'ev': self._refresh_rate[_ConfigParams.EVENTS_REFRESH_RATE.value],
922+
'te': self._refresh_rate[_ConfigParams.TELEMETRY_REFRESH_RATE.value]},
923+
'uO': {'s': self._url_override[_ApiURLs.SDK_URL.value],
924+
'e': self._url_override[_ApiURLs.EVENTS_URL.value],
925+
'a': self._url_override[_ApiURLs.AUTH_URL.value],
926+
'st': self._url_override[_ApiURLs.STREAMING_URL.value],
927+
't': self._url_override[_ApiURLs.TELEMETRY_URL.value]},
924928
'iQ': self._impressions_queue_size,
925929
'eQ': self._events_queue_size,
926930
'iM': self._impressions_mode,
@@ -979,11 +983,11 @@ def _get_refresh_rates(self, config):
979983
"""
980984
with self._lock:
981985
return {
982-
ConfigParams.SPLITS_REFRESH_RATE.value: config[ConfigParams.SPLITS_REFRESH_RATE.value],
983-
ConfigParams.SEGMENTS_REFRESH_RATE.value: config[ConfigParams.SEGMENTS_REFRESH_RATE.value],
984-
ConfigParams.IMPRESSIONS_REFRESH_RATE.value: config[ConfigParams.IMPRESSIONS_REFRESH_RATE.value],
985-
ConfigParams.EVENTS_REFRESH_RATE.value: config[ConfigParams.EVENTS_REFRESH_RATE.value],
986-
ConfigParams.TELEMETRY_REFRESH_RATE.value: config[ConfigParams.TELEMETRY_REFRESH_RATE.value]
986+
_ConfigParams.SPLITS_REFRESH_RATE.value: config[_ConfigParams.SPLITS_REFRESH_RATE.value],
987+
_ConfigParams.SEGMENTS_REFRESH_RATE.value: config[_ConfigParams.SEGMENTS_REFRESH_RATE.value],
988+
_ConfigParams.IMPRESSIONS_REFRESH_RATE.value: config[_ConfigParams.IMPRESSIONS_REFRESH_RATE.value],
989+
_ConfigParams.EVENTS_REFRESH_RATE.value: config[_ConfigParams.EVENTS_REFRESH_RATE.value],
990+
_ConfigParams.TELEMETRY_REFRESH_RATE.value: config[_ConfigParams.TELEMETRY_REFRESH_RATE.value]
987991
}
988992

989993
def _get_url_overrides(self, config):
@@ -998,11 +1002,11 @@ def _get_url_overrides(self, config):
9981002
"""
9991003
with self._lock:
10001004
return {
1001-
ApiURLs.SDK_URL.value: True if ApiURLs.SDK_URL.value in config else False,
1002-
ApiURLs.EVENTS_URL.value: True if ApiURLs.EVENTS_URL.value in config else False,
1003-
ApiURLs.AUTH_URL.value: True if ApiURLs.AUTH_URL.value in config else False,
1004-
ApiURLs.STREAMING_URL.value: True if ApiURLs.STREAMING_URL.value in config else False,
1005-
ApiURLs.TELEMETRY_URL.value: True if ApiURLs.TELEMETRY_URL.value in config else False
1005+
_ApiURLs.SDK_URL.value: True if _ApiURLs.SDK_URL.value in config else False,
1006+
_ApiURLs.EVENTS_URL.value: True if _ApiURLs.EVENTS_URL.value in config else False,
1007+
_ApiURLs.AUTH_URL.value: True if _ApiURLs.AUTH_URL.value in config else False,
1008+
_ApiURLs.STREAMING_URL.value: True if _ApiURLs.STREAMING_URL.value in config else False,
1009+
_ApiURLs.TELEMETRY_URL.value: True if _ApiURLs.TELEMETRY_URL.value in config else False
10061010
}
10071011

10081012
def _get_impressions_mode(self, imp_mode):
@@ -1032,6 +1036,6 @@ def _check_if_proxy_detected(self):
10321036
"""
10331037
with self._lock:
10341038
for x in os.environ:
1035-
if x.upper() == ExtraConfig.HTTPS_PROXY_ENV.value:
1039+
if x.upper() == _ExtraConfig.HTTPS_PROXY_ENV.value:
10361040
return True
10371041
return False

splitio/storage/pluggable.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,29 @@ def get_feature_flags_by_sets(self, flag_sets):
100100
_LOGGER.debug('Error: ', exc_info=True)
101101
return None
102102

103+
def get_feature_flags_by_sets(self, flag_sets):
104+
"""
105+
Retrieve feature flags by flag set.
106+
107+
:param flag_set: Names of the flag set to fetch.
108+
:type flag_set: str
109+
110+
:return: Feature flag names that are tagged with the flag set
111+
:rtype: listt(str)
112+
"""
113+
try:
114+
sets_to_fetch = get_valid_flag_sets(flag_sets, self._config_flag_sets)
115+
if sets_to_fetch == []:
116+
return []
117+
118+
keys = [self._feature_flag_set_prefix.format(flag_set=flag_set) for flag_set in sets_to_fetch]
119+
return self._pluggable_adapter.get_many(keys)
120+
except Exception:
121+
_LOGGER.error('Error fetching feature flag from storage')
122+
_LOGGER.debug('Error: ', exc_info=True)
123+
return None
124+
125+
103126
# TODO: To be added when producer mode is supported
104127
# def put_many(self, splits, change_number):
105128
# """

splitio/util/storage_helper.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,20 @@ def combine_valid_flag_sets(result_sets):
6969
if isinstance(result_set, set) and len(result_set) > 0:
7070
to_return.update(result_set)
7171
return to_return
72+
73+
def _check_flag_sets(feature_flag_storage, feature_flag):
74+
"""
75+
Check all flag sets in a feature flag, return True if any of sets exist in storage
76+
77+
:param feature_flag_storage: Feature flag storage instance
78+
:type feature_flag_storage: splitio.storage.inmemory.InMemorySplitStorage
79+
:param feature_flag: Feature flag instance to validate.
80+
:type feature_flag: splitio.models.splits.Split
81+
82+
:return: True if any of its flag_set exist. False otherwise.
83+
:rtype: bool
84+
"""
85+
for flag_set in feature_flag.sets:
86+
if feature_flag_storage.is_flag_set_exist(flag_set):
87+
return True
88+
return False

tests/models/test_telemetry_model.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ def test_telemetry_config(self):
297297
'impressionsRefreshRate': 60,
298298
'eventsPushRate': 60,
299299
'metricsRefreshRate': 10,
300-
'storageType': None
300+
'storageType': None,
301+
'flagSetsFilter': None
301302
}
302303
telemetry_config.record_config(config, {})
303304
assert(telemetry_config.get_stats() == {'oM': 0,

tests/storage/test_pluggable.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ def get_many(self, keys):
8585
returned_keys = []
8686
for key in self._keys:
8787
if key in keys:
88-
returned_keys.append(self._keys[key])
88+
if isinstance(self._keys[key], list):
89+
returned_keys.extend(self._keys[key])
90+
else:
91+
returned_keys.append(self._keys[key])
8992
return returned_keys
9093

9194
def add_items(self, key, added_items):
@@ -164,10 +167,10 @@ def test_get(self):
164167
pluggable_split_storage = PluggableSplitStorage(self.mock_adapter, prefix=sprefix)
165168

166169
split1 = splits.from_raw(splits_json['splitChange1_2']['splits'][0])
167-
split_name = splits_json['splitChange1_2']['splits'][0]['name']
170+
feature_flag_name = splits_json['splitChange1_2']['splits'][0]['name']
168171

169-
self.mock_adapter.set(pluggable_split_storage._prefix.format(feature_flag_name=split_name), split1.to_json())
170-
assert(pluggable_split_storage.get(split_name).to_json() == splits.from_raw(splits_json['splitChange1_2']['splits'][0]).to_json())
172+
self.mock_adapter.set(pluggable_split_storage._prefix.format(feature_flag_name=feature_flag_name), split1.to_json())
173+
assert(pluggable_split_storage.get(feature_flag_name).to_json() == splits.from_raw(splits_json['splitChange1_2']['splits'][0]).to_json())
171174
assert(pluggable_split_storage.get('not_existing') == None)
172175

173176
def test_fetch_many(self):

tests/util/test_storage_helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,4 @@ def test_combine_valid_flag_sets(self):
126126
assert combine_valid_flag_sets(results_set) == {'set2', 'set3'}
127127

128128
results_set = ['set1', {'set2', 'set3'}]
129-
assert combine_valid_flag_sets(results_set) == {'set2', 'set3'}
129+
assert combine_valid_flag_sets(results_set) == {'set2', 'set3'}

0 commit comments

Comments
 (0)