Skip to content

Commit d5502ba

Browse files
authored
Merge pull request #277 from splitio/development
Version 9.2.0
2 parents 802a2f2 + 4c339ea commit d5502ba

39 files changed

+1573
-316
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
9.2.0 (Oct 14, 2022)
2+
- Added a new impressions mode for the SDK called NONE , to be used in factory when there is no desire to capture impressions on an SDK factory to feed Split's analytics engine. Running NONE mode, the SDK will only capture unique keys evaluated for a particular feature flag instead of full blown impressions
3+
14
9.1.3 (July 25, 2022)
25
- Fixed synching missed segment(s) after receiving split update
36

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
'pyyaml>=5.4',
2020
'docopt>=0.6.2',
2121
'enum34;python_version<"3.4"',
22+
'bloom-filter2>=2.0.0',
2223
]
2324

2425
with open(path.join(path.abspath(path.dirname(__file__)), 'splitio', 'version.py')) as f:

splitio/api/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ class HttpClient(object):
2525
SDK_URL = 'https://sdk.split.io/api'
2626
EVENTS_URL = 'https://events.split.io/api'
2727
AUTH_URL = 'https://auth.split.io/api'
28+
TELEMETRY_URL = 'https://telemetry.split.io/api'
2829

29-
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None):
30+
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, telemetry_url=None):
3031
"""
3132
Class constructor.
3233
@@ -38,12 +39,15 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None):
3839
:type events_url: str
3940
:param auth_url: Optional alternative auth URL.
4041
:type auth_url: str
42+
:param telemetry_url: Optional alternative telemetry URL.
43+
:type telemetry_url: str
4144
"""
4245
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
4346
self._urls = {
4447
'sdk': sdk_url if sdk_url is not None else self.SDK_URL,
4548
'events': events_url if events_url is not None else self.EVENTS_URL,
4649
'auth': auth_url if auth_url is not None else self.AUTH_URL,
50+
'telemetry': telemetry_url if telemetry_url is not None else self.TELEMETRY_URL,
4751
}
4852

4953
def _build_url(self, server, path):

splitio/api/impressions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from splitio.api import APIException
77
from splitio.api.client import HttpClientException
88
from splitio.api.commons import headers_from_metadata
9-
from splitio.engine.impressions import ImpressionsMode
9+
from splitio.engine.impressions.impressions import ImpressionsMode
1010

1111

1212
_LOGGER = logging.getLogger(__name__)

