Skip to content

Commit edde08d

Browse files
authored
Merge pull request #267 from splitio/ImpressionsSender
Impressions sender
2 parents e2e4dc8 + 0903bd9 commit edde08d

File tree

16 files changed

+381
-59
lines changed

16 files changed

+381
-59
lines changed

splitio/api/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ class HttpClient(object):
2525
SDK_URL = 'https://sdk.split.io/api'
2626
EVENTS_URL = 'https://events.split.io/api'
2727
AUTH_URL = 'https://auth.split.io/api'
28+
TELEMETRY_URL = 'https://telemetry.split.io/api'
2829

29-
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None):
30+
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, telemetry_url=None):
3031
"""
3132
Class constructor.
3233
@@ -38,12 +39,15 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None):
3839
:type events_url: str
3940
:param auth_url: Optional alternative auth URL.
4041
:type auth_url: str
42+
:param telemetry_url: Optional alternative telemetry URL.
43+
:type telemetry_url: str
4144
"""
4245
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
4346
self._urls = {
4447
'sdk': sdk_url if sdk_url is not None else self.SDK_URL,
4548
'events': events_url if events_url is not None else self.EVENTS_URL,
4649
'auth': auth_url if auth_url is not None else self.AUTH_URL,
50+
'telemetry': telemetry_url if telemetry_url is not None else self.TELEMETRY_URL,
4751
}
4852

4953
def _build_url(self, server, path):

