Skip to content

Commit 54c4a7c

Browse files
committed
Merge branch 'development' of github.com:splitio/python-client into development
2 parents a358613 + d1d8985 commit 54c4a7c

38 files changed

+1570
-316
lines changed

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
'pyyaml>=5.4',
2020
'docopt>=0.6.2',
2121
'enum34;python_version<"3.4"',
22+
'bloom-filter2>=2.0.0',
2223
]
2324

2425
with open(path.join(path.abspath(path.dirname(__file__)), 'splitio', 'version.py')) as f:

splitio/api/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ class HttpClient(object):
2525
SDK_URL = 'https://sdk.split.io/api'
2626
EVENTS_URL = 'https://events.split.io/api'
2727
AUTH_URL = 'https://auth.split.io/api'
28+
TELEMETRY_URL = 'https://telemetry.split.io/api'
2829

29-
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None):
30+
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, telemetry_url=None):
3031
"""
3132
Class constructor.
3233
@@ -38,12 +39,15 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None):
3839
:type events_url: str
3940
:param auth_url: Optional alternative auth URL.
4041
:type auth_url: str
42+
:param telemetry_url: Optional alternative telemetry URL.
43+
:type telemetry_url: str
4144
"""
4245
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
4346
self._urls = {
4447
'sdk': sdk_url if sdk_url is not None else self.SDK_URL,
4548
'events': events_url if events_url is not None else self.EVENTS_URL,
4649
'auth': auth_url if auth_url is not None else self.AUTH_URL,
50+
'telemetry': telemetry_url if telemetry_url is not None else self.TELEMETRY_URL,
4751
}
4852

4953
def _build_url(self, server, path):

splitio/api/impressions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from splitio.api import APIException
77
from splitio.api.client import HttpClientException
88
from splitio.api.commons import headers_from_metadata
9-
from splitio.engine.impressions import ImpressionsMode
9+
from splitio.engine.impressions.impressions import ImpressionsMode
1010

1111

1212
_LOGGER = logging.getLogger(__name__)