splitio/api/telemetry.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""Impressions API module."""
2+
import logging
3+
4+
from splitio.api import APIException
5+
from splitio.api.client import HttpClientException
6+
from splitio.api.commons import headers_from_metadata
7+
8+
_LOGGER = logging.getLogger(__name__)
9+
10+
class TelemetryAPI(object): # pylint: disable=too-few-public-methods
11+
"""Class that uses an httpClient to communicate with the Telemetry API."""
12+
13+
def __init__(self, client, apikey, sdk_metadata):
14+
"""
15+
Class constructor.
16+
17+
:param client: HTTP Client responsble for issuing calls to the backend.
18+
:type client: HttpClient
19+
:param apikey: User apikey token.
20+
:type apikey: string
21+
"""
22+
self._client = client
23+
self._apikey = apikey
24+
self._metadata = headers_from_metadata(sdk_metadata)
25+
26+
def record_unique_keys(self, uniques):
27+
"""
28+
Send unique keys to the backend.
29+
30+
:param uniques: Unique Keys
31+
:type json
32+
"""
33+
try:
34+
response = self._client.post(
35+
'telemetry',
36+
'/v1/keys/ss',
37+
self._apikey,
38+
body=uniques,
39+
extra_headers=self._metadata
40+
)
41+
if not 200 <= response.status_code < 300:
42+
raise APIException(response.body, response.status_code)
43+
except HttpClientException as exc:
44+
_LOGGER.error(
45+
'Error posting unique keys because an exception was raised by the HTTPClient'
46+
)
47+
_LOGGER.debug('Error: ', exc_info=True)
48+
raise APIException('Unique keys not flushed properly.') from exc

splitio/client/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os.path
33
import logging
44

5-
from splitio.engine.impressions import ImpressionsMode
5+
from splitio.engine.impressions.impressions import ImpressionsMode
66

77

88
_LOGGER = logging.getLogger(__name__)
@@ -91,7 +91,7 @@ def _sanitize_impressions_mode(mode, refresh_rate=None):
9191
mode = ImpressionsMode(mode.upper())
9292
except (ValueError, AttributeError):
9393
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be '
94-
'one of the following values: `debug` or `optimized`. '
94+
'one of the following values: `debug`, `none` or `optimized`. '
9595
'Defaulting to `optimized` mode.')
9696
mode = ImpressionsMode.OPTIMIZED
9797

splitio/client/factory.py

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
14-
from splitio.engine.impressions import Manager as ImpressionsManager
14+
from splitio.engine.impressions.impressions import Manager as ImpressionsManager
15+
from splitio.engine.impressions.impressions import ImpressionsMode
16+
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
17+
from splitio.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter
19+
from splitio.engine.impressions import set_classes
1520

1621
# Storage
1722
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -27,21 +32,25 @@
2732
from splitio.api.impressions import ImpressionsAPI
2833
from splitio.api.events import EventsAPI
2934
from splitio.api.auth import AuthAPI
35+
from splitio.api.telemetry import TelemetryAPI
3036

3137
# Tasks
3238
from splitio.tasks.split_sync import SplitSynchronizationTask
3339
from splitio.tasks.segment_sync import SegmentSynchronizationTask
3440
from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask
3541
from splitio.tasks.events_sync import EventsSyncTask
42+
from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask
3643

3744
# Synchronizer
3845
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
39-
LocalhostSynchronizer
40-
from splitio.sync.manager import Manager
46+
LocalhostSynchronizer, RedisSynchronizer
47+
from splitio.sync.manager import Manager, RedisManager
4148
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer
4249
from splitio.sync.segment import SegmentSynchronizer
4350
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
4451
from splitio.sync.event import EventSynchronizer
52+
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
53+
4554

4655
# Recorder
4756
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder
@@ -207,6 +216,7 @@ def destroy(self, destroyed_event=None):
207216
return
208217

209218
try:
219+
_LOGGER.info('Factory destroy called, stopping tasks.')
210220
if self._sync_manager is not None:
211221
if destroyed_event is not None:
212222

@@ -283,7 +293,7 @@ def _wrap_impression_listener(listener, metadata):
283293

284294

285295
def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
286-
auth_api_base_url=None, streaming_api_base_url=None):
296+
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
287297
"""Build and return a split factory tailored to the supplied config."""
288298
if not input_validator.validate_factory_instantiation(api_key):
289299
return None
@@ -292,6 +302,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
292302
sdk_url=sdk_url,
293303
events_url=events_url,
294304
auth_url=auth_api_base_url,
305+
telemetry_url=telemetry_api_base_url,
295306
timeout=cfg.get('connectionTimeout')
296307
)
297308

@@ -302,6 +313,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
302313
'segments': SegmentsAPI(http_client, api_key, sdk_metadata),
303314
'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']),
304315
'events': EventsAPI(http_client, api_key, sdk_metadata),
316+
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata),
305317
}
306318

307319
if not input_validator.validate_apikey_type(apis['segments']):
@@ -314,18 +326,23 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
314326
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
315327
}
316328

329+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
330+
clear_filter_task, impressions_count_sync, impressions_count_task, \
331+
imp_strategy = set_classes('MEMORY', cfg['impressionsMode'], apis)
332+
317333
imp_manager = ImpressionsManager(
318-
cfg['impressionsMode'],
319-
True,
320-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
334+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
335+
imp_strategy)
321336

322337
synchronizers = SplitSynchronizers(
323338
SplitSynchronizer(apis['splits'], storages['splits']),
324339
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments']),
325340
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
326341
cfg['impressionsBulkSize']),
327342
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
328-
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
343+
impressions_count_sync,
344+
unique_keys_synchronizer,
345+
clear_filter_sync
329346
)
330347

331348
tasks = SplitTasks(
@@ -342,7 +359,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
342359
cfg['impressionsRefreshRate'],
343360
),
344361
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
345-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters)
362+
impressions_count_task,
363+
unique_keys_task,
364+
clear_filter_task
346365
)
347366

348367
synchronizer = Synchronizer(synchronizers, tasks)
@@ -393,19 +412,47 @@ def _build_redis_factory(api_key, cfg):
393412
_LOGGER.warning("dataSampling cannot be less than %.2f, defaulting to minimum",
394413
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
395414
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
415+
416+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
417+
clear_filter_task, impressions_count_sync, impressions_count_task, \
418+
imp_strategy = set_classes('REDIS', cfg['impressionsMode'], redis_adapter)
419+
420+
imp_manager = ImpressionsManager(
421+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
422+
imp_strategy)
423+
424+
synchronizers = SplitSynchronizers(None, None, None, None,
425+
impressions_count_sync,
426+
unique_keys_synchronizer,
427+
clear_filter_sync
428+
)
429+
430+
tasks = SplitTasks(None, None, None, None,
431+
impressions_count_task,
432+
unique_keys_task,
433+
clear_filter_task
434+
)
435+
436+
synchronizer = RedisSynchronizer(synchronizers, tasks)
396437
recorder = PipelinedRecorder(
397438
redis_adapter.pipeline,
398-
ImpressionsManager(cfg['impressionsMode'], False,
399-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
439+
imp_manager,
400440
storages['events'],
401441
storages['impressions'],
402442
data_sampling,
403443
)
444+
445+
manager = RedisManager(synchronizer)
446+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
447+
initialization_thread.setDaemon(True)
448+
initialization_thread.start()
449+
404450
return SplitFactory(
405451
api_key,
406452
storages,
407453
cfg['labelsEnabled'],
408454
recorder,
455+
manager,
409456
)
410457

411458

@@ -436,7 +483,7 @@ def _build_localhost_factory(cfg):
436483
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
437484
manager.start()
438485
recorder = StandardRecorder(
439-
ImpressionsManager(cfg['impressionsMode'], True, None),
486+
ImpressionsManager(None, StrategyDebugMode()),
440487
storages['events'],
441488
storages['impressions'],
442489
)
@@ -485,7 +532,8 @@ def get_factory(api_key, **kwargs):
485532
kwargs.get('sdk_api_base_url'),
486533
kwargs.get('events_api_base_url'),
487534
kwargs.get('auth_api_base_url'),
488-
kwargs.get('streaming_api_base_url')
535+
kwargs.get('streaming_api_base_url'),
536+
kwargs.get('telemetry_api_base_url')
489537
)
490538
finally:
491539
_INSTANTIATED_FACTORIES.update([api_key])

splitio/engine/filters.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import abc
2+
import threading
3+
4+
from bloom_filter2 import BloomFilter as BloomFilter2
5+
6+
class BaseFilter(object, metaclass=abc.ABCMeta):
7+
"""Impressions Filter interface."""
8+
9+
@abc.abstractmethod
10+
def add(self, data):
11+
"""
12+
Return a boolean flag
13+
14+
"""
15+
pass
16+
17+
@abc.abstractmethod
18+
def contains(self, data):
19+
"""
20+
Return a boolean flag
21+
22+
"""
23+
pass
24+
25+
@abc.abstractmethod
26+
def clear(self):
27+
"""
28+
No return
29+
30+
"""
31+
pass
32+
33+
class BloomFilter(BaseFilter):
34+
"""Optimized mode strategy."""
35+
36+
def __init__(self, max_elements=5000, error_rate=0.01):
37+
"""
38+
Construct a bloom filter instance.
39+
40+
:param max_element: maximum elements in the filter
41+
:type string:
42+
43+
:param error_rate: error rate for the false positives, reduce it will consume more memory
44+
:type numeric:
45+
"""
46+
self._max_elements = max_elements
47+
self._error_rate = error_rate
48+
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
49+
self._lock = threading.RLock()
50+
51+
def add(self, data):
52+
"""
53+
Add an item to the bloom filter instance.
54+
55+
:param data: element to be added
56+
:type string:
57+
58+
:return: True if successful
59+
:rtype: boolean
60+
"""
61+
with self._lock:
62+
self._imps_bloom_filter.add(data)
63+
return data in self._imps_bloom_filter
64+
65+
def contains(self, data):
66+
"""
67+
Check if an item exist in the bloom filter instance.
68+
69+
:param data: element to be checked
70+
:type string:
71+
72+
:return: True if exist
73+
:rtype: boolean
74+
"""
75+
with self._lock:
76+
return data in self._imps_bloom_filter
77+
78+
def clear(self):
79+
"""
80+
Destroy the current filter instance and create new one.
81+
82+
"""
83+
with self._lock:
84+
self._imps_bloom_filter.close()
85+
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)

0 commit comments

Comments
 (0)