Skip to content

Commit f0e9873

Browse files
authored
Merge pull request #535 from splitio/async-pr
Refactored Recorder classes
2 parents 5155e85 + 7337615 commit f0e9873

File tree

17 files changed

+218
-392
lines changed

17 files changed

+218
-392
lines changed

splitio/client/factory.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from splitio.engine.impressions.strategies import StrategyDebugMode
1717
from splitio.engine.telemetry import TelemetryStorageProducer, TelemetryStorageConsumer, \
1818
TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync
19-
from splitio.engine.impressions.manager import Counter as ImpressionsCounter, CounterAsync as ImpressionsCounterAsync
19+
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
2020
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync
2121

2222
# Storage
@@ -663,7 +663,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
663663

664664
telemetry_submitter = InMemoryTelemetrySubmitterAsync(telemetry_consumer, storages['splits'], storages['segments'], apis['telemetry'])
665665

666-
imp_counter = ImpressionsCounterAsync()
666+
imp_counter = ImpressionsCounter()
667667
unique_keys_tracker = UniqueKeysTrackerAsync(_UNIQUE_KEYS_CACHE_SIZE)
668668
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
669669
clear_filter_task, impressions_count_sync, impressions_count_task, \
@@ -840,7 +840,7 @@ async def _build_redis_factory_async(api_key, cfg):
840840
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
841841
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
842842

843-
imp_counter = ImpressionsCounterAsync()
843+
imp_counter = ImpressionsCounter()
844844
unique_keys_tracker = UniqueKeysTrackerAsync(_UNIQUE_KEYS_CACHE_SIZE)
845845
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
846846
clear_filter_task, impressions_count_sync, impressions_count_task, \
@@ -999,7 +999,7 @@ async def _build_pluggable_factory_async(api_key, cfg):
999999
# Using same class as redis
10001000
telemetry_submitter = RedisTelemetrySubmitterAsync(storages['telemetry'])
10011001

1002-
imp_counter = ImpressionsCounterAsync()
1002+
imp_counter = ImpressionsCounter()
10031003
unique_keys_tracker = UniqueKeysTrackerAsync(_UNIQUE_KEYS_CACHE_SIZE)
10041004
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
10051005
clear_filter_task, impressions_count_sync, impressions_count_task, \

