Skip to content

Commit 03f2d1f

Browse files
committed
minor fixes and tests
1 parent 367cacf commit 03f2d1f

File tree

10 files changed

+347
-75
lines changed

10 files changed

+347
-75
lines changed

splitio/api/telemetry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def record_unique_keys(self, uniques):
3131
:param uniques: Unique Keys
3232
:type json
3333
"""
34+
_LOGGER.debug(uniques)
3435
try:
3536
response = self._client.post(
3637
'telemetry',

splitio/client/factory.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -347,32 +347,46 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
347347
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
348348
)
349349

350-
tasks = SplitTasks(
351-
SplitSynchronizationTask(
352-
synchronizers.split_sync.synchronize_splits,
353-
cfg['featuresRefreshRate'],
354-
),
355-
SegmentSynchronizationTask(
356-
synchronizers.segment_sync.synchronize_segments,
357-
cfg['segmentsRefreshRate'],
358-
),
359-
ImpressionsSyncTask(
360-
synchronizers.impressions_sync.synchronize_impressions,
361-
cfg['impressionsRefreshRate'],
362-
),
363-
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
364-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
365-
)
366-
367350
if cfg['impressionsMode'] == ImpressionsMode.NONE:
368-
synchronizers.set_none_syncs(
351+
synchronizers.set_none_sync(
369352
UniqueKeysSynchronizer(InMemorySenderAdapter(apis['telemetry']), imp_strategy._unique_keys_tracker),
370-
ClearFilterSynchronizer(imp_strategy._unique_keys_tracker),
353+
ClearFilterSynchronizer(imp_strategy._unique_keys_tracker)
371354
)
372-
tasks.set_none_tasks(
355+
tasks = SplitTasks(
356+
SplitSynchronizationTask(
357+
synchronizers.split_sync.synchronize_splits,
358+
cfg['featuresRefreshRate'],
359+
),
360+
SegmentSynchronizationTask(
361+
synchronizers.segment_sync.synchronize_segments,
362+
cfg['segmentsRefreshRate'],
363+
),
364+
ImpressionsSyncTask(
365+
synchronizers.impressions_sync.synchronize_impressions,
366+
cfg['impressionsRefreshRate'],
367+
),
368+
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
369+
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
373370
UniqueKeysSyncTask(synchronizers.unique_keys_sync.SendAll),
374371
ClearFilterSyncTask(synchronizers.clear_filter_sync.clearAll)
375372
)
373+
else:
374+
tasks = SplitTasks(
375+
SplitSynchronizationTask(
376+
synchronizers.split_sync.synchronize_splits,
377+
cfg['featuresRefreshRate'],
378+
),
379+
SegmentSynchronizationTask(
380+
synchronizers.segment_sync.synchronize_segments,
381+
cfg['segmentsRefreshRate'],
382+
),
383+
ImpressionsSyncTask(
384+
synchronizers.impressions_sync.synchronize_impressions,
385+
cfg['impressionsRefreshRate'],
386+
),
387+
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
388+
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
389+
)
376390

377391
synchronizer = Synchronizer(synchronizers, tasks)
378392

splitio/sync/synchronizer.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,7 @@ def __init__(self, split_sync, segment_sync, impressions_sync, events_sync, # p
3535
self._events_sync = events_sync
3636
self._impressions_count_sync = impressions_count_sync
3737

38-
def set_none_syncs(self, unique_keys_sync, clear_filter_sync):
39-
"""
40-
Set the NONE mode synchonizer objects.
41-
42-
:param unique_keys_sync: sync for unique_keys
43-
:type unique_keys_sync: splitio.sync.unique_keys.UniqueKeysSynchronizer
44-
:param clear_filter_sync: sync for clear_filter
45-
:type clear_filter_sync: splitio.sync.unique_keys.ClearFilterSynchronizer
46-
"""
38+
def set_none_sync(self, unique_keys_sync, clear_filter_sync):
4739
self._unique_keys_sync = unique_keys_sync
4840
self._clear_filter_sync = clear_filter_sync
4941

@@ -86,7 +78,7 @@ class SplitTasks(object):
8678
"""SplitTasks."""
8779

8880
def __init__(self, split_task, segment_task, impressions_task, events_task, # pylint:disable=too-many-arguments
89-
impressions_count_task):
81+
impressions_count_task, unique_keys_task = None, clear_filter_task = None):
9082
"""
9183
Class constructor.
9284
@@ -106,18 +98,6 @@ def __init__(self, split_task, segment_task, impressions_task, events_task, # p
10698
self._impressions_task = impressions_task
10799
self._events_task = events_task
108100
self._impressions_count_task = impressions_count_task
109-
self._unique_keys_task = None
110-
self._clear_filter_task = None
111-
112-
def set_none_tasks(self, unique_keys_task, clear_filter_task):
113-
"""
114-
Set the NONE mode synchonizer objects.
115-
116-
:param unique_keys_task: sync for unique_keys
117-
:type unique_keys_task: splitio.tasks.unique_keys_sync.UniqueKeysSyncTask
118-
:param clear_filter_task: sync for clear_filter
119-
:type clear_filter_task: splitio.tasks.unique_keys_sync.ClearFilterSyncTask
120-
"""
121101
self._unique_keys_task = unique_keys_task
122102
self._clear_filter_task = clear_filter_task
123103

splitio/sync/unique_keys.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
from splitio.engine.filters.bloom_filter import BloomFilter
22
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
3+
import logging
34

45
_UNIQUE_KEYS_MAX_BULK_SIZE = 5000
5-
import threading
6-
import logging
7-
from splitio.engine.filters.bloom_filter import BloomFilter
8-
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
96

107
_LOGGER = logging.getLogger(__name__)
118

@@ -19,6 +16,7 @@ def __init__(self, impressions_sender_adapter = None, uniqe_keys_tracker = None)
1916
:param uniqe_keys_tracker: instance of uniqe keys tracker
2017
:type uniqe_keys_tracker: splitio.engine.uniqur_key_tracker.UniqueKeysTracker
2118
"""
19+
_LOGGER.debug("UniqueKeysSynchronizer")
2220
self._uniqe_keys_tracker = uniqe_keys_tracker
2321
self._max_bulk_size = _UNIQUE_KEYS_MAX_BULK_SIZE
2422
self._impressions_sender_adapter = impressions_sender_adapter
@@ -37,17 +35,6 @@ def SendAll(self):
3735
self._impressions_sender_adapter.record_unique_keys(bulk)
3836

3937
def _split_cache_to_bulks(self, cache):
40-
cache_size = self._uniqe_keys_tracker._get_dict_size()
41-
if cache_size <= self._max_bulk_size:
42-
self._uniqe_keys_tracker._impressions_sender_adapter.record_unique_keys(self._uniqe_keys_tracker._cache)
43-
else:
44-
for bulk in self._split_cache_to_bulks():
45-
self._uniqe_keys_tracker._impressions_sender_adapter.record_unique_keys(bulk)
46-
47-
with self._lock:
48-
self._uniqe_keys_tracker._cache = {}
49-
50-
def _split_cache_to_bulks(self):
5138
"""
5239
Split the current unique keys dictionary into seperate dictionaries,
5340
each with the size of max_bulk_size. Overflow the last feature set() to new unique keys dictionary.
@@ -80,24 +67,24 @@ def _chunks(self, keys_list):
8067
"""
8168
Split array into chunks
8269
"""
83-
for i in range(0, len(keys_list), 5):
84-
yield keys_list[i:i + 5]
70+
for i in range(0, len(keys_list), self._max_bulk_size):
71+
yield keys_list[i:i + self._max_bulk_size]
8572

8673
class ClearFilterSynchronizer(object):
8774
"""Clear filter class."""
8875

89-
def __init__(self, uniqe_keys_tracker = None):
76+
def __init__(self, unique_keys_tracker = None):
9077
"""
9178
Initialize Unique keys synchronizer instance
9279
9380
:param uniqe_keys_tracker: instance of uniqe keys tracker
9481
:type uniqe_keys_tracker: splitio.engine.uniqur_key_tracker.UniqueKeysTracker
9582
"""
96-
self._uniqe_keys_tracker = uniqe_keys_tracker
83+
self._unique_keys_tracker = unique_keys_tracker
9784

9885
def clearAll(self):
9986
"""
10087
Clear the bloom filter cache
10188
10289
"""
103-
self._uniqe_keys_tracker.filter_pop_all()
90+
self._unique_keys_tracker.filter_pop_all()

splitio/tasks/unique_keys_sync.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66

77

88
_LOGGER = logging.getLogger(__name__)
9-
_PERIOD = 15 * 60 # 15 minutes
9+
_UNIQUE_KEYS_SYNC_PERIOD = 15 * 60 # 15 minutes
10+
_CLEAR_FILTER_SYNC_PERIOD = 60 * 60 * 24 # 24 hours
11+
1012

1113
class UniqueKeysSyncTask(BaseSynchronizationTask):
1214
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
1315

14-
def __init__(self, synchronize_unique_keys):
16+
def __init__(self, synchronize_unique_keys, period = None):
1517
"""
1618
Class constructor.
1719
@@ -21,7 +23,9 @@ def __init__(self, synchronize_unique_keys):
2123
:type period: int
2224
"""
2325

24-
self._task = AsyncTask(synchronize_unique_keys, _PERIOD,
26+
if period == None:
27+
period = _UNIQUE_KEYS_SYNC_PERIOD
28+
self._task = AsyncTask(synchronize_unique_keys, period,
2529
on_stop=synchronize_unique_keys)
2630

2731
def start(self):
@@ -51,7 +55,7 @@ def flush(self):
5155
class ClearFilterSyncTask(BaseSynchronizationTask):
5256
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
5357

54-
def __init__(self, clear_filter):
58+
def __init__(self, clear_filter, period = None):
5559
"""
5660
Class constructor.
5761
@@ -60,8 +64,10 @@ def __init__(self, clear_filter):
6064
:param period: How many seconds to wait between subsequent clearing of bloom filter
6165
:type period: int
6266
"""
63-
_period = 60 * 60 * 24 # 24 hours
64-
self._task = AsyncTask(clear_filter, _period,
67+
if period == None:
68+
period = _CLEAR_FILTER_SYNC_PERIOD
69+
70+
self._task = AsyncTask(clear_filter, period,
6571
on_stop=clear_filter)
6672

6773
def start(self):

0 commit comments

Comments
 (0)