Skip to content

Commit 74e5565

Browse files
authored
Merge pull request #268 from splitio/NoneImplementation
None implementation
2 parents edde08d + b78b7ee commit 74e5565

18 files changed

+487
-92
lines changed

splitio/api/telemetry.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66
from splitio.api.client import HttpClientException
77
from splitio.api.commons import headers_from_metadata
88

9-
_LOGGER = logging.getLogger(__name__)
10-
11-
129
class TelemetryAPI(object): # pylint: disable=too-few-public-methods
1310
"""Class that uses an httpClient to communicate with the Telemetry API."""
1411

@@ -34,8 +31,8 @@ def record_unique_keys(self, uniques):
3431
"""
3532
try:
3633
response = self._client.post(
37-
'keys',
38-
'/ss',
34+
'telemetry',
35+
'/keys/ss',
3936
self._apikey,
4037
body=uniques,
4138
extra_headers=self._metadata

splitio/client/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def _sanitize_impressions_mode(mode, refresh_rate=None):
9191
mode = ImpressionsMode(mode.upper())
9292
except (ValueError, AttributeError):
9393
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be '
94-
'one of the following values: `debug` or `optimized`. '
94+
'one of the following values: `debug`, `none` or `optimized`. '
9595
'Defaulting to `optimized` mode.')
9696
mode = ImpressionsMode.OPTIMIZED
9797

splitio/client/factory.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from splitio.engine.strategies import Counter as ImpressionsCounter
1717
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
1818
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
19+
from splitio.engine.strategies.strategy_none_mode import StrategyNoneMode
1920
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
2021

2122
# Storage
@@ -326,19 +327,21 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
326327
}
327328
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
328329

329-
strategies = {
330-
ImpressionsMode.OPTIMIZED : StrategyOptimizedMode(imp_counter),
331-
ImpressionsMode.DEBUG : StrategyDebugMode(),
332-
}
333-
imp_strategy = strategies[cfg['impressionsMode']]
334-
335-
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
336-
337-
strategies = {
338-
ImpressionsMode.OPTIMIZED : StrategyOptimizedMode(imp_counter),
339-
ImpressionsMode.DEBUG : StrategyDebugMode(),
340-
}
341-
imp_strategy = strategies[cfg['impressionsMode']]
330+
unique_keys_synchronizer = None
331+
clear_filter_sync = None
332+
unique_keys_task = None
333+
clear_filter_task = None
334+
if cfg['impressionsMode'] == ImpressionsMode.NONE:
335+
imp_strategy = StrategyNoneMode(imp_counter)
336+
clear_filter_sync = ClearFilterSynchronizer(imp_strategy._unique_keys_tracker)
337+
unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(apis['telemetry']), imp_strategy._unique_keys_tracker)
338+
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
339+
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
340+
imp_strategy._unique_keys_tracker.set_queue_full_hook(unique_keys_task.flush)
341+
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
342+
imp_strategy = StrategyDebugMode()
343+
else:
344+
imp_strategy = StrategyOptimizedMode(imp_counter)
342345

343346
imp_manager = ImpressionsManager(
344347
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
@@ -350,9 +353,10 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
350353
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
351354
cfg['impressionsBulkSize']),
352355
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
353-
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
356+
ImpressionsCountSynchronizer(apis['impressions'], imp_counter),
357+
unique_keys_synchronizer,
358+
clear_filter_sync
354359
)
355-
imp_count_sync_task = ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters) if cfg['impressionsMode'] == 'OPTIMIZED' else None
356360

357361
tasks = SplitTasks(
358362
SplitSynchronizationTask(
@@ -369,6 +373,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
369373
),
370374
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
371375
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
376+
unique_keys_task,
377+
clear_filter_task
372378
)
373379

374380
synchronizer = Synchronizer(synchronizers, tasks)
@@ -438,6 +444,7 @@ def _build_redis_factory(api_key, cfg):
438444
recorder,
439445
)
440446

447+
441448
def _build_localhost_factory(cfg):
442449
"""Build and return a localhost factory for testing/development purposes."""
443450
storages = {
@@ -464,7 +471,6 @@ def _build_localhost_factory(cfg):
464471
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
465472
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
466473
manager.start()
467-
468474
recorder = StandardRecorder(
469475
ImpressionsManager(None, StrategyDebugMode()),
470476
storages['events'],

splitio/engine/sender_adapters/in_memory_sender_adapter.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ def _uniques_formatter(self, uniques):
3333
:return: unique keys JSON
3434
:rtype: json
3535
"""
36-
formatted_uniques = json.loads('{"keys": []}')
3736
if len(uniques) == 0:
3837
return json.loads('{"keys": []}')
3938

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from splitio.engine.strategies.base_strategy import BaseStrategy
2+
from splitio.engine.strategies import Counter, truncate_time
3+
from splitio.engine.unique_keys_tracker import UniqueKeysTracker
4+
from splitio import util
5+
6+
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000
7+
_UNIQUE_KEYS_CACHE_SIZE = 30000
8+
9+
class StrategyNoneMode(BaseStrategy):
10+
"""Debug mode strategy."""
11+
12+
def __init__(self, counter):
13+
"""
14+
Construct a strategy instance for none mode.
15+
16+
"""
17+
self._counter = counter
18+
self._unique_keys_tracker = UniqueKeysTracker(_UNIQUE_KEYS_CACHE_SIZE)
19+
20+
def process_impressions(self, impressions):
21+
"""
22+
Process impressions.
23+
24+
Impressions are analyzed to see if they've been seen before and counted.
25+
Unique keys tracking are updated.
26+
27+
:param impressions: List of impression objects with attributes
28+
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
29+
30+
:returns: Empty list, no impressions to post
31+
:rtype: list[]
32+
"""
33+
self._counter.track([imp for imp, _ in impressions])
34+
[self._unique_keys_tracker.track(i.matching_key, i.feature_name) for i, _ in impressions]
35+
return [], impressions

