Skip to content

Commit 2de48b9

Browse files
authored
Merge pull request #564 from splitio/rbs-old-spec-fetcher
Added support for old spec in fetcher
2 parents 6e8188d + 9aa56a1 commit 2de48b9

File tree

11 files changed

+326
-35
lines changed

11 files changed

+326
-35
lines changed

splitio/api/client.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,25 @@ def proxy_headers(self, proxy):
9292
class HttpClientBase(object, metaclass=abc.ABCMeta):
9393
"""HttpClient wrapper template."""
9494

95+
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, telemetry_url=None):
96+
"""
97+
Class constructor.
98+
99+
:param timeout: How many milliseconds to wait until the server responds.
100+
:type timeout: int
101+
:param sdk_url: Optional alternative sdk URL.
102+
:type sdk_url: str
103+
:param events_url: Optional alternative events URL.
104+
:type events_url: str
105+
:param auth_url: Optional alternative auth URL.
106+
:type auth_url: str
107+
:param telemetry_url: Optional alternative telemetry URL.
108+
:type telemetry_url: str
109+
"""
110+
_LOGGER.debug("Initializing httpclient")
111+
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
112+
self._urls = _construct_urls(sdk_url, events_url, auth_url, telemetry_url)
113+
95114
@abc.abstractmethod
96115
def get(self, server, path, apikey):
97116
"""http get request"""
@@ -113,6 +132,9 @@ def set_telemetry_data(self, metric_name, telemetry_runtime_producer):
113132
self._telemetry_runtime_producer = telemetry_runtime_producer
114133
self._metric_name = metric_name
115134

135+
def is_sdk_endpoint_overridden(self):
136+
return self._urls['sdk'] == SDK_URL
137+
116138
def _get_headers(self, extra_headers, sdk_key):
117139
headers = _build_basic_headers(sdk_key)
118140
if extra_headers is not None:
@@ -154,10 +176,8 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
154176
:param telemetry_url: Optional alternative telemetry URL.
155177
:type telemetry_url: str
156178
"""
157-
_LOGGER.debug("Initializing httpclient")
158-
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
159-
self._urls = _construct_urls(sdk_url, events_url, auth_url, telemetry_url)
160-
179+
HttpClientBase.__init__(self, timeout, sdk_url, events_url, auth_url, telemetry_url)
180+
161181
def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint: disable=too-many-arguments
162182
"""
163183
Issue a get request.
@@ -241,8 +261,7 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
241261
:param telemetry_url: Optional alternative telemetry URL.
242262
:type telemetry_url: str
243263
"""
244-
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
245-
self._urls = _construct_urls(sdk_url, events_url, auth_url, telemetry_url)
264+
HttpClientBase.__init__(self, timeout, sdk_url, events_url, auth_url, telemetry_url)
246265
self._session = aiohttp.ClientSession()
247266

248267
async def get(self, server, path, apikey, query=None, extra_headers=None): # pylint: disable=too-many-arguments

splitio/api/splits.py

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44
import json
55

66
from splitio.api import APIException, headers_from_metadata
7-
from splitio.api.commons import build_fetch
7+
from splitio.api.commons import build_fetch, FetchOptions
88
from splitio.api.client import HttpClientException
99
from splitio.models.telemetry import HTTPExceptionsAndLatencies
10+
from splitio.util.time import utctime_ms
11+
from splitio.spec import SPEC_VERSION
1012

1113
_LOGGER = logging.getLogger(__name__)
14+
_SPEC_1_1 = "1.1"
15+
_PROXY_CHECK_INTERVAL_MILLISECONDS_SS = 24 * 60 * 60 * 1000
1216

13-
14-
class SplitsAPI(object): # pylint: disable=too-few-public-methods
17+
class SplitsAPIBase(object): # pylint: disable=too-few-public-methods
1518
"""Class that uses an httpClient to communicate with the splits API."""
1619

1720
def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
@@ -30,6 +33,35 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
3033
self._metadata = headers_from_metadata(sdk_metadata)
3134
self._telemetry_runtime_producer = telemetry_runtime_producer
3235
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)
36+
self._spec_version = SPEC_VERSION
37+
self._last_proxy_check_timestamp = 0
38+
self.clear_storage = False
39+
40+
def _convert_to_new_spec(self, body):
41+
return {"ff": {"d": body["splits"], "s": body["since"], "t": body["till"]},
42+
"rbs": {"d": [], "s": -1, "t": -1}}
43+
44+
def _check_last_proxy_check_timestamp(self):
45+
if self._spec_version == _SPEC_1_1 and ((utctime_ms() - self._last_proxy_check_timestamp) >= _PROXY_CHECK_INTERVAL_MILLISECONDS_SS):
46+
_LOGGER.info("Switching to new Feature flag spec (%s) and fetching.", SPEC_VERSION);
47+
self._spec_version = SPEC_VERSION
48+
49+
50+
class SplitsAPI(SplitsAPIBase): # pylint: disable=too-few-public-methods
51+
"""Class that uses an httpClient to communicate with the splits API."""
52+
53+
def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
54+
"""
55+
Class constructor.
56+
57+
:param client: HTTP Client responsble for issuing calls to the backend.
58+
:type client: HttpClient
59+
:param sdk_key: User sdk_key token.
60+
:type sdk_key: string
61+
:param sdk_metadata: SDK version & machine name & IP.
62+
:type sdk_metadata: splitio.client.util.SdkMetadata
63+
"""
64+
SplitsAPIBase.__init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer)
3365

3466
def fetch_splits(self, change_number, rbs_change_number, fetch_options):
3567
"""
@@ -48,6 +80,7 @@ def fetch_splits(self, change_number, rbs_change_number, fetch_options):
4880
:rtype: dict
4981
"""
5082
try:
83+
self._check_last_proxy_check_timestamp()
5184
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
5285
response = self._client.get(
5386
'sdk',
@@ -57,19 +90,32 @@ def fetch_splits(self, change_number, rbs_change_number, fetch_options):
5790
query=query,
5891
)
5992
if 200 <= response.status_code < 300:
93+
if self._spec_version == _SPEC_1_1:
94+
return self._convert_to_new_spec(json.loads(response.body))
95+
96+
self.clear_storage = self._last_proxy_check_timestamp != 0
97+
self._last_proxy_check_timestamp = 0
6098
return json.loads(response.body)
6199

62100
else:
63101
if response.status_code == 414:
64102
_LOGGER.error('Error fetching feature flags; the amount of flag sets provided are too big, causing uri length error.')
103+
104+
if self._client.is_sdk_endpoint_overridden() and response.status_code == 400 and self._spec_version == SPEC_VERSION:
105+
_LOGGER.warning('Detected proxy response error, changing spec version from %s to %s and re-fetching.', self._spec_version, _SPEC_1_1)
106+
self._spec_version = _SPEC_1_1
107+
self._last_proxy_check_timestamp = utctime_ms()
108+
return self.fetch_splits(change_number, None, FetchOptions(fetch_options.cache_control_headers, fetch_options.change_number,
109+
None, fetch_options.sets, self._spec_version))
110+
65111
raise APIException(response.body, response.status_code)
112+
66113
except HttpClientException as exc:
67114
_LOGGER.error('Error fetching feature flags because an exception was raised by the HTTPClient')
68115
_LOGGER.debug('Error: ', exc_info=True)
69116
raise APIException('Feature flags not fetched correctly.') from exc
70117

71-
72-
class SplitsAPIAsync(object): # pylint: disable=too-few-public-methods
118+
class SplitsAPIAsync(SplitsAPIBase): # pylint: disable=too-few-public-methods
73119
"""Class that uses an httpClient to communicate with the splits API."""
74120

75121
def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
@@ -83,11 +129,7 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
83129
:param sdk_metadata: SDK version & machine name & IP.
84130
:type sdk_metadata: splitio.client.util.SdkMetadata
85131
"""
86-
self._client = client
87-
self._sdk_key = sdk_key
88-
self._metadata = headers_from_metadata(sdk_metadata)
89-
self._telemetry_runtime_producer = telemetry_runtime_producer
90-
self._client.set_telemetry_data(HTTPExceptionsAndLatencies.SPLIT, self._telemetry_runtime_producer)
132+
SplitsAPIBase.__init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer)
91133