splitio/api/telemetry.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Impressions API module."""
2+
3+
import logging
4+
5+
from splitio.api import APIException
6+
from splitio.api.client import HttpClientException
7+
from splitio.api.commons import headers_from_metadata
8+
9+
_LOGGER = logging.getLogger(__name__)
10+
11+
12+
class TelemetryAPI(object): # pylint: disable=too-few-public-methods
13+
"""Class that uses an httpClient to communicate with the Telemetry API."""
14+
15+
def __init__(self, client, apikey, sdk_metadata):
16+
"""
17+
Class constructor.
18+
19+
:param client: HTTP Client responsble for issuing calls to the backend.
20+
:type client: HttpClient
21+
:param apikey: User apikey token.
22+
:type apikey: string
23+
"""
24+
self._client = client
25+
self._apikey = apikey
26+
self._metadata = headers_from_metadata(sdk_metadata)
27+
28+
def record_unique_keys(self, uniques):
29+
"""
30+
Send unique keys to the backend.
31+
32+
:param uniques: Unique Keys
33+
:type json
34+
"""
35+
try:
36+
response = self._client.post(
37+
'keys',
38+
'/ss',
39+
self._apikey,
40+
body=uniques,
41+
extra_headers=self._metadata
42+
)
43+
if not 200 <= response.status_code < 300:
44+
raise APIException(response.body, response.status_code)
45+
except HttpClientException as exc:
46+
_LOGGER.error(
47+
'Error posting unique keys because an exception was raised by the HTTPClient'
48+
)
49+
_LOGGER.debug('Error: ', exc_info=True)
50+
raise APIException('Unique keys not flushed properly.') from exc

splitio/client/factory.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from splitio.engine.strategies import Counter as ImpressionsCounter
1717
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
1818
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
19+
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
1920

2021
# Storage
2122
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -31,12 +32,14 @@
3132
from splitio.api.impressions import ImpressionsAPI
3233
from splitio.api.events import EventsAPI
3334
from splitio.api.auth import AuthAPI
35+
from splitio.api.telemetry import TelemetryAPI
3436

3537
# Tasks
3638
from splitio.tasks.split_sync import SplitSynchronizationTask
3739
from splitio.tasks.segment_sync import SegmentSynchronizationTask
3840
from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask
3941
from splitio.tasks.events_sync import EventsSyncTask
42+
from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask
4043

4144
# Synchronizer
4245
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
@@ -46,6 +49,8 @@
4649
from splitio.sync.segment import SegmentSynchronizer
4750
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
4851
from splitio.sync.event import EventSynchronizer
52+
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
53+
4954

5055
# Recorder
5156
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder
@@ -287,7 +292,7 @@ def _wrap_impression_listener(listener, metadata):
287292

288293

289294
def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
290-
auth_api_base_url=None, streaming_api_base_url=None):
295+
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
291296
"""Build and return a split factory tailored to the supplied config."""
292297
if not input_validator.validate_factory_instantiation(api_key):
293298
return None
@@ -296,6 +301,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
296301
sdk_url=sdk_url,
297302
events_url=events_url,
298303
auth_url=auth_api_base_url,
304+
telemetry_url=telemetry_api_base_url,
299305
timeout=cfg.get('connectionTimeout')
300306
)
301307

@@ -306,6 +312,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
306312
'segments': SegmentsAPI(http_client, api_key, sdk_metadata),
307313
'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']),
308314
'events': EventsAPI(http_client, api_key, sdk_metadata),
315+
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata),
309316
}
310317

311318
if not input_validator.validate_apikey_type(apis['segments']):
@@ -317,6 +324,13 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
317324
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize']),
318325
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
319326
}
327+
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
328+
329+
strategies = {
330+
ImpressionsMode.OPTIMIZED : StrategyOptimizedMode(imp_counter),
331+
ImpressionsMode.DEBUG : StrategyDebugMode(),
332+
}
333+
imp_strategy = strategies[cfg['impressionsMode']]
320334

321335
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
322336

@@ -354,7 +368,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
354368
cfg['impressionsRefreshRate'],
355369
),
356370
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
357-
imp_count_sync_task
371+
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
358372
)
359373

360374
synchronizer = Synchronizer(synchronizers, tasks)
@@ -452,7 +466,7 @@ def _build_localhost_factory(cfg):
452466
manager.start()
453467

454468
recorder = StandardRecorder(
455-
ImpressionsManager(cfg['impressionsMode'], StrategyDebugMode()),
469+
ImpressionsManager(None, StrategyDebugMode()),
456470
storages['events'],
457471
storages['impressions'],
458472
)
@@ -501,7 +515,8 @@ def get_factory(api_key, **kwargs):
501515
kwargs.get('sdk_api_base_url'),
502516
kwargs.get('events_api_base_url'),
503517
kwargs.get('auth_api_base_url'),
504-
kwargs.get('streaming_api_base_url')
518+
kwargs.get('streaming_api_base_url'),
519+
kwargs.get('telemetry_api_base_url')
505520
)
506521
finally:
507522
_INSTANTIATED_FACTORIES.update([api_key])

splitio/engine/impressions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class ImpressionsMode(Enum):
88

99
OPTIMIZED = "OPTIMIZED"
1010
DEBUG = "DEBUG"
11+
NONE = "NONE"
1112

1213
class Manager(object): # pylint:disable=too-few-public-methods
1314
"""Impression manager."""
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import abc
2+
3+
class ImpressionsSenderAdapter(object, metaclass=abc.ABCMeta):
4+
"""Impressions Sender Adapter interface."""
5+
6+
@abc.abstractmethod
7+
def record_unique_keys(self, data):
8+
"""
9+
No Return value
10+
11+
"""
12+
pass
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import json
2+
3+
from splitio.engine.sender_adapters import ImpressionsSenderAdapter
4+
5+
class InMemorySenderAdapter(ImpressionsSenderAdapter):
6+
"""In Memory Impressions Sender Adapter class."""
7+
8+
def __init__(self, telemtry_http_client):
9+
"""
10+
Initialize In memory sender adapter instance
11+
12+
:param telemtry_http_client: instance of telemetry http api
13+
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
14+
"""
15+
self._telemtry_http_client = telemtry_http_client
16+
17+
def record_unique_keys(self, uniques):
18+
"""
19+
post the unique keys to split back end.
20+
21+
:param uniques: unique keys disctionary
22+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
23+
"""
24+
self._telemtry_http_client.record_unique_keys(self._uniques_formatter(uniques))
25+
26+
def _uniques_formatter(self, uniques):
27+
"""
28+
Format the unique keys dictionary to a JSON body
29+
30+
:param uniques: unique keys disctionary
31+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
32+
33+
:return: unique keys JSON
34+
:rtype: json
35+
"""
36+
formatted_uniques = json.loads('{"keys": []}')
37+
if len(uniques) == 0:
38+
return json.loads('{"keys": []}')
39+
40+
return {
41+
'keys': [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
42+
}

splitio/engine/strategies/strategy_optimized_mode.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
class StrategyOptimizedMode(BaseStrategy):
88
"""Optimized mode strategy."""
99

10-
def __init__(self, counter=None):
10+
def __init__(self, counter):
1111
"""
1212
Construct a strategy instance for optimized mode.
1313

splitio/engine/unique_keys_tracker.py

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,74 +16,87 @@ def track(self, key, feature_name):
1616
"""
1717
pass
1818

19-
@abc.abstractmethod
20-
def start(self):
21-
"""
22-
No return value
19+
class UniqueKeysTracker(BaseUniqueKeysTracker):
20+
"""Unique Keys Tracker class."""
2321

