Skip to content

Commit deb2cf5

Browse files
authored
Merge pull request #271 from splitio/redis-none
Redis none
2 parents 4b5281f + 17097c9 commit deb2cf5

File tree

7 files changed

+304
-42
lines changed

7 files changed

+304
-42
lines changed

splitio/client/factory.py

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from splitio.engine.impressions import ImpressionsMode
1616
from splitio.engine.manager import Counter as ImpressionsCounter
1717
from splitio.engine.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18-
from splitio.engine.adapters import InMemorySenderAdapter
18+
from splitio.engine.adapters import InMemorySenderAdapter, RedisSenderAdapter
1919

2020
# Storage
2121
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -42,8 +42,8 @@
4242

4343
# Synchronizer
4444
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
45-
LocalhostSynchronizer
46-
from splitio.sync.manager import Manager
45+
LocalhostSynchronizer, RedisSynchronizer
46+
from splitio.sync.manager import Manager, RedisManager
4747
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer
4848
from splitio.sync.segment import SegmentSynchronizer
4949
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
@@ -215,6 +215,7 @@ def destroy(self, destroyed_event=None):
215215
return
216216

217217
try:
218+
_LOGGER.info('Factory destroy called, stopping tasks.')
218219
if self._sync_manager is not None:
219220
if destroyed_event is not None:
220221

@@ -329,17 +330,24 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
329330
clear_filter_sync = None
330331
unique_keys_task = None
331332
clear_filter_task = None
333+
impressions_count_sync = None
334+
impressions_count_task = None
335+
332336
if cfg['impressionsMode'] == ImpressionsMode.NONE:
333337
imp_strategy = StrategyNoneMode(imp_counter)
334338
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
335339
unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(apis['telemetry']), imp_strategy.get_unique_keys_tracker())
336340
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
337341
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
338342
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
343+
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
344+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
339345
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
340346
imp_strategy = StrategyDebugMode()
341347
else:
342348
imp_strategy = StrategyOptimizedMode(imp_counter)
349+
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
350+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
343351

344352
imp_manager = ImpressionsManager(
345353
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
@@ -351,7 +359,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
351359
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
352360
cfg['impressionsBulkSize']),
353361
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
354-
ImpressionsCountSynchronizer(apis['impressions'], imp_counter),
362+
impressions_count_sync,
355363
unique_keys_synchronizer,
356364
clear_filter_sync
357365
)
@@ -370,7 +378,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
370378
cfg['impressionsRefreshRate'],
371379
),
372380
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
373-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
381+
impressions_count_task,
374382
unique_keys_task,
375383
clear_filter_task
376384
)
@@ -424,23 +432,68 @@ def _build_redis_factory(api_key, cfg):
424432
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
425433
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
426434

427-
imp_strategy = StrategyDebugMode() if cfg['impressionsMode'] == ImpressionsMode.DEBUG else StrategyOptimizedMode(ImpressionsCounter())
435+
unique_keys_synchronizer = None
436+
clear_filter_sync = None
437+
unique_keys_task = None
438+
clear_filter_task = None
439+
impressions_count_sync = None
440+
impressions_count_task = None
441+
redis_sender_adapter = RedisSenderAdapter(redis_adapter)
442+
443+
if cfg['impressionsMode'] == ImpressionsMode.NONE:
444+
imp_counter = ImpressionsCounter()
445+
imp_strategy = StrategyNoneMode(imp_counter)
446+
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
447+
unique_keys_synchronizer = UniqueKeysSynchronizer(redis_sender_adapter, imp_strategy.get_unique_keys_tracker())
448+
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
449+
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
450+
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
451+
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
452+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
453+
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
454+
imp_strategy = StrategyDebugMode()
455+
else:
456+
imp_counter = ImpressionsCounter()
457+
imp_strategy = StrategyOptimizedMode(imp_counter)
458+
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
459+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
460+
428461
imp_manager = ImpressionsManager(
429462
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
430463
imp_strategy)
431464

