Skip to content

Commit 2818dc6

Browse files
committed
Implemented impression count for redis and refactoring of synchronizer class for redis.
1 parent 9564eab commit 2818dc6

File tree

5 files changed

+164
-26
lines changed

5 files changed

+164
-26
lines changed

splitio/client/factory.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
# Synchronizer
4444
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
45-
LocalhostSynchronizer
45+
LocalhostSynchronizer, RedisSynchronizer
4646
from splitio.sync.manager import Manager, RedisManager
4747
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer
4848
from splitio.sync.segment import SegmentSynchronizer
@@ -430,10 +430,11 @@ def _build_redis_factory(api_key, cfg):
430430
clear_filter_sync = None
431431
unique_keys_task = None
432432
clear_filter_task = None
433+
redis_sender_adapter = RedisSenderAdapter(redis_adapter)
433434
if cfg['impressionsMode'] == ImpressionsMode.NONE:
434435
imp_strategy = StrategyNoneMode(imp_counter)
435436
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
436-
unique_keys_synchronizer = UniqueKeysSynchronizer(RedisSenderAdapter(redis_adapter), imp_strategy.get_unique_keys_tracker())
437+
unique_keys_synchronizer = UniqueKeysSynchronizer(redis_sender_adapter, imp_strategy.get_unique_keys_tracker())
437438
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
438439
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
439440
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
@@ -446,14 +447,28 @@ def _build_redis_factory(api_key, cfg):
446447
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
447448
imp_strategy)
448449

450+
synchronizers = SplitSynchronizers(None, None, None, None,
451+
ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter),
452+
unique_keys_synchronizer,
453+
clear_filter_sync
454+
)
455+
456+
tasks = SplitTasks(None, None, None, None,
457+
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
458+
unique_keys_task,
459+
clear_filter_task
460+
)
461+
462+
synchronizer = RedisSynchronizer(synchronizers, tasks)
449463
recorder = PipelinedRecorder(
450464
redis_adapter.pipeline,
451465
imp_manager,
452466
storages['events'],
453467
storages['impressions'],
454468
data_sampling,
455469
)
456-
manager = RedisManager(unique_keys_task, clear_filter_task)
470+
471+
manager = RedisManager(synchronizer)
457472
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
458473
initialization_thread.setDaemon(True)
459474
initialization_thread.start()

splitio/engine/adapters.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class RedisSenderAdapter(ImpressionsSenderAdapter):
5555

5656
MTK_QUEUE_KEY = 'SPLITIO.uniquekeys'
5757
MTK_KEY_DEFAULT_TTL = 3600
58+
IMP_COUNT_QUEUE_KEY = 'SPLITIO.impressions.count'
59+
IMP_COUNT_KEY_DEFAULT_TTL = 3600
5860

