Skip to content

Commit e2e4dc8

Browse files
authored
Merge pull request #266 from splitio/UniqueKeysTracker
Unique keys tracker
2 parents 8bdc402 + f6f0037 commit e2e4dc8

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import abc
2+
import threading
3+
import logging
4+
from splitio.engine.filters.bloom_filter import BloomFilter
5+
6+
_LOGGER = logging.getLogger(__name__)
7+
8+
class BaseUniqueKeysTracker(object, metaclass=abc.ABCMeta):
    """Interface for tracking the unique keys seen per feature."""

    @abc.abstractmethod
    def track(self, key, feature_name):
        """
        Record a key for a feature and report whether it was newly seen.

        :param key: user key to track.
        :param feature_name: name of the feature the key was evaluated for.

        :return: True when the (feature, key) pair is new, False otherwise.
        :rtype: bool
        """

    @abc.abstractmethod
    def start(self):
        """Begin any background work the tracker requires. No return value."""

    @abc.abstractmethod
    def stop(self):
        """Halt any background work the tracker started. No return value."""
34+
35+
class UniqueKeysTracker(BaseUniqueKeysTracker):
    """In-memory tracker of unique keys per feature.

    Keys are deduplicated with a bloom filter and buffered in a
    per-feature set cache until flushed (flushing wired up in a later PR).
    """

    def __init__(self, cache_size=30000, max_bulk_size=5000, task_refresh_rate=24):
        """
        Initialize the tracker.

        :param cache_size: max unique keys per feature before a flush is needed.
        :type cache_size: int
        :param max_bulk_size: max total keys across features before a flush is needed.
        :type max_bulk_size: int
        :param task_refresh_rate: refresh period for the posting task (used in next PR).
        :type task_refresh_rate: int
        """
        self._cache_size = cache_size
        self._max_bulk_size = max_bulk_size
        self._task_refresh_rate = task_refresh_rate
        self._filter = BloomFilter(cache_size)
        self._lock = threading.RLock()
        self._cache = {}
        # TODO: initialize impressions sender adapter and task referesh rate in next PR

    def track(self, key, feature_name):
        """
        Track a key for a feature.

        :param key: user key.
        :type key: str
        :param feature_name: feature (split) name.
        :type feature_name: str

        :return: True when the (feature, key) pair was not seen before, False otherwise.
        :rtype: bool
        """
        # NOTE(review): bare concatenation can collide across pairs
        # ('ab'+'c' == 'a'+'bc'); kept as-is for filter-format compatibility.
        if self._filter.contains(feature_name + key):
            return False

        with self._lock:
            self._add_or_update(feature_name, key)
            self._filter.add(feature_name + key)

        # '>=' instead of '==' so an over-full cache still triggers the flush path.
        if len(self._cache[feature_name]) >= self._cache_size:
            # logger.warn() is deprecated; warning() is the supported spelling.
            _LOGGER.warning("MTK Cache size for Split [%s] has reach maximum unique keys [%d], flushing data now.", feature_name, self._cache_size)
            # TODO: Flush the data and reset split cache in next PR
        if self._get_dict_size() >= self._max_bulk_size:
            _LOGGER.info("Bulk MTK cache size has reach maximum, flushing data now.")
            # TODO: Flush the data and reset split cache in next PR

        return True

    def _get_dict_size(self):
        """
        Return the total number of cached keys across all features.

        :rtype: int
        """
        return sum(len(keys) for keys in self._cache.values())

    def _add_or_update(self, feature_name, key):
        """
        Add a key to the feature's set, creating the set on first use.

        :param feature_name: feature (split) name.
        :type feature_name: str
        :param key: user key.
        :type key: str
        """
        if feature_name not in self._cache:
            self._cache[feature_name] = set()
        self._cache[feature_name].add(key)

    def start(self):
        """
        TODO: Add start posting impressions job in next PR
        """

    def stop(self):
        """
        TODO: Add stop posting impressions job in next PR
        """
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""BloomFilter unit tests."""
2+
3+
import threading
4+
from splitio.engine.unique_keys_tracker import UniqueKeysTracker
5+
from splitio.engine.filters.bloom_filter import BloomFilter
6+
7+
class UniqueKeysTrackerTests(object):
    """UniqueKeysTracker unit test cases."""

    def test_adding_and_removing_keys(self, mocker):
        """New (feature, key) pairs are tracked once; repeats are rejected."""
        tracker = UniqueKeysTracker()

        # Defaults are sane and the dedup filter is the expected type.
        assert tracker._cache_size > 0
        assert tracker._max_bulk_size > 0
        assert tracker._task_refresh_rate > 0
        assert isinstance(tracker._filter, BloomFilter)

        key1 = 'key1'
        key2 = 'key2'
        key3 = 'key3'
        split1 = 'feature1'
        split2 = 'feature2'

        assert tracker.track(key1, split1)
        assert tracker.track(key3, split1)
        # A repeated pair must be rejected.
        assert not tracker.track(key1, split1)
        assert tracker.track(key2, split2)

        # The filter holds exactly the tracked (feature, key) pairs.
        assert tracker._filter.contains(split1 + key1)
        assert not tracker._filter.contains(split1 + key2)
        assert tracker._filter.contains(split2 + key2)
        assert not tracker._filter.contains(split2 + key1)
        # The per-feature cache mirrors the filter contents.
        assert key1 in tracker._cache[split1]
        assert key3 in tracker._cache[split1]
        assert key2 in tracker._cache[split2]
        assert key3 not in tracker._cache[split2]

    def test_cache_size(self, mocker):
        """Cache sizes add up across features and per feature."""
        cache_size = 10
        tracker = UniqueKeysTracker(cache_size)

        split1 = 'feature1'
        for x in range(1, cache_size + 1):
            tracker.track('key' + str(x), split1)
        split2 = 'feature2'
        for x in range(1, int(cache_size / 2) + 1):
            tracker.track('key' + str(x), split2)

        assert tracker._get_dict_size() == (cache_size + (cache_size / 2))
        assert len(tracker._cache[split1]) == cache_size
        assert len(tracker._cache[split2]) == cache_size / 2

0 commit comments

Comments
 (0)