Skip to content

Commit 85984ca

Browse files
committed
Refactor and polishing
1 parent 4950b27 commit 85984ca

26 files changed

+106
-95
lines changed

splitio/api/impressions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from splitio.api import APIException
77
from splitio.api.client import HttpClientException
88
from splitio.api.commons import headers_from_metadata
9-
from splitio.engine.impressions import ImpressionsMode
9+
from splitio.engine.impressions.impressions import ImpressionsMode
1010

1111

1212
_LOGGER = logging.getLogger(__name__)

splitio/client/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os.path
33
import logging
44

5-
from splitio.engine.impressions import ImpressionsMode
5+
from splitio.engine.impressions.impressions import ImpressionsMode
66

77

88
_LOGGER = logging.getLogger(__name__)

splitio/client/factory.py

Lines changed: 13 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
14-
from splitio.engine.impressions import Manager as ImpressionsManager
15-
from splitio.engine.impressions import ImpressionsMode
16-
from splitio.engine.manager import Counter as ImpressionsCounter
17-
from splitio.engine.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18-
from splitio.engine.adapters import InMemorySenderAdapter, RedisSenderAdapter
14+
from splitio.engine.impressions.impressions import Manager as ImpressionsManager
15+
from splitio.engine.impressions.impressions import ImpressionsMode
16+
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
17+
from splitio.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter
19+
from splitio.engine.impressions import set_classes
1920

2021
# Storage
2122
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -324,30 +325,10 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
324325
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize']),
325326
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
326327
}
327-
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
328-
329-
unique_keys_synchronizer = None
330-
clear_filter_sync = None
331-
unique_keys_task = None
332-
clear_filter_task = None
333-
impressions_count_sync = None
334-
impressions_count_task = None
335-
336-
if cfg['impressionsMode'] == ImpressionsMode.NONE:
337-
imp_strategy = StrategyNoneMode(imp_counter)
338-
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
339-
unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(apis['telemetry']), imp_strategy.get_unique_keys_tracker())
340-
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
341-
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
342-
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
343-
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
344-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
345-
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
346-
imp_strategy = StrategyDebugMode()
347-
else:
348-
imp_strategy = StrategyOptimizedMode(imp_counter)
349-
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
350-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
328+
329+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
330+
clear_filter_task, impressions_count_sync, impressions_count_task, \
331+
imp_strategy = set_classes('MEMORY', cfg['impressionsMode'], apis)
351332