5961
def __init__(self, redis_client):
6062
"""
@@ -67,21 +69,50 @@ def __init__(self, redis_client):
6769

6870
def record_unique_keys(self, uniques):
6971
"""
70-
post the unique keys to split back end.
72+
post the unique keys to redis.
7173
7274
:param uniques: unique keys disctionary
7375
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
7476
"""
7577
bulk_mtks = self._uniques_formatter(uniques)
7678
try:
77-
self._redis_client.rpush(self.MTK_QUEUE_KEY, *bulk_mtks)
78-
self._redis_client.expire(self.MTK_QUEUE_KEY, self.MTK_KEY_DEFAULT_TTL)
79+
inserted = self._redis_client.rpush(self.MTK_QUEUE_KEY, *bulk_mtks)
80+
self._expire_keys(self.MTK_QUEUE_KEY, self.MTK_KEY_DEFAULT_TTL, inserted, len(bulk_mtks))
7981
return True
8082
except RedisAdapterException:
8183
_LOGGER.error('Something went wrong when trying to add mtks to redis')
8284
_LOGGER.error('Error: ', exc_info=True)
8385
return False
8486

87+
def flush_counters(self, to_send):
88+
"""
89+
post the impression counters to redis.
90+
91+
:param uniques: unique keys disctionary
92+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
93+
"""
94+
bulk_counts = self._build_counters(to_send)
95+
try:
96+
inserted = self._redis_client.rpush(self.IMP_COUNT_QUEUE_KEY, *bulk_counts)
97+
self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, inserted, len(bulk_counts))
98+
return True
99+
except RedisAdapterException:
100+
_LOGGER.error('Something went wrong when trying to add counters to redis')
101+
_LOGGER.error('Error: ', exc_info=True)
102+
return False
103+
104+
def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
105+
"""
106+
Set expire
107+
108+
:param total_keys: length of keys.
109+
:type total_keys: int
110+
:param inserted: added keys.
111+
:type inserted: int
112+
"""
113+
if total_keys == inserted:
114+
self._redis_client.expire(queue_key, key_default_ttl)
115+
85116
def _uniques_formatter(self, uniques):
86117
"""
87118
Format the unique keys dictionary array to a JSON body
@@ -93,3 +124,23 @@ def _uniques_formatter(self, uniques):
93124
:rtype: json
94125
"""
95126
return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]
127+
128+
def _build_counters(self, counters):
129+
"""
130+
Build an impression bulk formatted as the API expects it.
131+
132+
:param counters: List of impression counters per feature.
133+
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]
134+
135+
:return: dict with list of impression count dtos
136+
:rtype: dict
137+
"""
138+
return [json.dumps({
139+
'pf': [
140+
{
141+
'f': pf_count.feature,
142+
'm': pf_count.timeframe,
143+
'rc': pf_count.count
144+
} for pf_count in counters
145+
]
146+
})]

splitio/sync/manager.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class RedisManager(object): # pylint:disable=too-many-instance-attributes
134134

135135
_CENTINEL_EVENT = object()
136136

137-
def __init__(self, unique_keys_task, clear_filter_task): # pylint:disable=too-many-arguments
137+
def __init__(self, synchronizer): # pylint:disable=too-many-arguments
138138
"""
139139
Construct Manager.
140140
@@ -145,9 +145,8 @@ def __init__(self, unique_keys_task, clear_filter_task): # pylint:disable=too-m
145145
:type clear_filter_task: splitio.tasks.clear_filter_task.ClearFilterSynchronizer
146146
147147
"""
148-
self._unique_keys_task = unique_keys_task
149-
self._clear_filter_task = clear_filter_task
150148
self._ready_flag = True
149+
self._synchronizer = synchronizer
151150

152151
def recreate(self):
153152
"""Not implemented"""
@@ -156,8 +155,7 @@ def recreate(self):
156155
def start(self):
157156
"""Start the SDK synchronization tasks."""
158157
try:
159-
self._unique_keys_task.start()
160-
self._clear_filter_task.start()
158+
self._synchronizer.start_periodic_data_recording()
161159

162160
except (APIException, RuntimeError):
163161
_LOGGER.error('Exception raised starting Split Manager')
@@ -172,16 +170,4 @@ def stop(self, blocking):
172170
:type blocking: bool
173171
"""
174172
_LOGGER.info('Stopping manager tasks')
175-
if blocking:
176-
events = []
177-
tasks = [self._unique_keys_task,
178-
self._clear_filter_task]
179-
for task in tasks:
180-
stop_event = threading.Event()
181-
task.stop(stop_event)
182-
events.append(stop_event)
183-
if all(event.wait() for event in events):
184-
_LOGGER.debug('all tasks finished successfully.')
185-
else:
186-
self._unique_keys_task.stop()
187-
self._clear_filter_task.stop()
173+
self._synchronizer.shutdown(blocking)

splitio/sync/synchronizer.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,94 @@ def kill_split(self, split_name, default_treatment, change_number):
377377
self._split_synchronizers.split_sync.kill_split(split_name, default_treatment,
378378
change_number)
379379