splitio/api/telemetry.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""Impressions API module."""
2+
import logging
3+
4+
from splitio.api import APIException
5+
from splitio.api.client import HttpClientException
6+
from splitio.api.commons import headers_from_metadata
7+
8+
_LOGGER = logging.getLogger(__name__)
9+
10+
class TelemetryAPI(object): # pylint: disable=too-few-public-methods
11+
"""Class that uses an httpClient to communicate with the Telemetry API."""
12+
13+
def __init__(self, client, apikey, sdk_metadata):
14+
"""
15+
Class constructor.
16+
17+
:param client: HTTP Client responsble for issuing calls to the backend.
18+
:type client: HttpClient
19+
:param apikey: User apikey token.
20+
:type apikey: string
21+
"""
22+
self._client = client
23+
self._apikey = apikey
24+
self._metadata = headers_from_metadata(sdk_metadata)
25+
26+
def record_unique_keys(self, uniques):
27+
"""
28+
Send unique keys to the backend.
29+
30+
:param uniques: Unique Keys
31+
:type json
32+
"""
33+
try:
34+
response = self._client.post(
35+
'telemetry',
36+
'/keys/ss',
37+
self._apikey,
38+
body=uniques,
39+
extra_headers=self._metadata
40+
)
41+
if not 200 <= response.status_code < 300:
42+
raise APIException(response.body, response.status_code)
43+
except HttpClientException as exc:
44+
_LOGGER.error(
45+
'Error posting unique keys because an exception was raised by the HTTPClient'
46+
)
47+
_LOGGER.debug('Error: ', exc_info=True)
48+
raise APIException('Unique keys not flushed properly.') from exc

splitio/client/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os.path
33
import logging
44

5-
from splitio.engine.impressions import ImpressionsMode
5+
from splitio.engine.impressions.impressions import ImpressionsMode
66

77

88
_LOGGER = logging.getLogger(__name__)
@@ -91,7 +91,7 @@ def _sanitize_impressions_mode(mode, refresh_rate=None):
9191
mode = ImpressionsMode(mode.upper())
9292
except (ValueError, AttributeError):
9393
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be '
94-
'one of the following values: `debug` or `optimized`. '
94+
'one of the following values: `debug`, `none` or `optimized`. '
9595
'Defaulting to `optimized` mode.')
9696
mode = ImpressionsMode.OPTIMIZED
9797

splitio/client/factory.py

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
14-
from splitio.engine.impressions import Manager as ImpressionsManager
14+
from splitio.engine.impressions.impressions import Manager as ImpressionsManager
15+
from splitio.engine.impressions.impressions import ImpressionsMode
16+
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
17+
from splitio.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter
19+
from splitio.engine.impressions import set_classes
1520

1621
# Storage
1722
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -27,21 +32,25 @@
2732
from splitio.api.impressions import ImpressionsAPI
2833
from splitio.api.events import EventsAPI
2934
from splitio.api.auth import AuthAPI
35+
from splitio.api.telemetry import TelemetryAPI
3036

3137
# Tasks
3238
from splitio.tasks.split_sync import SplitSynchronizationTask
3339
from splitio.tasks.segment_sync import SegmentSynchronizationTask
3440
from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask
3541
from splitio.tasks.events_sync import EventsSyncTask
42+
from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask
3643

3744
# Synchronizer
3845
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
39-
LocalhostSynchronizer
40-
from splitio.sync.manager import Manager
46+
LocalhostSynchronizer, RedisSynchronizer
47+
from splitio.sync.manager import Manager, RedisManager
4148
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer
4249
from splitio.sync.segment import SegmentSynchronizer
4350
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
4451
from splitio.sync.event import EventSynchronizer
52+
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
53+
4554

4655
# Recorder
4756
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder
@@ -207,6 +216,7 @@ def destroy(self, destroyed_event=None):
207216
return
208217

209218
try:
219+
_LOGGER.info('Factory destroy called, stopping tasks.')
210220
if self._sync_manager is not None:
211221
if destroyed_event is not None:
212222

@@ -283,7 +293,7 @@ def _wrap_impression_listener(listener, metadata):
283293

284294

285295
def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
286-
auth_api_base_url=None, streaming_api_base_url=None):
296+
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
287297
"""Build and return a split factory tailored to the supplied config."""
288298
if not input_validator.validate_factory_instantiation(api_key):
289299
return None
@@ -292,6 +302,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
292302
sdk_url=sdk_url,
293303
events_url=events_url,
294304
auth_url=auth_api_base_url,
305+
telemetry_url=telemetry_api_base_url,
295306
timeout=cfg.get('connectionTimeout')
296307
)
297308

@@ -302,6 +313,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
302313
'segments': SegmentsAPI(http_client, api_key, sdk_metadata),
303314
'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']),
304315
'events': EventsAPI(http_client, api_key, sdk_metadata),
316+
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata),
305317
}
306318

307319
if not input_validator.validate_apikey_type(apis['segments']):
@@ -314,18 +326,23 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
314326
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
315327
}
316328

329+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
330+
clear_filter_task, impressions_count_sync, impressions_count_task, \
331+
imp_strategy = set_classes('MEMORY', cfg['impressionsMode'], apis)
332+
317333
imp_manager = ImpressionsManager(
318-
cfg['impressionsMode'],
319-
True,
320-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
334+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
335+
imp_strategy)
321336

322337
synchronizers = SplitSynchronizers(
323338
SplitSynchronizer(apis['splits'], storages['splits']),
324339
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments']),
325340
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
326341
cfg['impressionsBulkSize']),
327342
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
328-
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
343+
impressions_count_sync,
344+
unique_keys_synchronizer,
345+
clear_filter_sync
329346
)
330347

331348
tasks = SplitTasks(
@@ -342,7 +359,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
342359
cfg['impressionsRefreshRate'],
343360
),
344361
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
345-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters)
362+
impressions_count_task,
363+
unique_keys_task,
364+
clear_filter_task
346365
)
347366

348367
synchronizer = Synchronizer(synchronizers, tasks)
@@ -393,19 +412,47 @@ def _build_redis_factory(api_key, cfg):
393412
_LOGGER.warning("dataSampling cannot be less than %.2f, defaulting to minimum",
394413
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
395414
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
415+
416+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
417+
clear_filter_task, impressions_count_sync, impressions_count_task, \
418+
imp_strategy = set_classes('REDIS', cfg['impressionsMode'], redis_adapter)
419+
420+
imp_manager = ImpressionsManager(
421+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
422+
imp_strategy)
423+
424+
synchronizers = SplitSynchronizers(None, None, None, None,
425+
impressions_count_sync,
426+
unique_keys_synchronizer,
427+
clear_filter_sync
428+
)
429+
430+
tasks = SplitTasks(None, None, None, None,
431+
impressions_count_task,
432+
unique_keys_task,
433+
clear_filter_task
434+
)
435+
436+
synchronizer = RedisSynchronizer(synchronizers, tasks)
396437
recorder = PipelinedRecorder(
397438
redis_adapter.pipeline,
398-
ImpressionsManager(cfg['impressionsMode'], False,
399-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
439+
imp_manager,
400440
storages['events'],
401441
storages['impressions'],
402442
data_sampling,
403443
)
444+
445+
manager = RedisManager(synchronizer)
446+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
447+
initialization_thread.setDaemon(True)
448+
initialization_thread.start()
449+
404450
return SplitFactory(
405451
api_key,
406452
storages,
407453
cfg['labelsEnabled'],
408454
recorder,
455+
manager,
409456
)
410457

411458

@@ -436,7 +483,7 @@ def _build_localhost_factory(cfg):
436483
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
437484
manager.start()
438485
recorder = StandardRecorder(
439-
ImpressionsManager(cfg['impressionsMode'], True, None),
486+
ImpressionsManager(None, StrategyDebugMode()),
440487
storages['events'],
441488
storages['impressions'],
442489
)
@@ -485,7 +532,8 @@ def get_factory(api_key, **kwargs):
485532
kwargs.get('sdk_api_base_url'),
486533
kwargs.get('events_api_base_url'),
487534
kwargs.get('auth_api_base_url'),
488-
kwargs.get('streaming_api_base_url')
535+
kwargs.get('streaming_api_base_url'),
536+
kwargs.get('telemetry_api_base_url')
489537
)
490538
finally:
491539
_INSTANTIATED_FACTORIES.update([api_key])

splitio/engine/filters.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import abc
2+
import threading
3+
4+
from bloom_filter2 import BloomFilter as BloomFilter2
5+
6+
class BaseFilter(object, metaclass=abc.ABCMeta):
7+
"""Impressions Filter interface."""
8+
9+
@abc.abstractmethod
10+
def add(self, data):
11+
"""
12+
Return a boolean flag
13+
14+
"""
15+
pass
16+
17+
@abc.abstractmethod
18+
def contains(self, data):
19+
"""
20+
Return a boolean flag
21+
22+
"""
23+
pass
24+
25+
@abc.abstractmethod
26+
def clear(self):
27+
"""
28+
No return
29+
30+
"""
31+
pass
32+
33+
class BloomFilter(BaseFilter):
34+
"""Optimized mode strategy."""
35+
36+
def __init__(self, max_elements=5000, error_rate=0.01):
37+
"""
38+
Construct a bloom filter instance.
39+
40+
:param max_element: maximum elements in the filter
41+
:type string:
42+
43+
:param error_rate: error rate for the false positives, reduce it will consume more memory
44+
:type numeric:
45+
"""
46+
self._max_elements = max_elements
47+
self._error_rate = error_rate
48+
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
49+
self._lock = threading.RLock()
50+
51+
def add(self, data):
52+
"""
53+
Add an item to the bloom filter instance.
54+
55+
:param data: element to be added
56+
:type string:
57+
58+
:return: True if successful
59+
:rtype: boolean
60+
"""
61+
with self._lock:
62+
self._imps_bloom_filter.add(data)
63+
return data in self._imps_bloom_filter
64+
65+
def contains(self, data):
66+
"""
67+
Check if an item exist in the bloom filter instance.
68+
69+
:param data: element to be checked
70+
:type string:
71+
72+
:return: True if exist
73+
:rtype: boolean
74+
"""
75+
with self._lock:
76+
return data in self._imps_bloom_filter
77+
78+
def clear(self):
79+
"""
80+
Destroy the current filter instance and create new one.
81+
82+
"""
83+
with self._lock:
84+
self._imps_bloom_filter.close()
85+
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)

0 commit comments

Comments
 (0)