352333
imp_manager = ImpressionsManager(
353334
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
@@ -432,31 +413,9 @@ def _build_redis_factory(api_key, cfg):
432413
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
433414
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
434415

435-
unique_keys_synchronizer = None
436-
clear_filter_sync = None
437-
unique_keys_task = None
438-
clear_filter_task = None
439-
impressions_count_sync = None
440-
impressions_count_task = None
441-
redis_sender_adapter = RedisSenderAdapter(redis_adapter)
442-
443-
if cfg['impressionsMode'] == ImpressionsMode.NONE:
444-
imp_counter = ImpressionsCounter()
445-
imp_strategy = StrategyNoneMode(imp_counter)
446-
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
447-
unique_keys_synchronizer = UniqueKeysSynchronizer(redis_sender_adapter, imp_strategy.get_unique_keys_tracker())
448-
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
449-
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
450-
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
451-
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
452-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
453-
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
454-
imp_strategy = StrategyDebugMode()
455-
else:
456-
imp_counter = ImpressionsCounter()
457-
imp_strategy = StrategyOptimizedMode(imp_counter)
458-
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
459-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
416+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
417+
clear_filter_task, impressions_count_sync, impressions_count_task, \
418+
imp_strategy = set_classes('REDIS', cfg['impressionsMode'], redis_adapter)
460419

461420
imp_manager = ImpressionsManager(
462421
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),

splitio/engine/filters.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import abc
2+
import threading
23

34
from bloom_filter2 import BloomFilter as BloomFilter2
45

@@ -45,6 +46,7 @@ def __init__(self, max_elements=5000, error_rate=0.01):
4546
self._max_elements = max_elements
4647
self._error_rate = error_rate
4748
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
49+
self._lock = threading.RLock()
4850

4951
def add(self, data):
5052
"""
@@ -56,8 +58,9 @@ def add(self, data):
5658
:return: True if successful
5759
:rtype: boolean
5860
"""
59-
self._imps_bloom_filter.add(data)
60-
return data in self._imps_bloom_filter
61+
with self._lock:
62+
self._imps_bloom_filter.add(data)
63+
return data in self._imps_bloom_filter
6164

6265
def contains(self, data):
6366
"""
@@ -69,12 +72,14 @@ def contains(self, data):
6972
:return: True if exist
7073
:rtype: boolean
7174
"""
72-
return data in self._imps_bloom_filter
75+
with self._lock:
76+
return data in self._imps_bloom_filter
7377

7478
def clear(self):
7579
"""
7680
Destroy the current filter instance and create new one.
7781
7882
"""
79-
self._imps_bloom_filter.close()
80-
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
83+
with self._lock:
84+
self._imps_bloom_filter.close()
85+
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import imp
2+
from splitio.engine.impressions.impressions import Manager as ImpressionsManager
3+
from splitio.engine.impressions.impressions import ImpressionsMode
4+
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
5+
from splitio.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
6+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter
7+
from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask
8+
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
9+
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
10+
from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask
11+
12+
13+
def set_classes(storage_mode, impressions_mode, api_adapter):
14+
unique_keys_synchronizer = None
15+
clear_filter_sync = None
16+
unique_keys_task = None
17+
clear_filter_task = None
18+
impressions_count_sync = None
19+
impressions_count_task = None
20+
if storage_mode == 'REDIS':
21+
redis_sender_adapter = RedisSenderAdapter(api_adapter)
22+
api_telemetry_adapter = redis_sender_adapter
23+
api_impressions_adapter = redis_sender_adapter
24+
else:
25+
api_telemetry_adapter = api_adapter['telemetry']
26+
api_impressions_adapter = api_adapter['impressions']
27+
28+
if impressions_mode == ImpressionsMode.NONE:
29+
imp_counter = ImpressionsCounter()
30+
imp_strategy = StrategyNoneMode(imp_counter)
31+
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
32+
unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(api_telemetry_adapter), imp_strategy.get_unique_keys_tracker())
33+
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
34+
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
35+
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
36+
impressions_count_sync = ImpressionsCountSynchronizer(api_impressions_adapter, imp_counter)
37+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
38+
elif impressions_mode == ImpressionsMode.DEBUG:
39+
imp_strategy = StrategyDebugMode()
40+
else:
41+
imp_counter = ImpressionsCounter()
42+
imp_strategy = StrategyOptimizedMode(imp_counter)
43+
impressions_count_sync = ImpressionsCountSynchronizer(api_impressions_adapter, imp_counter)
44+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
45+
46+
return unique_keys_synchronizer, clear_filter_sync, unique_keys_task, clear_filter_task, \
47+
impressions_count_sync, impressions_count_task, imp_strategy
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import abc
22

3-
from splitio.engine.manager import Observer, truncate_impressions_time, Counter, truncate_time
4-
from splitio.engine.unique_keys_tracker import UniqueKeysTracker
3+
from splitio.engine.impressions.manager import Observer, truncate_impressions_time, Counter, truncate_time
4+
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker
55
from splitio import util
66

77
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000

splitio/engine/unique_keys_tracker.py renamed to splitio/engine/impressions/unique_keys_tracker.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ def track(self, key, feature_name):
4848
with self._lock:
4949
if self._filter.contains(feature_name+key):
5050
return False
51-
52-
with self._lock:
5351
self._add_or_update(feature_name, key)
5452
self._filter.add(feature_name+key)
5553
self._current_cache_size = self._current_cache_size + 1
@@ -72,9 +70,11 @@ def _add_or_update(self, feature_name, key):
7270
:param key: key to be added to MTK list
7371
:type key: int
7472
"""
75-
if feature_name not in self._cache:
76-
self._cache[feature_name] = set()
77-
self._cache[feature_name].add(key)
73+
74+
with self._lock:
75+
if feature_name not in self._cache:
76+
self._cache[feature_name] = set()
77+
self._cache[feature_name].add(key)
7878

7979
def set_queue_full_hook(self, hook):
8080
"""
@@ -85,7 +85,7 @@ def set_queue_full_hook(self, hook):
8585
if callable(hook):
8686
self._queue_full_hook = hook
8787

88-
def filter_pop_all(self):
88+
def clear_filter(self):
8989
"""
9090
Delete the filter items
9191

0 commit comments

Comments
 (0)