splitio/engine/impressions/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def set_classes(storage_mode, impressions_mode, api_adapter, imp_counter, unique
1818
:param api_adapter: api adapter instance(s)
1919
:type impressions_mode: dict or splitio.storage.adapters.redis.RedisAdapter/splitio.storage.adapters.redis.RedisAdapterAsync
2020
:param imp_counter: Impressions Counter instance
21-
:type imp_counter: splitio.engine.impressions.Counter/splitio.engine.impressions.CounterAsync
21+
:type imp_counter: splitio.engine.impressions.Counter/splitio.engine.impressions.Counter
2222
:param unique_keys_tracker: Unique Keys Tracker instance
2323
:type unique_keys_tracker: splitio.engine.unique_keys_tracker.UniqueKeysTracker/splitio.engine.unique_keys_tracker.UniqueKeysTrackerAsync
2424
:param prefix: Prefix used for redis or pluggable adapters
@@ -83,7 +83,7 @@ def set_classes_async(storage_mode, impressions_mode, api_adapter, imp_counter,
8383
:param api_adapter: api adapter instance(s)
8484
:type impressions_mode: dict or splitio.storage.adapters.redis.RedisAdapter/splitio.storage.adapters.redis.RedisAdapterAsync
8585
:param imp_counter: Impressions Counter instance
86-
:type imp_counter: splitio.engine.impressions.Counter/splitio.engine.impressions.CounterAsync
86+
:type imp_counter: splitio.engine.impressions.Counter/splitio.engine.impressions.Counter
8787
:param unique_keys_tracker: Unique Keys Tracker instance
8888
:type unique_keys_tracker: splitio.engine.unique_keys_tracker.UniqueKeysTracker/splitio.engine.unique_keys_tracker.UniqueKeysTrackerAsync
8989
:param prefix: Prefix used for redis or pluggable adapters

splitio/engine/impressions/manager.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -153,40 +153,3 @@ def pop_all(self):
153153

154154
return [Counter.CountPerFeature(k.feature, k.timeframe, v)
155155
for (k, v) in old.items()]
156-
157-
class CounterAsync(object):
158-
"""Class that counts impressions per timeframe."""
159-
160-
def __init__(self):
161-
"""Class constructor."""
162-
self._data = defaultdict(lambda: 0)
163-
self._lock = asyncio.Lock()
164-
165-
async def track(self, impressions, inc=1):
166-
"""
167-
Register N new impressions for a feature in a specific timeframe.
168-
169-
:param impressions: generated impressions
170-
:type impressions: list[splitio.models.impressions.Impression]
171-
172-
:param inc: amount to increment (defaults to 1)
173-
:type inc: int
174-
"""
175-
keys = [Counter.CounterKey(i.feature_name, truncate_time(i.time)) for i in impressions]
176-
async with self._lock:
177-
for key in keys:
178-
self._data[key] += inc
179-
180-
async def pop_all(self):
181-
"""
182-
Clear and return all the counters currently stored.
183-
184-
:returns: List of count per feature/timeframe objects
185-
:rtype: list[ImpressionCounter.CountPerFeature]
186-
"""
187-
async with self._lock:
188-
old = self._data
189-
self._data = defaultdict(lambda: 0)
190-
191-
return [Counter.CountPerFeature(k.feature, k.timeframe, v)
192-
for (k, v) in old.items()]

splitio/engine/telemetry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ async def pop_formatted_stats(self):
668668
last_synchronization = await self.get_last_synchronization()
669669
http_errors = await self.pop_http_errors()
670670
http_latencies = await self.pop_http_latencies()
671-
671+
# TODO: if ufs value is too large, use gather to fetch events instead of serial style.
672672
return {
673673
'iQ': await self.get_impressions_stats(CounterConstants.IMPRESSIONS_QUEUED),
674674
'iDe': await self.get_impressions_stats(CounterConstants.IMPRESSIONS_DEDUPED),

splitio/recorder/recorder.py

Lines changed: 85 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,36 @@
77
from splitio.client.listener import ImpressionListenerException
88
from splitio.models.telemetry import MethodExceptionsAndLatencies
99
from splitio.models import telemetry
10+
from splitio.optional.loaders import asyncio
1011

1112
_LOGGER = logging.getLogger(__name__)
1213

1314

1415
class StatsRecorder(object, metaclass=abc.ABCMeta):
1516
"""StatsRecorder interface."""
1617

18+
def __init__(self, impressions_manager, event_storage, impression_storage, listener=None, unique_keys_tracker=None, imp_counter=None):
19+
"""
20+
Class constructor.
21+
22+
:param impressions_manager: impression manager instance
23+
:type impressions_manager: splitio.engine.impressions.Manager
24+
:param event_storage: event storage instance
25+
:type event_storage: splitio.storage.EventStorage
26+
:param impression_storage: impression storage instance
27+
:type impression_storage: splitio.storage.ImpressionStorage
28+
:param unique_keys_tracker: Unique Keys Tracker instance
29+
:type unique_keys_tracker: splitio.engine.unique_keys_tracker.UniqueKeysTracker
30+
:param imp_counter: Impressions Counter instance
31+
:type imp_counter: splitio.engine.impressions.Counter
32+
"""
33+
self._impressions_manager = impressions_manager
34+
self._event_sotrage = event_storage
35+
self._impression_storage = impression_storage
36+
self._listener = listener
37+
self._unique_keys_tracker = unique_keys_tracker
38+
self._imp_counter = imp_counter
39+
1740
@abc.abstractmethod
1841
def record_treatment_stats(self, impressions, latency, operation):
1942
"""
@@ -38,7 +61,27 @@ def record_track_stats(self, events):
3861
"""
3962
pass
4063

41-
async def _send_impressions_to_listener_async(self, impressions):
64+
class StatsRecorderThreadingBase(StatsRecorder):
65+
"""StandardRecorder class."""
66+
67+
def __init__(self, impressions_manager, event_storage, impression_storage, listener=None, unique_keys_tracker=None, imp_counter=None):
68+
"""
69+
Class constructor.
70+
71+
:param impressions_manager: impression manager instance
72+
:type impressions_manager: splitio.engine.impressions.Manager
73+
:param event_storage: event storage instance
74+
:type event_storage: splitio.storage.EventStorage
75+
:param impression_storage: impression storage instance
76+
:type impression_storage: splitio.storage.ImpressionStorage
77+
:param unique_keys_tracker: Unique Keys Tracker instance
78+
:type unique_keys_tracker: splitio.engine.unique_keys_tracker.UniqueKeysTracker
79+
:param imp_counter: Impressions Counter instance
80+
:type imp_counter: splitio.engine.impressions.Counter
81+
"""
82+
StatsRecorder.__init__(self, impressions_manager, event_storage, impression_storage, listener, unique_keys_tracker, imp_counter)
83+
84+
def _send_impressions_to_listener(self, impressions):
4285
"""
4386
Send impression result to custom listener.
4487
@@ -48,11 +91,31 @@ async def _send_impressions_to_listener_async(self, impressions):
4891
if self._listener is not None:
4992
try:
5093
for impression, attributes in impressions:
51-
await self._listener.log_impression(impression, attributes)
94+
self._listener.log_impression(impression, attributes)
5295
except ImpressionListenerException:
5396
pass
5497

55-
def _send_impressions_to_listener(self, impressions):
98+
class StatsRecorderAsyncBase(StatsRecorder):
99+
"""StandardRecorder class."""
100+
101+
def __init__(self, impressions_manager, event_storage, impression_storage, listener=None, unique_keys_tracker=None, imp_counter=None):
102+
"""
103+
Class constructor.
104+
105+
:param impressions_manager: impression manager instance
106+
:type impressions_manager: splitio.engine.impressions.Manager
107+
:param event_storage: event storage instance
108+
:type event_storage: splitio.storage.EventStorage
109+
:param impression_storage: impression storage instance
110+
:type impression_storage: splitio.storage.ImpressionStorage
111+
:param unique_keys_tracker: Unique Keys Tracker instance
112+
:type unique_keys_tracker: splitio.engine.unique_keys_tracker.UniqueKeysTracker
113+
:param imp_counter: Impressions Counter instance
114+
:type imp_counter: splitio.engine.impressions.Counter
115+
"""
116+
StatsRecorder.__init__(self, impressions_manager, event_storage, impression_storage, listener, unique_keys_tracker, imp_counter)
117+
118+
async def _send_impressions_to_listener_async(self, impressions):
56119
"""
57120
Send impression result to custom listener.
58121
@@ -62,11 +125,11 @@ def _send_impressions_to_listener(self, impressions):
62125
if self._listener is not None:
63126
try:
64127
for impression, attributes in impressions:
65-
self._listener.log_impression(impression, attributes)
128+
await self._listener.log_impression(impression, attributes)
66129
except ImpressionListenerException:
67130
pass
68131

69-
class StandardRecorder(StatsRecorder):
132+
class StandardRecorder(StatsRecorderThreadingBase):
70133
"""StandardRecorder class."""
71134

72135
def __init__(self, impressions_manager, event_storage, impression_storage, telemetry_evaluation_producer, telemetry_runtime_producer, listener=None, unique_keys_tracker=None, imp_counter=None):
@@ -84,14 +147,9 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
84147
:param imp_counter: Impressions Counter instance
85148
:type imp_counter: splitio.engine.impressions.Counter
86149
"""
87-
self._impressions_manager = impressions_manager
88-
self._event_sotrage = event_storage
89-
self._impression_storage = impression_storage
150+
StatsRecorderThreadingBase.__init__(self, impressions_manager, event_storage, impression_storage, listener, unique_keys_tracker, imp_counter)
90151
self._telemetry_evaluation_producer = telemetry_evaluation_producer
91152
self._telemetry_runtime_producer = telemetry_runtime_producer
92-
self._listener = listener
93-
self._unique_keys_tracker = unique_keys_tracker
94-
self._imp_counter = imp_counter
95153

96154
def record_treatment_stats(self, impressions, latency, operation, method_name):
97155
"""
@@ -130,8 +188,7 @@ def record_track_stats(self, event, latency):
130188
self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, latency)
131189
return self._event_sotrage.put(event)
132190

133-
134-
class StandardRecorderAsync(StatsRecorder):
191+
class StandardRecorderAsync(StatsRecorderAsyncBase):
135192
"""StandardRecorder async class."""
136193

137194
def __init__(self, impressions_manager, event_storage, impression_storage, telemetry_evaluation_producer, telemetry_runtime_producer, listener=None, unique_keys_tracker=None, imp_counter=None):
@@ -147,16 +204,11 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
147204
:param unique_keys_tracker: Unique Keys Tracker instance
148205
:type unique_keys_tracker: splitio.engine.unique_keys_tracker.UniqueKeysTrackerAsync
149206
:param imp_counter: Impressions Counter instance
150-
:type imp_counter: splitio.engine.impressions.CounterAsync
207+
:type imp_counter: splitio.engine.impressions.Counter
151208
"""
152-
self._impressions_manager = impressions_manager
153-
self._event_sotrage = event_storage
154-
self._impression_storage = impression_storage
209+
StatsRecorderAsyncBase.__init__(self, impressions_manager, event_storage, impression_storage, listener, unique_keys_tracker, imp_counter)
155210
self._telemetry_evaluation_producer = telemetry_evaluation_producer
156211
self._telemetry_runtime_producer = telemetry_runtime_producer
157-
self._listener = listener
158-
self._unique_keys_tracker = unique_keys_tracker
159-
self._imp_counter = imp_counter
160212

161213
async def record_treatment_stats(self, impressions, latency, operation, method_name):
162214
"""
@@ -179,9 +231,10 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
179231
await self._impression_storage.put(impressions)
180232
await self._send_impressions_to_listener_async(for_listener)
181233
if len(for_counter) > 0:
182-
await self._imp_counter.track(for_counter)
234+
self._imp_counter.track(for_counter)
183235
if len(for_unique_keys_tracker) > 0:
184-
[await self._unique_keys_tracker.track(item[0], item[1]) for item in for_unique_keys_tracker]
236+
unique_keys_coros = [self._unique_keys_tracker.track(item[0], item[1]) for item in for_unique_keys_tracker]
237+
await asyncio.gather(*unique_keys_coros)
185238
except Exception: # pylint: disable=broad-except
186239
_LOGGER.error('Error recording impressions')
187240
_LOGGER.debug('Error: ', exc_info=True)
@@ -196,8 +249,7 @@ async def record_track_stats(self, event, latency):
196249
await self._telemetry_evaluation_producer.record_latency(MethodExceptionsAndLatencies.TRACK, latency)
197250
return await self._event_sotrage.put(event)
198251

199-
200-
class PipelinedRecorder(StatsRecorder):
252+
class PipelinedRecorder(StatsRecorderThreadingBase):
201253
"""PipelinedRecorder class."""
202254

203255
def __init__(self, pipe, impressions_manager, event_storage,
@@ -220,15 +272,10 @@ def __init__(self, pipe, impressions_manager, event_storage,
220272
:param imp_counter: Impressions Counter instance
221273
:type imp_counter: splitio.engine.impressions.Counter
222274
"""
275+
StatsRecorderThreadingBase.__init__(self, impressions_manager, event_storage, impression_storage, listener, unique_keys_tracker, imp_counter)
223276
self._make_pipe = pipe
224-
self._impressions_manager = impressions_manager
225-
self._event_sotrage = event_storage
226-
self._impression_storage = impression_storage
227277
self._data_sampling = data_sampling
228278
self._telemetry_redis_storage = telemetry_redis_storage
229-
self._listener = listener
230-
self._unique_keys_tracker = unique_keys_tracker
231-
self._imp_counter = imp_counter
232279

233280
def record_treatment_stats(self, impressions, latency, operation, method_name):
234281
"""
@@ -246,6 +293,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
246293
rnumber = random.uniform(0, 1)
247294
if self._data_sampling < rnumber:
248295
return
296+
249297
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
250298
if impressions:
251299
pipe = self._make_pipe()
@@ -291,7 +339,7 @@ def record_track_stats(self, event, latency):
291339
_LOGGER.debug('Error: ', exc_info=True)
292340
return False
293341

294-
class PipelinedRecorderAsync(StatsRecorder):
342+
class PipelinedRecorderAsync(StatsRecorderAsyncBase):
295343
"""PipelinedRecorder async class."""
296344

297345
def __init__(self, pipe, impressions_manager, event_storage,
@@ -312,17 +360,12 @@ def __init__(self, pipe, impressions_manager, event_storage,
312360
:param unique_keys_tracker: Unique Keys Tracker instance
313361
:type unique_keys_tracker: splitio.engine.unique_keys_tracker.UniqueKeysTrackerAsync
314362
:param imp_counter: Impressions Counter instance
315-
:type imp_counter: splitio.engine.impressions.CounterAsync
363+
:type imp_counter: splitio.engine.impressions.Counter
316364
"""
365+
StatsRecorderAsyncBase.__init__(self, impressions_manager, event_storage, impression_storage, listener, unique_keys_tracker, imp_counter)
317366
self._make_pipe = pipe
318-
self._impressions_manager = impressions_manager
319-
self._event_sotrage = event_storage
320-
self._impression_storage = impression_storage
321367
self._data_sampling = data_sampling
322368
self._telemetry_redis_storage = telemetry_redis_storage
323-
self._listener = listener
324-
self._unique_keys_tracker = unique_keys_tracker
325-
self._imp_counter = imp_counter
326369

327370
async def record_treatment_stats(self, impressions, latency, operation, method_name):
328371
"""
@@ -340,6 +383,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
340383
rnumber = random.uniform(0, 1)
341384
if self._data_sampling < rnumber:
342385
return
386+
343387
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
344388
if impressions:
345389
pipe = self._make_pipe()
@@ -353,9 +397,10 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
353397
await self._send_impressions_to_listener_async(for_listener)
354398

355399
if len(for_counter) > 0:
356-
await self._imp_counter.track(for_counter)
400+
self._imp_counter.track(for_counter)
357401
if len(for_unique_keys_tracker) > 0:
358-
[await self._unique_keys_tracker.track(item[0], item[1]) for item in for_unique_keys_tracker]
402+
unique_keys_coros = [self._unique_keys_tracker.track(item[0], item[1]) for item in for_unique_keys_tracker]
403+
await asyncio.gather(*unique_keys_coros)
359404
except Exception: # pylint: disable=broad-except
360405
_LOGGER.error('Error recording impressions')
361406
_LOGGER.debug('Error: ', exc_info=True)

0 commit comments

Comments
 (0)