22+
def __init__(self, cache_size=30000):
2423
"""
25-
pass
24+
Initialize unique keys tracker instance
2625
27-
@abc.abstractmethod
28-
def stop(self):
26+
:param cache_size: The size of the unique keys dictionary
27+
:type key: int
2928
"""
30-
No return value
31-
32-
"""
33-
pass
34-
35-
class UniqueKeysTracker(BaseUniqueKeysTracker):
36-
"""Unique Keys Tracker class."""
37-
38-
def __init__(self, cache_size=30000, max_bulk_size=5000, task_refresh_rate = 24):
3929
self._cache_size = cache_size
40-
self._max_bulk_size = max_bulk_size
41-
self._task_refresh_rate = task_refresh_rate
4230
self._filter = BloomFilter(cache_size)
4331
self._lock = threading.RLock()
4432
self._cache = {}
45-
# TODO: initialize impressions sender adapter and task referesh rate in next PR
33+
self._queue_full_hook = None
34+
self._current_cache_size = 0
4635

4736
def track(self, key, feature_name):
4837
"""
4938
Return a boolean flag
5039
40+
:param key: key to be added to MTK list
41+
:type key: int
42+
:param feature_name: split name associated with the key
43+
:type feature_name: str
44+
45+
:return: True if successful
46+
:rtype: boolean
5147
"""
52-
if self._filter.contains(feature_name+key):
53-
return False
48+
with self._lock:
49+
if self._filter.contains(feature_name+key):
50+
return False
5451

5552
with self._lock:
5653
self._add_or_update(feature_name, key)
5754
self._filter.add(feature_name+key)
58-
59-
if len(self._cache[feature_name]) == self._cache_size:
60-
_LOGGER.warn("MTK Cache size for Split [%s] has reach maximum unique keys [%d], flushing data now.", feature_name, self._cache_size)
61-
# TODO: Flush the data and reset split cache in next PR
62-
if self._get_dict_size() >= self._max_bulk_size:
63-
_LOGGER.info("Bulk MTK cache size has reach maximum, flushing data now.")
64-
# TODO: Flush the data and reset split cache in next PR
65-
55+
self._current_cache_size = self._current_cache_size + 1
56+
57+
if self._current_cache_size > self._cache_size:
58+
_LOGGER.info(
59+
'Unique Keys queue is full, flushing the current queue now.'
60+
)
61+
if self._queue_full_hook is not None and callable(self._queue_full_hook):
62+
self._queue_full_hook()
6663
return True
6764

68-
def _get_dict_size(self):
69-
total_size = 0
70-
for key in self._cache:
71-
total_size = total_size + len(self._cache[key])
72-
return total_size
73-
7465
def _add_or_update(self, feature_name, key):
66+
"""
67+
Add the feature_name+key to both bloom filter and dictionary.
68+
69+
:param feature_name: split name associated with the key
70+
:type feature_name: str
71+
:param key: key to be added to MTK list
72+
:type key: int
73+
"""
7574
if feature_name not in self._cache:
7675
self._cache[feature_name] = set()
7776
self._cache[feature_name].add(key)
7877

79-
def start(self):
78+
def set_queue_full_hook(self, hook):
8079
"""
81-
TODO: Add start posting impressions job in next PR
80+
Set a hook to be called when the queue is full.
8281
82+
:param h: Hook to be called when the queue is full
8383
"""
84+
if callable(hook):
85+
self._queue_full_hook = hook
8486

85-
def stop(self):
87+
def filter_pop_all(self):
8688
"""
87-
TODO: Add stop posting impressions job in next PR
89+
Delete the filter items
8890
8991
"""
92+
with self._lock:
93+
self._filter.clear()
94+
95+
def get_cache_info_and_pop_all(self):
96+
with self._lock:
97+
temp_cach = self._cache.copy()
98+
temp_cache_size = self._current_cache_size
99+
self._cache = {}
100+
self._current_cache_size = 0
101+
102+
return temp_cach, temp_cache_size

splitio/sync/impression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def __init__(self, impressions_api, impressions_counter):
8484

8585
def synchronize_counters(self):
8686
"""Send impressions from both the failed and new queues."""
87-
to_send = self._impressions_counter.pop_all()
87+
to_send = self._impressions_manager._strategy._counter.pop_all()
8888
if not to_send:
8989
return
9090

0 commit comments

Comments
 (0)