Skip to content

Commit 5cc32c1

Browse files
committed
Bring branch up to date with other branches
1 parent 8367d59 commit 5cc32c1

17 files changed

+414
-328
lines changed

splitio/client/factory.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
1414
from splitio.engine.impressions import Manager as ImpressionsManager
15+
from splitio.engine.impressions import ImpressionsMode
16+
from splitio.engine.strategies import Counter as ImpressionsCounter
17+
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
18+
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
19+
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
1520

1621
# Storage
1722
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -307,7 +312,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
307312
'segments': SegmentsAPI(http_client, api_key, sdk_metadata),
308313
'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']),
309314
'events': EventsAPI(http_client, api_key, sdk_metadata),
310-
'telemtery': TelemetryAPI(http_client, api_key, sdk_metadata),
315+
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata),
311316
}
312317

313318
if not input_validator.validate_apikey_type(apis['segments']):
@@ -319,11 +324,17 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
319324
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize']),
320325
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
321326
}
327+
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
328+
329+
strategies = {
330+
ImpressionsMode.OPTIMIZED : StrategyOptimizedMode(imp_counter),
331+
ImpressionsMode.DEBUG : StrategyDebugMode(),
332+
}
333+
imp_strategy = strategies[cfg['impressionsMode']]
322334

323335
imp_manager = ImpressionsManager(
324-
cfg['impressionsMode'],
325-
True,
326-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
336+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
337+
imp_strategy)
327338

328339
synchronizers = SplitSynchronizers(
329340
SplitSynchronizer(apis['splits'], storages['splits']),
@@ -332,8 +343,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
332343
cfg['impressionsBulkSize']),
333344
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
334345
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
335-
UniqueKeysSynchronizer(), # TODO: Pass the UniqueKeysTracker instance fetched from Strategy instance created above.
336-
ClearFilterSynchronizer(), # TODO: Pass the UniqueKeysTracker instance fetched from Strategy instance created above.
337346
)
338347

339348
tasks = SplitTasks(
@@ -351,8 +360,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
351360
),
352361
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
353362
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
354-
UniqueKeysSyncTask(synchronizers.unique_keys_sync.SendAll),
355-
ClearFilterSyncTask(synchronizers.clear_filter_sync.clearAll)
356363
)
357364

358365
synchronizer = Synchronizer(synchronizers, tasks)
@@ -365,7 +372,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
365372

366373
storages['events'].set_queue_full_hook(tasks.events_task.flush)
367374
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
368-
# TODO: Add unique_keys_tracker.set_queue_full_hook(tasks.unique_keys.flush)
369375

