Skip to content

Commit 5524304

Browse files
authored
Merge pull request #311 from splitio/localhost-json-release
Localhost json release
2 parents 2325e67 + 68a38d1 commit 5524304

File tree

16 files changed

+1229
-70
lines changed

16 files changed

+1229
-70
lines changed

CHANGES.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
9.3.0-rc1 (Jan 20, 2023)
1+
9.4.0 (Feb 14, 2023)
2+
- Added support to use JSON files in localhost mode.
3+
- Updated default periodic telemetry post time to one hour.
4+
- Fixed unhandeled exception in push.manager.py class when SDK is connected to split proxy
5+
6+
9.3.0 (Jan 30, 2023)
27
- Updated SDK telemetry storage, metrics and updater to be more effective and send less often.
38
- Removed deprecated threading.Thread.setDaemon() method.
49

splitio/client/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
115115
)
116116
self._record_stats([(impression, attributes)], start, metric_name, method_name)
117117
return result['treatment'], result['configurations']
118-
except Exception: # pylint: disable=broad-except
118+
except Exception as e: # pylint: disable=broad-except
119119
_LOGGER.error('Error getting treatment for feature')
120+
_LOGGER.error(str(e))
120121
_LOGGER.debug('Error: ', exc_info=True)
121122
self._telemetry_evaluation_producer.record_exception(metric_name)
122123
try:
@@ -370,7 +371,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
370371
_LOGGER.error("Client is not ready - no calls possible")
371372
return False
372373
if not self.ready:
373-
_LOGGER.warn("track: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method")
374+
_LOGGER.warning("track: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method")
374375
self._telemetry_init_producer.record_not_ready_usage()
375376

376377
start = get_current_epoch_time_ms()

splitio/client/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
'streamingEnabled': True,
1616
'featuresRefreshRate': 30,
1717
'segmentsRefreshRate': 30,
18-
'metricsRefreshRate': 60,
18+
'metricsRefreshRate': 3600,
1919
'impressionsRefreshRate': 5 * 60,
2020
'impressionsBulkSize': 5000,
2121
'impressionsQueueSize': 10000,
@@ -52,6 +52,8 @@
5252
'machineName': None,
5353
'machineIp': None,
5454
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
55+
'segmentDirectory': os.path.expanduser('~'),
56+
'localhostRefreshEnabled': False,
5557
'preforkedInitialization': False,
5658
'dataSampling': DEFAULT_DATA_SAMPLING,
5759
}
@@ -123,4 +125,8 @@ def sanitize(apikey, config):
123125
config.get('impressionsRefreshRate'))
124126
processed['impressionsMode'] = imp_mode
125127
processed['impressionsRefreshRate'] = imp_rate
128+
if processed['metricsRefreshRate'] < 60:
129+
_LOGGER.warning('metricRefreshRate parameter minimum value is 60 seconds, defaulting to 3600 seconds.')
130+
processed['metricsRefreshRate'] = 3600
131+
126132
return processed

splitio/client/factory.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
4848
LocalhostSynchronizer, RedisSynchronizer
4949
from splitio.sync.manager import Manager, RedisManager
50-
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer
51-
from splitio.sync.segment import SegmentSynchronizer
50+
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer, LocalhostMode
51+
from splitio.sync.segment import SegmentSynchronizer, LocalSegmentSynchronizer
5252
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
5353
from splitio.sync.event import EventSynchronizer
5454
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
@@ -163,7 +163,6 @@ def _update_status_when_ready(self):
163163
config_post_thread.setDaemon(True)
164164
config_post_thread.start()
165165

