Skip to content

Commit f9d50ae

Browse files
committed
Created impressions sender, integrated with unique_keys_tracker, added synchronizer and task classes, added telemetry api endpoint.
1 parent f6f0037 commit f9d50ae

File tree

8 files changed

+382
-41
lines changed

8 files changed

+382
-41
lines changed

splitio/api/telemetry.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Impressions API module."""
2+
3+
import logging
4+
5+
from splitio.api import APIException
6+
from splitio.api.client import HttpClientException
7+
from splitio.api.commons import headers_from_metadata
8+
9+
_LOGGER = logging.getLogger(__name__)
10+
11+
12+
class TelemetryAPI(object): # pylint: disable=too-few-public-methods
13+
"""Class that uses an httpClient to communicate with the Telemetry API."""
14+
15+
def __init__(self, client, apikey, sdk_metadata):
16+
"""
17+
Class constructor.
18+
19+
:param client: HTTP Client responsble for issuing calls to the backend.
20+
:type client: HttpClient
21+
:param apikey: User apikey token.
22+
:type apikey: string
23+
"""
24+
self._client = client
25+
self._apikey = apikey
26+
self._metadata = headers_from_metadata(sdk_metadata)
27+
28+
def record_unique_keys(self, uniques):
29+
"""
30+
Send unique keys to the backend.
31+
32+
:param uniques: Unique Keys
33+
:type json
34+
"""
35+
try:
36+
response = self._client.post(
37+
'keys',
38+
'/ss',
39+
self._apikey,
40+
body=uniques,
41+
extra_headers=self._metadata
42+
)
43+
if not 200 <= response.status_code < 300:
44+
raise APIException(response.body, response.status_code)
45+
except HttpClientException as exc:
46+
_LOGGER.error(
47+
'Error posting unique keys because an exception was raised by the HTTPClient'
48+
)
49+
_LOGGER.debug('Error: ', exc_info=True)
50+
raise APIException('Unique keys not flushed properly.') from exc

splitio/client/factory.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
from splitio.api.impressions import ImpressionsAPI
2828
from splitio.api.events import EventsAPI
2929
from splitio.api.auth import AuthAPI
30+
from splitio.api.telemetry import TelemetryAPI
3031

3132
# Tasks
3233
from splitio.tasks.split_sync import SplitSynchronizationTask
3334
from splitio.tasks.segment_sync import SegmentSynchronizationTask
3435
from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask
3536
from splitio.tasks.events_sync import EventsSyncTask
37+
from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask
3638

3739
# Synchronizer
3840
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
@@ -42,6 +44,8 @@
4244
from splitio.sync.segment import SegmentSynchronizer
4345
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
4446
from splitio.sync.event import EventSynchronizer
47+
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
48+
4549

4650
# Recorder
4751
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder
@@ -283,7 +287,7 @@ def _wrap_impression_listener(listener, metadata):
283287

284288

285289
def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
286-
auth_api_base_url=None, streaming_api_base_url=None):
290+
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
287291
"""Build and return a split factory tailored to the supplied config."""
288292
if not input_validator.validate_factory_instantiation(api_key):
289293
return None
@@ -292,6 +296,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
292296
sdk_url=sdk_url,
293297
events_url=events_url,
294298
auth_url=auth_api_base_url,
299+
telemetry_url=telemetry_api_base_url,
295300
timeout=cfg.get('connectionTimeout')
296301
)
297302

@@ -302,6 +307,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
302307
'segments': SegmentsAPI(http_client, api_key, sdk_metadata),
303308
'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']),
304309
'events': EventsAPI(http_client, api_key, sdk_metadata),
310+
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata),
305311
}
306312

307313
if not input_validator.validate_apikey_type(apis['segments']):
@@ -326,6 +332,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
326332
cfg['impressionsBulkSize']),
327333
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
328334
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
335+
UniqueKeysSynchronizer(), # TODO: Pass the UniqueKeysTracker instance fetched from Strategy instance created above.
336+
ClearFilterSynchronizer(), # TODO: Pass the UniqueKeysTracker instance fetched from Strategy instance created above.
329337
)
330338

331339
tasks = SplitTasks(
@@ -342,7 +350,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
342350
cfg['impressionsRefreshRate'],
343351
),
344352
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
345-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters)
353+
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
354+
UniqueKeysSyncTask(synchronizers.unique_keys_sync.SendAll),
355+
ClearFilterSyncTask(synchronizers.clear_filter_sync.clearAll)
346356
)
347357

348358
synchronizer = Synchronizer(synchronizers, tasks)
@@ -355,6 +365,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
355365

356366
storages['events'].set_queue_full_hook(tasks.events_task.flush)
357367
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
368+
# TODO: Add unique_keys_tracker.set_queue_full_hook(tasks.unique_keys.flush)
358369

359370
recorder = StandardRecorder(
360371
imp_manager,
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import abc


class ImpressionsSenderAdapter(object, metaclass=abc.ABCMeta):
    """Impressions Sender Adapter interface."""

    @abc.abstractmethod
    def record_unique_keys(self, data):
        """
        Deliver a unique-keys payload to its destination.

        No return value.
        """

    @abc.abstractmethod
    def record_impressions_count(self, data):
        """
        Deliver an impressions-count payload to its destination.

        No return value.
        """
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from splitio.engine.sender_adapters import ImpressionsSenderAdapter


class InMemorySenderAdapter(ImpressionsSenderAdapter):
    """In Memory Impressions Sender Adapter class."""

    # NOTE(review): the inherited abstract method record_impressions_count is
    # not implemented here, so instantiating this class will raise TypeError
    # until it is added — confirm intent with the follow-up PR.

    def __init__(self, telemtry_http_client):
        """
        Initialize in-memory sender adapter instance.

        :param telemtry_http_client: instance of the telemetry http api
        :type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
        """
        self._telemtry_http_client = telemtry_http_client

    def record_unique_keys(self, uniques):
        """
        Post the unique keys to the split backend.

        :param uniques: unique keys dictionary
        :type uniques: dict {'feature1': set(), 'feature2': set(), ...}
        """
        self._telemtry_http_client.record_unique_keys(self._uniques_formatter(uniques))

    def _uniques_formatter(self, uniques):
        """
        Format the unique keys dictionary into the JSON-serializable payload
        expected by the backend: {'keys': [{'f': <feature>, 'ks': [<keys>]}]}.

        The original implementation misused json.load/json.dump (both expect
        file objects) and hand-built malformed JSON strings; building a plain
        dict and letting the HTTP layer serialize it is both correct and simpler.

        :param uniques: unique keys dictionary
        :type uniques: dict {'feature1': set(), 'feature2': set(), ...}

        :return: unique keys payload
        :rtype: dict
        """
        # sorted() makes the payload deterministic; assumes keys are strings
        # (MTK user keys) — TODO confirm.
        return {
            'keys': [
                {'f': feature, 'ks': sorted(keys)}
                for feature, keys in uniques.items()
            ]
        }

splitio/engine/unique_keys_tracker.py

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import threading
33
import logging
44
from splitio.engine.filters.bloom_filter import BloomFilter
5+
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
56

67
_LOGGER = logging.getLogger(__name__)
78

@@ -16,38 +17,33 @@ def track(self, key, feature_name):
1617
"""
1718
pass
1819