465+
synchronizers = SplitSynchronizers(None, None, None, None,
466+
impressions_count_sync,
467+
unique_keys_synchronizer,
468+
clear_filter_sync
469+
)
470+
471+
tasks = SplitTasks(None, None, None, None,
472+
impressions_count_task,
473+
unique_keys_task,
474+
clear_filter_task
475+
)
476+
477+
synchronizer = RedisSynchronizer(synchronizers, tasks)
432478
recorder = PipelinedRecorder(
433479
redis_adapter.pipeline,
434480
imp_manager,
435481
storages['events'],
436482
storages['impressions'],
437483
data_sampling,
438484
)
485+
486+
manager = RedisManager(synchronizer)
487+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
488+
initialization_thread.setDaemon(True)
489+
initialization_thread.start()
490+
439491
return SplitFactory(
440492
api_key,
441493
storages,
442494
cfg['labelsEnabled'],
443495
recorder,
496+
manager,
444497
)
445498

446499

splitio/engine/adapters.py

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import abc
2+
import logging
23
import json
34

5+
from splitio.storage.adapters.redis import RedisAdapterException
6+
7+
_LOGGER = logging.getLogger(__name__)
48

59
class ImpressionsSenderAdapter(object, metaclass=abc.ABCMeta):
610
"""Impressions Sender Adapter interface."""
@@ -32,18 +36,111 @@ def record_unique_keys(self, uniques):
3236
:param uniques: unique keys disctionary
3337
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
3438
"""
35-
self._telemtry_http_client.record_unique_keys(self._uniques_formatter(uniques))
39+
self._telemtry_http_client.record_unique_keys({'keys': self._uniques_formatter(uniques)})
40+
41+
def _uniques_formatter(self, uniques):
42+
"""
43+
Format the unique keys dictionary array to a JSON body
44+
45+
:param uniques: unique keys disctionary
46+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
47+
48+
:return: unique keys JSON array
49+
:rtype: json
50+
"""
51+
return [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
52+
53+
class RedisSenderAdapter(ImpressionsSenderAdapter):
54+
"""In Memory Impressions Sender Adapter class."""
55+
56+
MTK_QUEUE_KEY = 'SPLITIO.uniquekeys'
57+
MTK_KEY_DEFAULT_TTL = 3600
58+
IMP_COUNT_QUEUE_KEY = 'SPLITIO.impressions.count'
59+
IMP_COUNT_KEY_DEFAULT_TTL = 3600
60+
61+
def __init__(self, redis_client):
62+
"""
63+
Initialize In memory sender adapter instance
64+
65+
:param telemtry_http_client: instance of telemetry http api
66+
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
67+
"""
68+
self._redis_client = redis_client
69+
70+
def record_unique_keys(self, uniques):
71+
"""
72+
post the unique keys to redis.
73+
74+
:param uniques: unique keys disctionary
75+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
76+
"""
77+
bulk_mtks = self._uniques_formatter(uniques)
78+
try:
79+
inserted = self._redis_client.rpush(self.MTK_QUEUE_KEY, *bulk_mtks)
80+
self._expire_keys(self.MTK_QUEUE_KEY, self.MTK_KEY_DEFAULT_TTL, inserted, len(bulk_mtks))
81+
return True
82+
except RedisAdapterException:
83+
_LOGGER.error('Something went wrong when trying to add mtks to redis')
84+
_LOGGER.error('Error: ', exc_info=True)
85+
return False
86+
87+
def flush_counters(self, to_send):
88+
"""
89+
post the impression counters to redis.
90+
91+
:param uniques: unique keys disctionary
92+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
93+
"""
94+
bulk_counts = self._build_counters(to_send)
95+
try:
96+
inserted = self._redis_client.rpush(self.IMP_COUNT_QUEUE_KEY, *bulk_counts)
97+
self._expire_keys(self.IMP_COUNT_QUEUE_KEY, self.IMP_COUNT_KEY_DEFAULT_TTL, inserted, len(bulk_counts))
98+
return True
99+
except RedisAdapterException:
100+
_LOGGER.error('Something went wrong when trying to add counters to redis')
101+
_LOGGER.error('Error: ', exc_info=True)
102+
return False
103+
104+
def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
105+
"""
106+
Set expire
107+
108+
:param total_keys: length of keys.
109+
:type total_keys: int
110+
:param inserted: added keys.
111+
:type inserted: int
112+
"""
113+
if total_keys == inserted:
114+
self._redis_client.expire(queue_key, key_default_ttl)
36115

37116
def _uniques_formatter(self, uniques):
38117
"""
39-
Format the unique keys dictionary to a JSON body
118+
Format the unique keys dictionary array to a JSON body
40119
41120
:param uniques: unique keys disctionary
42121
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
43122
44-
:return: unique keys JSON
123+
:return: unique keys JSON array
45124
:rtype: json
46125
"""
47-
return {
48-
'keys': [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
49-
}
126+
return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]
127+
128+
def _build_counters(self, counters):
129+
"""
130+
Build an impression bulk formatted as the API expects it.
131+
132+
:param counters: List of impression counters per feature.
133+
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]
134+
135+
:return: dict with list of impression count dtos
136+
:rtype: dict
137+
"""
138+
return [json.dumps({
139+
'pf': [
140+
{
141+
'f': pf_count.feature,
142+
'm': pf_count.timeframe,
143+
'rc': pf_count.count
144+
} for pf_count in counters
145+
]
146+
})]

splitio/engine/unique_keys_tracker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def track(self, key, feature_name):
5959
'Unique Keys queue is full, flushing the current queue now.'
6060
)
6161
if self._queue_full_hook is not None and callable(self._queue_full_hook):
62+
_LOGGER.info('Calling hook.')
6263
self._queue_full_hook()
6364
return True
6465

splitio/sync/manager.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Synchronization manager module."""
22
import logging
33
import time
4+
import threading
45
from threading import Thread
56
from queue import Queue
67
from splitio.push.manager import PushManager, Status
@@ -127,3 +128,44 @@ def _streaming_feedback_handler(self):
127128
self._synchronizer.start_periodic_fetching()
128129
_LOGGER.info('non-recoverable error in streaming. switching to polling.')
129130
return
131+
132+
class RedisManager(object): # pylint:disable=too-many-instance-attributes
133+
"""Manager Class."""
134+
135+
def __init__(self, synchronizer): # pylint:disable=too-many-arguments
136+
"""
137+
Construct Manager.
138+
139+
:param unique_keys_task: unique keys task instance
140+
:type unique_keys_task: splitio.tasks.unique_keys_sync.UniqueKeysSyncTask
141+
142+
:param clear_filter_task: clear filter task instance
143+
:type clear_filter_task: splitio.tasks.clear_filter_task.ClearFilterSynchronizer
144+
145+
"""
146+
self._ready_flag = True
147+
self._synchronizer = synchronizer
148+
149+
def recreate(self):
150+
"""Not implemented"""
151+
return
152+
153+
def start(self):
154+
"""Start the SDK synchronization tasks."""
155+
try:
156+
self._synchronizer.start_periodic_data_recording()
157+
158+
except (APIException, RuntimeError):
159+
_LOGGER.error('Exception raised starting Split Manager')
160+
_LOGGER.debug('Exception information: ', exc_info=True)
161+
raise
162+
163+
def stop(self, blocking):
164+
"""
165+
Stop manager logic.
166+
167+
:param blocking: flag to wait until tasks are stopped
168+
:type blocking: bool
169+
"""
170+
_LOGGER.info('Stopping manager tasks')
171+
self._synchronizer.shutdown(blocking)

0 commit comments

Comments
 (0)