166-
167166
def _get_storage(self, name):
168167
"""
169168
Return a reference to the specified storage.
@@ -526,24 +525,39 @@ def _build_localhost_factory(cfg):
526525
'impressions': LocalhostImpressionsStorage(),
527526
'events': LocalhostEventsStorage(),
528527
}
529-
528+
localhost_mode = LocalhostMode.JSON if cfg['splitFile'][-5:].lower() == '.json' else LocalhostMode.LEGACY
530529
synchronizers = SplitSynchronizers(
531-
LocalSplitSynchronizer(cfg['splitFile'], storages['splits']),
532-
None, None, None, None,
530+
LocalSplitSynchronizer(cfg['splitFile'],
531+
storages['splits'],
532+
localhost_mode),
533+
LocalSegmentSynchronizer(cfg['segmentDirectory'], storages['splits'], storages['segments']),
534+
None, None, None,
533535
)
534536

535-
tasks = SplitTasks(
536-
SplitSynchronizationTask(
537+
split_sync_task = None
538+
segment_sync_task = None
539+
if cfg['localhostRefreshEnabled'] and localhost_mode == LocalhostMode.JSON:
540+
split_sync_task = SplitSynchronizationTask(
537541
synchronizers.split_sync.synchronize_splits,
538542
cfg['featuresRefreshRate'],
539-
), None, None, None, None,
543+
)
544+
segment_sync_task = SegmentSynchronizationTask(
545+
synchronizers.segment_sync.synchronize_segments,
546+
cfg['segmentsRefreshRate'],
547+
)
548+
tasks = SplitTasks(
549+
split_sync_task,
550+
segment_sync_task,
551+
None, None, None,
540552
)
541553

542554
sdk_metadata = util.get_metadata(cfg)
543555
ready_event = threading.Event()
544556
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
545557
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata, telemetry_runtime_producer)
546-
manager.start()
558+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
559+
initialization_thread.start()
560+
547561
recorder = StandardRecorder(
548562
ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer),
549563
storages['events'],

splitio/sync/segment.py

Lines changed: 144 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import logging
22
import time
3+
import json
34

45
from splitio.api import APIException
56
from splitio.api.commons import FetchOptions
67
from splitio.tasks.util import workerpool
78
from splitio.models import segments
89
from splitio.util.backoff import Backoff
9-
10+
from splitio.sync import util
1011

1112
_LOGGER = logging.getLogger(__name__)
1213

@@ -174,13 +175,153 @@ def synchronize_segments(self, segment_names = None, dont_wait = False):
174175
"""
175176
if segment_names is None:
176177
segment_names = self._split_storage.get_segment_names()
177-
178+
178179
for segment_name in segment_names:
179180
self._worker_pool.submit_work(segment_name)
180181
if (dont_wait):
181182
return True
182183
return not self._worker_pool.wait_for_completion()
183-
184+
185+
def segment_exist_in_storage(self, segment_name):
186+
"""
187+
Check if a segment exists in the storage
188+
189+
:param segment_name: Name of the segment
190+
:type segment_name: str
191+
192+
:return: True if segment exist. False otherwise.
193+
:rtype: bool
194+
"""
195+
return self._segment_storage.get(segment_name) != None
196+
197+
class LocalSegmentSynchronizer(object):
198+
"""Localhost mode segment synchronizer."""
199+
200+
_DEFAULT_SEGMENT_TILL = -1
201+
202+
def __init__(self, segment_folder, split_storage, segment_storage):
203+
"""
204+
Class constructor.
205+
206+
:param segment_folder: patch to the segment folder
207+
:type segment_folder: str
208+
209+
:param split_storage: Split Storage.
210+
:type split_storage: splitio.storage.InMemorySplitStorage
211+
212+
:param segment_storage: Segment storage reference.
213+
:type segment_storage: splitio.storage.SegmentStorage
214+
215+
"""
216+
self._segment_folder = segment_folder
217+
self._split_storage = split_storage
218+
self._segment_storage = segment_storage
219+
self._segment_sha = {}
220+
221+
def synchronize_segments(self, segment_names = None):
222+
"""
223+
Loop through given segment names and synchronize each one.
224+
225+
:param segment_names: Optional, array of segment names to update.
226+
:type segment_name: {str}
227+
228+
:return: True if no error occurs. False otherwise.
229+
:rtype: bool
230+
"""
231+
_LOGGER.info('Synchronizing segments now.')
232+
if segment_names is None:
233+
segment_names = self._split_storage.get_segment_names()
234+
235+
return_flag = True
236+
for segment_name in segment_names:
237+
if not self.synchronize_segment(segment_name):
238+
return_flag = False
239+
240+
return return_flag
241+
242+
def synchronize_segment(self, segment_name, till=None):
243+
"""
244+
Update a segment from queue
245+
246+
:param segment_name: Name of the segment to update.
247+
:type segment_name: str
248+
249+
:param till: ChangeNumber received.
250+
:type till: int
251+
252+
:return: True if no error occurs. False otherwise.
253+
:rtype: bool
254+
"""
255+
try:
256+
fetched = self._read_segment_from_json_file(segment_name)
257+
if fetched is None:
258+
return False
259+
fetched_sha = util._get_sha(json.dumps(fetched))
260+
if not self.segment_exist_in_storage(segment_name):
261+
self._segment_sha[segment_name] = fetched_sha
262+
self._segment_storage.put(segments.from_raw(fetched))
263+
_LOGGER.debug("segment %s is added to storage", segment_name)
264+
else:
265+
if fetched_sha != self._segment_sha[segment_name]:
266+
self._segment_sha[segment_name] = fetched_sha
267+
if self._segment_storage.get_change_number(segment_name) <= fetched['till'] or fetched['till'] == self._DEFAULT_SEGMENT_TILL:
268+
self._segment_storage.update(
269+
segment_name,
270+
fetched['added'],
271+
fetched['removed'],
272+
fetched['till']
273+
)
274+
_LOGGER.debug("segment %s is updated", segment_name)
275+
except Exception as e:
276+
_LOGGER.error("Could not fetch segment: %s \n" + str(e), segment_name)
277+
return False
278+
279+
return True
280+
281+
def _read_segment_from_json_file(self, filename):
282+
"""
283+
Parse a segment and store in segment storage.
284+
285+
:param filename: Path of the file containing split
286+
:type filename: str.
287+
288+
:return: Sanitized segment structure
289+
:rtype: Dict
290+
"""
291+
try:
292+
with open(self._segment_folder + '/' + filename + '.json', 'r') as flo:
293+
parsed = json.load(flo)
294+
santitized_segment = self._sanitize_segment(parsed)
295+
return santitized_segment
296+
except Exception as exc:
297+
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc
298+
299+
def _sanitize_segment(self, parsed):
300+
"""
301+
Sanitize json elements.
302+
303+
:param parsed: segment dict
304+
:type parsed: Dict
305+
306+
:return: sanitized segment structure dict
307+
:rtype: Dict
308+
"""
309+
if 'name' not in parsed or parsed['name'] is None:
310+
_LOGGER.warning("Segment does not have [name] element, skipping")
311+
return None
312+
if parsed['name'].strip() == '':
313+
_LOGGER.warning("Segment [name] element is blank, skipping")
314+
return None
315+
316+
for element in [('till', -1, -1, None, None, [0]),
317+
('added', [], None, None, None, None),
318+
('removed', [], None, None, None, None)
319+
]:
320+
parsed = util._sanitize_object_element(parsed, 'segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=None, not_in_list=element[5])
321+
parsed = util._sanitize_object_element(parsed, 'segment', 'since', parsed['till'], -1, parsed['till'], None, [0])
322+
323+
return parsed
324+
184325
def segment_exist_in_storage(self, segment_name):
185326
"""
186327
Check if a segment exists in the storage

0 commit comments

Comments
 (0)