92134
async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
93135
"""
@@ -106,6 +148,7 @@ async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
106148
:rtype: dict
107149
"""
108150
try:
151+
self._check_last_proxy_check_timestamp()
109152
query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
110153
response = await self._client.get(
111154
'sdk',
@@ -115,12 +158,26 @@ async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
115158
query=query,
116159
)
117160
if 200 <= response.status_code < 300:
161+
if self._spec_version == _SPEC_1_1:
162+
return self._convert_to_new_spec(json.loads(response.body))
163+
164+
self.clear_storage = self._last_proxy_check_timestamp != 0
165+
self._last_proxy_check_timestamp = 0
118166
return json.loads(response.body)
119167

120168
else:
121169
if response.status_code == 414:
122170
_LOGGER.error('Error fetching feature flags; the amount of flag sets provided are too big, causing uri length error.')
171+
172+
if self._client.is_sdk_endpoint_overridden() and response.status_code == 400 and self._spec_version == SPEC_VERSION:
173+
_LOGGER.warning('Detected proxy response error, changing spec version from %s to %s and re-fetching.', self._spec_version, _SPEC_1_1)
174+
self._spec_version = _SPEC_1_1
175+
self._last_proxy_check_timestamp = utctime_ms()
176+
return await self.fetch_splits(change_number, None, FetchOptions(fetch_options.cache_control_headers, fetch_options.change_number,
177+
None, fetch_options.sets, self._spec_version))
178+
123179
raise APIException(response.body, response.status_code)
180+
124181
except HttpClientException as exc:
125182
_LOGGER.error('Error fetching feature flags because an exception was raised by the HTTPClient')
126183
_LOGGER.debug('Error: ', exc_info=True)