splitio/sync/impression.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from splitio.api import APIException
55
from splitio.engine.strategies import Counter
66

7-
87
_LOGGER = logging.getLogger(__name__)
98

109

@@ -69,22 +68,26 @@ def synchronize_impressions(self):
6968

7069

7170
class ImpressionsCountSynchronizer(object):
72-
def __init__(self, impressions_api, impressions_counter):
71+
def __init__(self, impressions_api, imp_counter):
7372
"""
7473
Class constructor.
7574
7675
:param impressions_api: Impressions Api object to send data to the backend
7776
:type impressions_api: splitio.api.impressions.ImpressionsAPI
7877
:param impressions_manager: Impressions manager instance
79-
:type impressions_counter: splitio.engine.strategies
78+
:type impressions_manager: splitio.engine.impressions.Manager
8079
8180
"""
8281
self._impressions_api = impressions_api
83-
self._impressions_counter = impressions_counter
82+
self._impressions_counter = imp_counter
8483

8584
def synchronize_counters(self):
8685
"""Send impressions from both the failed and new queues."""
87-
to_send = self._impressions_manager._strategy._counter.pop_all()
86+
87+
if not isinstance(self._impressions_counter, Counter):
88+
return
89+
90+
to_send = self._impressions_counter.pop_all()
8891
if not to_send:
8992
return
9093

splitio/sync/synchronizer.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class SplitSynchronizers(object):
1414
"""SplitSynchronizers."""
1515

