Skip to content

Commit 3dc14ca

Browse files
authored
Merge pull request #320 from splitio/development
Release version 9.4.0
2 parents 9fe731e + 330f2ba commit 3dc14ca

17 files changed

+1268
-86
lines changed

CHANGES.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
9.4.0 (Mar 1, 2023)
2+
- Added support to use JSON files in localhost mode.
3+
- Updated default periodic telemetry post time to one hour.
4+
- Fixed unhandeled exception in push.manager.py class when SDK is connected to split proxy
5+
16
9.3.0 (Jan 30, 2023)
27
- Updated SDK telemetry storage, metrics and updater to be more effective and send less often.
38
- Removed deprecated threading.Thread.setDaemon() method.

splitio/client/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
115115
)
116116
self._record_stats([(impression, attributes)], start, metric_name, method_name)
117117
return result['treatment'], result['configurations']
118-
except Exception: # pylint: disable=broad-except
118+
except Exception as e: # pylint: disable=broad-except
119119
_LOGGER.error('Error getting treatment for feature')
120+
_LOGGER.error(str(e))
120121
_LOGGER.debug('Error: ', exc_info=True)
121122
self._telemetry_evaluation_producer.record_exception(metric_name)
122123
try:
@@ -370,7 +371,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
370371
_LOGGER.error("Client is not ready - no calls possible")
371372
return False
372373
if not self.ready:
373-
_LOGGER.warn("track: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method")
374+
_LOGGER.warning("track: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method")
374375
self._telemetry_init_producer.record_not_ready_usage()
375376

376377
start = get_current_epoch_time_ms()

splitio/client/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
'streamingEnabled': True,
1616
'featuresRefreshRate': 30,
1717
'segmentsRefreshRate': 30,
18-
'metricsRefreshRate': 60,
18+
'metricsRefreshRate': 3600,
1919
'impressionsRefreshRate': 5 * 60,
2020
'impressionsBulkSize': 5000,
2121
'impressionsQueueSize': 10000,
@@ -52,6 +52,8 @@
5252
'machineName': None,
5353
'machineIp': None,
5454
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
55+
'segmentDirectory': os.path.expanduser('~'),
56+
'localhostRefreshEnabled': False,
5557
'preforkedInitialization': False,
5658
'dataSampling': DEFAULT_DATA_SAMPLING,
5759
}
@@ -123,4 +125,8 @@ def sanitize(apikey, config):
123125
config.get('impressionsRefreshRate'))
124126
processed['impressionsMode'] = imp_mode
125127
processed['impressionsRefreshRate'] = imp_rate
128+
if processed['metricsRefreshRate'] < 60:
129+
_LOGGER.warning('metricRefreshRate parameter minimum value is 60 seconds, defaulting to 3600 seconds.')
130+
processed['metricsRefreshRate'] = 3600
131+
126132
return processed

splitio/client/factory.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
4848
LocalhostSynchronizer, RedisSynchronizer
4949
from splitio.sync.manager import Manager, RedisManager
50-
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer
51-
from splitio.sync.segment import SegmentSynchronizer
50+
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer, LocalhostMode
51+
from splitio.sync.segment import SegmentSynchronizer, LocalSegmentSynchronizer
5252
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
5353
from splitio.sync.event import EventSynchronizer
5454
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
@@ -163,7 +163,6 @@ def _update_status_when_ready(self):
163163
config_post_thread.setDaemon(True)
164164
config_post_thread.start()
165165