370376
recorder = StandardRecorder(
371377
imp_manager,
@@ -404,10 +410,14 @@ def _build_redis_factory(api_key, cfg):
404410
_LOGGER.warning("dataSampling cannot be less than %.2f, defaulting to minimum",
405411
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
406412
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
413+
414+
imp_manager = ImpressionsManager(
415+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
416+
StrategyDebugMode())
417+
407418
recorder = PipelinedRecorder(
408419
redis_adapter.pipeline,
409-
ImpressionsManager(cfg['impressionsMode'], False,
410-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
420+
imp_manager,
411421
storages['events'],
412422
storages['impressions'],
413423
data_sampling,
@@ -447,7 +457,7 @@ def _build_localhost_factory(cfg):
447457
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
448458
manager.start()
449459
recorder = StandardRecorder(
450-
ImpressionsManager(cfg['impressionsMode'], True, None),
460+
ImpressionsManager(None, StrategyDebugMode()),
451461
storages['events'],
452462
storages['impressions'],
453463
)
@@ -496,7 +506,8 @@ def get_factory(api_key, **kwargs):
496506
kwargs.get('sdk_api_base_url'),
497507
kwargs.get('events_api_base_url'),
498508
kwargs.get('auth_api_base_url'),
499-
kwargs.get('streaming_api_base_url')
509+
kwargs.get('streaming_api_base_url'),
510+
kwargs.get('telemetry_api_base_url')
500511
)
501512
finally:
502513
_INSTANTIATED_FACTORIES.update([api_key])

splitio/engine/impressions.py

Lines changed: 7 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,160 +1,19 @@
11
"""Split evaluator module."""
2-
import threading
3-
from collections import defaultdict, namedtuple
42
from enum import Enum
53

6-
from splitio.models.impressions import Impression
7-
from splitio.engine.hashfns import murmur_128
8-
from splitio.engine.cache.lru import SimpleLruCache
94
from splitio.client.listener import ImpressionListenerException
10-
from splitio import util
11-
12-
13-
_TIME_INTERVAL_MS = 3600 * 1000 # one hour
14-
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000
15-
165

176
class ImpressionsMode(Enum):
187
"""Impressions tracking mode."""
198

209
OPTIMIZED = "OPTIMIZED"
2110
DEBUG = "DEBUG"
22-
23-
24-
def truncate_time(timestamp_ms):
25-
"""
26-
Truncate a timestamp in milliseconds to have hour granularity.
27-
28-
:param timestamp_ms: timestamp generated in the impression.
29-
:type timestamp_ms: int
30-
31-
:returns: a timestamp with hour, min, seconds, and ms set to 0.
32-
:rtype: int
33-
"""
34-
return timestamp_ms - (timestamp_ms % _TIME_INTERVAL_MS)
35-
36-
37-
class Hasher(object): # pylint:disable=too-few-public-methods
38-
"""Impression hasher."""
39-
40-
_PATTERN = "%s:%s:%s:%s:%d"
41-
42-
def __init__(self, hash_fn=murmur_128, seed=0):
43-
"""
44-
Class constructor.
45-
46-
:param hash_fn: Hash function to apply (str, int) -> int
47-
:type hash_fn: callable
48-
49-
:param seed: seed to be provided when hashing
50-
:type seed: int
51-
"""
52-
self._hash_fn = hash_fn
53-
self._seed = seed
54-
55-
def _stringify(self, impression):
56-
"""
57-
Stringify an impression.
58-
59-
:param impression: Impression to stringify using _PATTERN
60-
:type impression: splitio.models.impressions.Impression
61-
62-
:returns: a string representation of the impression
63-
:rtype: str
64-
"""
65-
return self._PATTERN % (impression.matching_key if impression.matching_key else 'UNKNOWN',
66-
impression.feature_name if impression.feature_name else 'UNKNOWN',
67-
impression.treatment if impression.treatment else 'UNKNOWN',
68-
impression.label if impression.label else 'UNKNOWN',
69-
impression.change_number if impression.change_number else 0)
70-
71-
def process(self, impression):
72-
"""
73-
Hash an impression.
74-
75-
:param impression: Impression to hash.
76-
:type impression: splitio.models.impressions.Impression
77-
78-
:returns: a hash of the supplied impression's relevant fields.
79-
:rtype: int
80-
"""
81-
return self._hash_fn(self._stringify(impression), self._seed)
82-
83-
84-
class Observer(object): # pylint:disable=too-few-public-methods
85-
"""Observe impression and add a previous time if applicable."""
86-
87-
def __init__(self, size):
88-
"""Class constructor."""
89-
self._hasher = Hasher()
90-
self._cache = SimpleLruCache(size)
91-
92-
def test_and_set(self, impression):
93-
"""
94-
Examine an impression to determine and set it's previous time accordingly.
95-
96-
:param impression: Impression to track
97-
:type impression: splitio.models.impressions.Impression
98-
99-
:returns: Impression with populated previous time
100-
:rtype: splitio.models.impressions.Impression
101-
"""
102-
previous_time = self._cache.test_and_set(self._hasher.process(impression), impression.time)
103-
return Impression(impression.matching_key,
104-
impression.feature_name,
105-
impression.treatment,
106-
impression.label,
107-
impression.change_number,
108-
impression.bucketing_key,
109-
impression.time,
110-
previous_time)
111-
112-
113-
class Counter(object):
114-
"""Class that counts impressions per timeframe."""
115-
116-
CounterKey = namedtuple('Count', ['feature', 'timeframe'])
117-
CountPerFeature = namedtuple('CountPerFeature', ['feature', 'timeframe', 'count'])
118-
119-
def __init__(self):
120-
"""Class constructor."""
121-
self._data = defaultdict(lambda: 0)
122-
self._lock = threading.Lock()
123-
124-
def track(self, impressions, inc=1):
125-
"""
126-
Register N new impressions for a feature in a specific timeframe.
127-
128-
:param impressions: generated impressions
129-
:type impressions: list[splitio.models.impressions.Impression]
130-
131-
:param inc: amount to increment (defaults to 1)
132-
:type inc: int
133-
"""
134-
keys = [Counter.CounterKey(i.feature_name, truncate_time(i.time)) for i in impressions]
135-
with self._lock:
136-
for key in keys:
137-
self._data[key] += inc
138-
139-
def pop_all(self):
140-
"""
141-
Clear and return all the counters currently stored.
142-
143-
:returns: List of count per feature/timeframe objects
144-
:rtype: list[ImpressionCounter.CountPerFeature]
145-
"""
146-
with self._lock:
147-
old = self._data
148-
self._data = defaultdict(lambda: 0)
149-
150-
return [Counter.CountPerFeature(k.feature, k.timeframe, v)
151-
for (k, v) in old.items()]
152-
11+
NONE = "NONE"
15312

15413
class Manager(object): # pylint:disable=too-few-public-methods
15514
"""Impression manager."""
15615

157-
def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=None):
16+
def __init__(self, listener=None, strategy=None):
15817
"""
15918
Construct a manger to track and forward impressions to the queue.
16019
@@ -167,8 +26,8 @@ def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=Non
16726
:param listener: Optional impressions listener that will capture all seen impressions.
16827
:type listener: splitio.client.listener.ImpressionListenerWrapper
16928
"""
170-
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
171-
self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None
29+
30+
self._strategy = strategy
17231
self._listener = listener
17332

17433
def process_impressions(self, impressions):
@@ -180,26 +39,9 @@ def process_impressions(self, impressions):
18039
:param impressions: List of impression objects with attributes
18140
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
18241
"""
183-
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] \
184-
if self._observer else impressions
185-
186-
if self._counter:
187-
self._counter.track([imp for imp, _ in imps])
188-
189-
self._send_impressions_to_listener(imps)
190-
191-
this_hour = truncate_time(util.utctime_ms())
192-
return [imp for imp, _ in imps] if self._counter is None \
193-
else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]
194-
195-
def get_counts(self):
196-
"""
197-
Return counts of impressions per features.
198-
199-
:returns: A list of counter objects.
200-
:rtype: list[Counter.CountPerFeature]
201-
"""
202-
return self._counter.pop_all() if self._counter is not None else []
42+
for_log, for_listener = self._strategy.process_impressions(impressions)
43+
self._send_impressions_to_listener(for_listener)
44+
return for_log
20345

20446
def _send_impressions_to_listener(self, impressions):
20547
"""

splitio/engine/sender_adapters/__init__.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,3 @@ def record_unique_keys(self, data):
1010
1111
"""
1212
pass
13-
14-
@abc.abstractmethod
15-
def record_impressions_count(self, data):
16-
"""
17-
No Return value
18-
19-
"""
20-
pass

splitio/engine/sender_adapters/in_memory_sender_adapter.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ def _uniques_formatter(self, uniques):
3333
:return: unique keys JSON
3434
:rtype: json
3535
"""
36-
formatted_uniques = json.load('{keys: []}')
36+
formatted_uniques = json.loads('{"keys": []}')
3737
if len(uniques) == 0:
38-
return formatted_uniques
39-
for key in uniques:
40-
formatted_uniques['keys'].append('{"f":"' + key +'", "ks:['+ json.dump(uniques[key])+']}')
41-
return formatted_uniques
38+
return json.loads('{"keys": []}')
39+
40+
return {
41+
'keys': [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
42+
}

0 commit comments

Comments
 (0)