380+
class RedisSynchronizer(BaseSynchronizer):
381+
"""Redis Synchronizer."""
382+
383+
def __init__(self, split_synchronizers, split_tasks):
384+
"""
385+
Class constructor.
386+
387+
:param split_synchronizers: syncs for performing synchronization of segments and splits
388+
:type split_synchronizers: splitio.sync.synchronizer.SplitSynchronizers
389+
:param split_tasks: tasks for starting/stopping tasks
390+
:type split_tasks: splitio.sync.synchronizer.SplitTasks
391+
"""
392+
self._split_synchronizers = split_synchronizers
393+
self._split_tasks = split_tasks
394+
395+
def sync_all(self):
396+
"""
397+
Not implemented
398+
"""
399+
pass
400+
401+
def shutdown(self, blocking):
402+
"""
403+
Stop tasks.
404+
405+
:param blocking:flag to wait until tasks are stopped
406+
:type blocking: bool
407+
"""
408+
_LOGGER.debug('Shutting down tasks.')
409+
self.stop_periodic_data_recording(blocking)
410+
411+
def start_periodic_data_recording(self):
412+
"""Start recorders."""
413+
_LOGGER.debug('Starting periodic data recording')
414+
self._split_tasks.impressions_count_task.start()
415+
if self._split_tasks.unique_keys_task is not None:
416+
self._split_tasks.unique_keys_task.start()
417+
if self._split_tasks.clear_filter_task is not None:
418+
self._split_tasks.clear_filter_task.start()
419+
420+
def stop_periodic_data_recording(self, blocking):
421+
"""
422+
Stop recorders.
423+
424+
:param blocking: flag to wait until tasks are stopped
425+
:type blocking: bool
426+
"""
427+
_LOGGER.debug('Stopping periodic data recording')
428+
if blocking:
429+
events = []
430+
tasks = [self._split_tasks.impressions_count_task]
431+
if self._split_tasks.unique_keys_task is not None:
432+
tasks.append(self._split_tasks.unique_keys_task)
433+
if self._split_tasks.clear_filter_task is not None:
434+
tasks.append(self._split_tasks.clear_filter_task)
435+
for task in tasks:
436+
stop_event = threading.Event()
437+
task.stop(stop_event)
438+
events.append(stop_event)
439+
if all(event.wait() for event in events):
440+
_LOGGER.debug('all tasks finished successfully.')
441+
else:
442+
self._split_tasks.impressions_count_task.stop()
443+
if self._split_tasks.unique_keys_task is not None:
444+
self._split_tasks.unique_keys_task.stop()
445+
if self._split_tasks.clear_filter_task is not None:
446+
self._split_tasks.clear_filter_task.stop()
447+
448+
def kill_split(self, split_name, default_treatment, change_number):
449+
"""Kill a split locally."""
450+
raise NotImplementedError()
451+
452+
def synchronize_splits(self, till):
453+
"""Synchronize all splits."""
454+
raise NotImplementedError()
455+
456+
def synchronize_segment(self, segment_name, till):
457+
"""Synchronize particular segment."""
458+
raise NotImplementedError()
459+
460+
def start_periodic_fetching(self):
461+
"""Start fetchers for splits and segments."""
462+
raise NotImplementedError()
463+
464+
def stop_periodic_fetching(self):
465+
"""Stop fetchers for splits and segments."""
466+
raise NotImplementedError()
467+
380468
class LocalhostSynchronizer(BaseSynchronizer):
381469
"""LocalhostSynchronizer."""
382470

splitio/tasks/unique_keys_sync.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,11 @@ def __init__(self, clear_filter, period = None):
7373
def start(self):
7474
"""Start executing the unique keys synchronization task."""
7575

76-
_LOGGER.debug('Starting periodic clear filter')
7776
self._task.start()
7877

7978
def stop(self, event=None):
8079
"""Stop executing the unique keys synchronization task."""
8180

82-
_LOGGER.debug('Stopping periodic clear filter')
8381
self._task.stop(event)
8482

8583
def is_running(self):

0 commit comments

Comments
 (0)