166-
167166
def _get_storage(self, name):
168167
"""
169168
Return a reference to the specified storage.
@@ -526,24 +525,44 @@ def _build_localhost_factory(cfg):
526525
'impressions': LocalhostImpressionsStorage(),
527526
'events': LocalhostEventsStorage(),
528527
}
529-
528+
localhost_mode = LocalhostMode.JSON if cfg['splitFile'][-5:].lower() == '.json' else LocalhostMode.LEGACY
530529
synchronizers = SplitSynchronizers(
531-
LocalSplitSynchronizer(cfg['splitFile'], storages['splits']),
532-
None, None, None, None,
530+
LocalSplitSynchronizer(cfg['splitFile'],
531+
storages['splits'],
532+
localhost_mode),
533+
LocalSegmentSynchronizer(cfg['segmentDirectory'], storages['splits'], storages['segments']),
534+
None, None, None,
533535
)
534536

535-
tasks = SplitTasks(
536-
SplitSynchronizationTask(
537+
split_sync_task = None
538+
segment_sync_task = None
539+
if cfg['localhostRefreshEnabled'] and localhost_mode == LocalhostMode.JSON:
540+
split_sync_task = SplitSynchronizationTask(
537541
synchronizers.split_sync.synchronize_splits,
538542
cfg['featuresRefreshRate'],
539-
), None, None, None, None,
543+
)
544+
segment_sync_task = SegmentSynchronizationTask(
545+
synchronizers.segment_sync.synchronize_segments,
546+
cfg['segmentsRefreshRate'],
547+
)
548+
tasks = SplitTasks(
549+
split_sync_task,
550+
segment_sync_task,
551+
None, None, None,
540552
)
541553

542554
sdk_metadata = util.get_metadata(cfg)
543555
ready_event = threading.Event()
544-
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
556+
synchronizer = LocalhostSynchronizer(synchronizers, tasks, localhost_mode)
545557
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata, telemetry_runtime_producer)
546-
manager.start()
558+
559+
# TODO: BUR is only applied for Localhost JSON mode, in future legacy and yaml will also use BUR
560+
if localhost_mode == LocalhostMode.JSON:
561+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
562+
initialization_thread.start()
563+
else:
564+
manager.start()
565+
547566
recorder = StandardRecorder(
548567
ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer),
549568
storages['events'],

splitio/sync/segment.py

Lines changed: 143 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import logging
22
import time
3+
import json
4+
import os
35

46
from splitio.api import APIException
57
from splitio.api.commons import FetchOptions
68
from splitio.tasks.util import workerpool
79
from splitio.models import segments
810
from splitio.util.backoff import Backoff
9-
11+
from splitio.sync import util
1012

1113
_LOGGER = logging.getLogger(__name__)
1214

@@ -174,13 +176,151 @@ def synchronize_segments(self, segment_names = None, dont_wait = False):
174176
"""
175177
if segment_names is None:
176178
segment_names = self._split_storage.get_segment_names()
177-
179+
178180
for segment_name in segment_names:
179181
self._worker_pool.submit_work(segment_name)
180182
if (dont_wait):
181183
return True
182184
return not self._worker_pool.wait_for_completion()
183-
185+
186+
def segment_exist_in_storage(self, segment_name):
187+
"""
188+
Check if a segment exists in the storage
189+
190+
:param segment_name: Name of the segment
191+
:type segment_name: str
192+
193+
:return: True if segment exist. False otherwise.
194+
:rtype: bool
195+
"""
196+
return self._segment_storage.get(segment_name) != None
197+
198+
class LocalSegmentSynchronizer(object):
199+
"""Localhost mode segment synchronizer."""
200+
201+
_DEFAULT_SEGMENT_TILL = -1
202+
203+
def __init__(self, segment_folder, split_storage, segment_storage):
204+
"""
205+
Class constructor.
206+
207+
:param segment_folder: patch to the segment folder
208+
:type segment_folder: str
209+
210+
:param split_storage: Split Storage.
211+
:type split_storage: splitio.storage.InMemorySplitStorage
212+
213+
:param segment_storage: Segment storage reference.
214+
:type segment_storage: splitio.storage.SegmentStorage
215+
216+
"""
217+
self._segment_folder = segment_folder
218+
self._split_storage = split_storage
219+
self._segment_storage = segment_storage
220+
self._segment_sha = {}
221+
222+
def synchronize_segments(self, segment_names = None):
223+
"""
224+
Loop through given segment names and synchronize each one.
225+
226+
:param segment_names: Optional, array of segment names to update.
227+
:type segment_name: {str}
228+
229+
:return: True if no error occurs. False otherwise.
230+
:rtype: bool
231+
"""
232+
_LOGGER.info('Synchronizing segments now.')
233+
if segment_names is None:
234+
segment_names = self._split_storage.get_segment_names()
235+
236+
return_flag = True
237+
for segment_name in segment_names:
238+
if not self.synchronize_segment(segment_name):
239+
return_flag = False
240+
241+
return return_flag
242+
243+
def synchronize_segment(self, segment_name, till=None):
244+
"""
245+
Update a segment from queue
246+
247+
:param segment_name: Name of the segment to update.
248+
:type segment_name: str
249+
250+
:param till: ChangeNumber received.
251+
:type till: int
252+
253+
:return: True if no error occurs. False otherwise.
254+
:rtype: bool
255+
"""
256+
try:
257+
fetched = self._read_segment_from_json_file(segment_name)
258+
fetched_sha = util._get_sha(json.dumps(fetched))
259+
if not self.segment_exist_in_storage(segment_name):
260+
self._segment_sha[segment_name] = fetched_sha
261+
self._segment_storage.put(segments.from_raw(fetched))
262+
_LOGGER.debug("segment %s is added to storage", segment_name)
263+
return True
264+
265+
if fetched_sha == self._segment_sha[segment_name]:
266+
return True
267+
268+
self._segment_sha[segment_name] = fetched_sha
269+
if self._segment_storage.get_change_number(segment_name) > fetched['till'] and fetched['till'] != self._DEFAULT_SEGMENT_TILL:
270+
return True
271+
272+
self._segment_storage.update(segment_name, fetched['added'], fetched['removed'], fetched['till'])
273+
_LOGGER.debug("segment %s is updated", segment_name)
274+
except Exception as e:
275+
_LOGGER.error("Could not fetch segment: %s \n" + str(e), segment_name)
276+
return False
277+
278+
return True
279+
280+
def _read_segment_from_json_file(self, filename):
281+
"""
282+
Parse a segment and store in segment storage.
283+
284+
:param filename: Path of the file containing split
285+
:type filename: str.
286+
287+
:return: Sanitized segment structure
288+
:rtype: Dict
289+
"""
290+
try:
291+
with open(os.path.join(self._segment_folder, "%s.json" % filename), 'r') as flo:
292+
parsed = json.load(flo)
293+
santitized_segment = self._sanitize_segment(parsed)
294+
return santitized_segment
295+
except Exception as exc:
296+
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc
297+
298+
def _sanitize_segment(self, parsed):
299+
"""
300+
Sanitize json elements.
301+
302+
:param parsed: segment dict
303+
:type parsed: Dict
304+
305+
:return: sanitized segment structure dict
306+
:rtype: Dict
307+
"""
308+
if 'name' not in parsed or parsed['name'] is None:
309+
_LOGGER.warning("Segment does not have [name] element, skipping")
310+
raise Exception("Segment does not have [name] element")
311+
if parsed['name'].strip() == '':
312+
_LOGGER.warning("Segment [name] element is blank, skipping")
313+
raise Exception("Segment [name] element is blank")
314+
315+
for element in [('till', -1, -1, None, None, [0]),
316+
('added', [], None, None, None, None),
317+
('removed', [], None, None, None, None)
318+
]:
319+
parsed = util._sanitize_object_element(parsed, 'segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=None, not_in_list=element[5])
320+
parsed = util._sanitize_object_element(parsed, 'segment', 'since', parsed['till'], -1, parsed['till'], None, [0])
321+
322+
return parsed
323+
184324
def segment_exist_in_storage(self, segment_name):
185325
"""
186326
Check if a segment exists in the storage

0 commit comments

Comments
 (0)