1616
def __init__(self, split_sync, segment_sync, impressions_sync, events_sync, # pylint:disable=too-many-arguments
17-
impressions_count_sync):
17+
impressions_count_sync, unique_keys_sync = None, clear_filter_sync = None):
1818
"""
1919
Class constructor.
2020
@@ -34,6 +34,8 @@ def __init__(self, split_sync, segment_sync, impressions_sync, events_sync, # p
3434
self._impressions_sync = impressions_sync
3535
self._events_sync = events_sync
3636
self._impressions_count_sync = impressions_count_sync
37+
self._unique_keys_sync = unique_keys_sync
38+
self._clear_filter_sync = clear_filter_sync
3739

3840
@property
3941
def split_sync(self):
@@ -60,11 +62,21 @@ def impressions_count_sync(self):
6062
"""Return impressions count synchonizer."""
6163
return self._impressions_count_sync
6264

65+
@property
66+
def unique_keys_sync(self):
67+
"""Return unique keys synchonizer."""
68+
return self._unique_keys_sync
69+
70+
@property
71+
def clear_filter_sync(self):
72+
"""Return clear filter synchonizer."""
73+
return self._clear_filter_sync
74+
6375
class SplitTasks(object):
6476
"""SplitTasks."""
6577

6678
def __init__(self, split_task, segment_task, impressions_task, events_task, # pylint:disable=too-many-arguments
67-
impressions_count_task):
79+
impressions_count_task, unique_keys_task = None, clear_filter_task = None):
6880
"""
6981
Class constructor.
7082
@@ -84,6 +96,8 @@ def __init__(self, split_task, segment_task, impressions_task, events_task, # p
8496
self._impressions_task = impressions_task
8597
self._events_task = events_task
8698
self._impressions_count_task = impressions_count_task
99+
self._unique_keys_task = unique_keys_task
100+
self._clear_filter_task = clear_filter_task
87101

88102
@property
89103
def split_task(self):
@@ -110,6 +124,16 @@ def impressions_count_task(self):
110124
"""Return impressions count sync task."""
111125
return self._impressions_count_task
112126

127+
@property
128+
def unique_keys_task(self):
129+
"""Return unique keys sync task."""
130+
return self._unique_keys_task
131+
132+
@property
133+
def clear_filter_task(self):
134+
"""Return clear filter sync task."""
135+
return self._clear_filter_task
136+
113137
class BaseSynchronizer(object, metaclass=abc.ABCMeta):
114138
"""Synchronizer interface."""
115139

@@ -300,8 +324,11 @@ def start_periodic_data_recording(self):
300324
_LOGGER.debug('Starting periodic data recording')
301325
self._split_tasks.impressions_task.start()
302326
self._split_tasks.events_task.start()
303-
if self._split_tasks.impressions_count_task is not None:
304-
self._split_tasks.impressions_count_task.start()
327+
self._split_tasks.impressions_count_task.start()
328+
if self._split_tasks.unique_keys_task is not None:
329+
self._split_tasks.unique_keys_task.start()
330+
if self._split_tasks.clear_filter_task is not None:
331+
self._split_tasks.clear_filter_task.start()
305332

306333
def stop_periodic_data_recording(self, blocking):
307334
"""
@@ -316,6 +343,11 @@ def stop_periodic_data_recording(self, blocking):
316343
tasks = [self._split_tasks.impressions_task,
317344
self._split_tasks.events_task,
318345
self._split_tasks.impressions_count_task]
346+
if self._split_tasks.unique_keys_task is not None:
347+
tasks.append(self._split_tasks.unique_keys_task)
348+
if self._split_tasks.clear_filter_task is not None:
349+
tasks.append(self._split_tasks.clear_filter_task)
350+
319351
for task in tasks:
320352
stop_event = threading.Event()
321353
task.stop(stop_event)
@@ -325,8 +357,11 @@ def stop_periodic_data_recording(self, blocking):
325357
else:
326358
self._split_tasks.impressions_task.stop()
327359
self._split_tasks.events_task.stop()
328-
if self._split_tasks.impressions_count_task is not None:
329-
self._split_tasks.impressions_count_task.stop()
360+
self._split_tasks.impressions_count_task.stop()
361+
if self._split_tasks.unique_keys_task is not None:
362+
self._split_tasks.unique_keys_task.stop()
363+
if self._split_tasks.clear_filter_task is not None:
364+
self._split_tasks.clear_filter_task.stop()
330365

331366
def kill_split(self, split_name, default_treatment, change_number):
332367
"""
@@ -342,8 +377,6 @@ def kill_split(self, split_name, default_treatment, change_number):
342377
self._split_synchronizers.split_sync.kill_split(split_name, default_treatment,
343378
change_number)
344379

345-
346-
347380
class LocalhostSynchronizer(BaseSynchronizer):
348381
"""LocalhostSynchronizer."""
349382

0 commit comments

Comments
 (0)