splitio/models/rule_based_segments.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from splitio.models import MatcherNotFoundException
66
from splitio.models.splits import _DEFAULT_CONDITIONS_TEMPLATE
77
from splitio.models.grammar import condition
8+
from splitio.models.splits import Status
89

910
_LOGGER = logging.getLogger(__name__)
1011

@@ -31,9 +32,12 @@ def __init__(self, name, traffic_type_name, change_number, status, conditions, e
3132
self._name = name
3233
self._traffic_type_name = traffic_type_name
3334
self._change_number = change_number
34-
self._status = status
3535
self._conditions = conditions
3636
self._excluded = excluded
37+
try:
38+
self._status = Status(status)
39+
except ValueError:
40+
self._status = Status.ARCHIVED
3741

3842
@property
3943
def name(self):
@@ -71,7 +75,7 @@ def to_json(self):
7175
'changeNumber': self.change_number,
7276
'trafficTypeName': self.traffic_type_name,
7377
'name': self.name,
74-
'status': self.status,
78+
'status': self.status.value,
7579
'conditions': [c.to_json() for c in self.conditions],
7680
'excluded': self.excluded.to_json()
7781
}

splitio/storage/inmemmory.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,14 @@ def __init__(self):
116116
self._rule_based_segments = {}
117117
self._change_number = -1
118118

119+
def clear(self):
120+
"""
121+
Clear storage
122+
"""
123+
with self._lock:
124+
self._rule_based_segments = {}
125+
self._change_number = -1
126+
119127
def get(self, segment_name):
120128
"""
121129
Retrieve a rule based segment.
@@ -231,6 +239,14 @@ def __init__(self):
231239
self._rule_based_segments = {}
232240
self._change_number = -1
233241

242+
async def clear(self):
243+
"""
244+
Clear storage
245+
"""
246+
with self._lock:
247+
self._rule_based_segments = {}
248+
self._change_number = -1
249+
234250
async def get(self, segment_name):
235251
"""
236252
Retrieve a rule based segment.
@@ -466,6 +482,16 @@ def __init__(self, flag_sets=[]):
466482
self.flag_set = FlagSets(flag_sets)
467483
self.flag_set_filter = FlagSetsFilter(flag_sets)
468484

485+
def clear(self):
486+
"""
487+
Clear storage
488+
"""
489+
with self._lock:
490+
self._feature_flags = {}
491+
self._change_number = -1
492+
self._traffic_types = Counter()
493+
self.flag_set = FlagSets(self.flag_set_filter.flag_sets)
494+
469495
def get(self, feature_flag_name):
470496
"""
471497
Retrieve a feature flag.
@@ -672,6 +698,16 @@ def __init__(self, flag_sets=[]):
672698
self.flag_set = FlagSets(flag_sets)
673699
self.flag_set_filter = FlagSetsFilter(flag_sets)
674700

701+
async def clear(self):
702+
"""
703+
Clear storage
704+
"""
705+
with self._lock:
706+
self._feature_flags = {}
707+
self._change_number = -1
708+
self._traffic_types = Counter()
709+
self.flag_set = FlagSets(self.flag_set_filter.flag_sets)
710+
675711
async def get(self, feature_flag_name):
676712
"""
677713
Retrieve a feature flag.

splitio/sync/split.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ def _fetch_until(self, fetch_options, till=None, rbs_till=None):
135135
raise exc
136136

137137
fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
138-
rbs_segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])
138+
rbs_segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'], self._api.clear_storage)
139139

140140
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
141-
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
141+
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage)
142142
segment_list.update(rbs_segment_list)
143143

144144
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
@@ -294,10 +294,10 @@ async def _fetch_until(self, fetch_options, till=None, rbs_till=None):
294294
raise exc
295295

296296
fetched_rule_based_segments = [(rule_based_segments.from_raw(rule_based_segment)) for rule_based_segment in feature_flag_changes.get('rbs').get('d', [])]
297-
rbs_segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'])
297+
rbs_segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, fetched_rule_based_segments, feature_flag_changes.get('rbs')['t'], self._api.clear_storage)
298298

299299
fetched_feature_flags = [(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('ff').get('d', [])]
300-
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'])
300+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage)
301301
segment_list.update(rbs_segment_list)
302302

303303
if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
@@ -541,7 +541,7 @@ def _sanitize_rb_segment_elements(self, parsed_rb_segments):
541541
_LOGGER.warning("A rule based segment in json file does not have (Name) or property is empty, skipping.")
542542
continue
543543
for element in [('trafficTypeName', 'user', None, None, None, None),
544-
('status', 'ACTIVE', None, None, ['ACTIVE', 'ARCHIVED'], None),
544+
('status', splits.Status.ACTIVE, None, None, [splits.Status.ACTIVE, splits.Status.ARCHIVED], None),
545545
('changeNumber', 0, 0, None, None, None)]:
546546
rb_segment = util._sanitize_object_element(rb_segment, 'rule based segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=element[4], not_in_list=element[5])
547547
rb_segment = self._sanitize_condition(rb_segment)

0 commit comments

Comments
 (0)