19-
@abc.abstractmethod
20-
def start(self):
21-
"""
22-
No return value
20+
class UniqueKeysTracker(BaseUniqueKeysTracker):
21+
"""Unique Keys Tracker class."""
2322

23+
def __init__(self, cache_size=30000):
    """
    Initialize unique keys tracker instance.

    :param cache_size: The size of the unique keys dictionary
    :type cache_size: int
    """
    self._cache_size = cache_size
    self._filter = BloomFilter(cache_size)
    self._lock = threading.RLock()
    # Per-feature sets of tracked keys: {feature_name: set(keys)}.
    self._cache = {}
    # Callable invoked when the cache exceeds cache_size; set via
    # set_queue_full_hook().
    self._queue_full_hook = None
4635

4736
def track(self, key, feature_name):
4837
"""
4938
Return a boolean flag
5039
40+
:param key: key to be added to MTK list
41+
:type key: int
42+
:param feature_name: split name associated with the key
43+
:type feature_name: str
44+
45+
:return: True if successful
46+
:rtype: boolean
5147
"""
5248
if self._filter.contains(feature_name+key):
5349
return False
@@ -56,34 +52,44 @@ def track(self, key, feature_name):
5652
self._add_or_update(feature_name, key)
5753
self._filter.add(feature_name+key)
5854

59-
if len(self._cache[feature_name]) == self._cache_size:
60-
_LOGGER.warn("MTK Cache size for Split [%s] has reach maximum unique keys [%d], flushing data now.", feature_name, self._cache_size)
61-
# TODO: Flush the data and reset split cache in next PR
62-
if self._get_dict_size() >= self._max_bulk_size:
63-
_LOGGER.info("Bulk MTK cache size has reach maximum, flushing data now.")
64-
# TODO: Flush the data and reset split cache in next PR
65-
55+
if self._get_dict_size() > self._cache_size:
56+
if self._queue_full_hook is not None and callable(self._queue_full_hook):
57+
self._queue_full_hook()
58+
_LOGGER.info(
59+
'Unique Keys queue is full, flushing the current queue now.'
60+
)
6661
return True
6762

6863
def _get_dict_size(self):
64+
"""
65+
Return the size of unique keys dictionary (number of keys in all features)
66+
67+
:return: dictionary set() items count
68+
:rtype: int
69+
"""
6970
total_size = 0
70-
for key in self._cache:
71-
total_size = total_size + len(self._cache[key])
71+
for key in self._uniqe_keys_tracker._cache:
72+
total_size = total_size + len(self._uniqe_keys_tracker._cache[key])
7273
return total_size
7374

7475
def _add_or_update(self, feature_name, key):
75-
if feature_name not in self._cache:
76-
self._cache[feature_name] = set()
77-
self._cache[feature_name].add(key)
78-
79-
def start(self):
8076
"""
81-
TODO: Add start posting impressions job in next PR
77+
Add the feature_name+key to both bloom filter and dictionary.
8278
79+
:param feature_name: split name associated with the key
80+
:type feature_name: str
81+
:param key: key to be added to MTK list
82+
:type key: int
8383
"""
84+
if feature_name not in self._cache:
85+
self._cache[feature_name] = set()
86+
self._cache[feature_name].add(key)
8487

85-
def stop(self):
88+
def set_queue_full_hook(self, hook):
    """
    Register a hook to be called when the queue is full.

    Non-callable values are silently ignored.

    :param hook: callable invoked when the unique-keys queue fills up
    """
    if not callable(hook):
        return
    self._queue_full_hook = hook

0 commit comments

Comments
 (0)