Skip to content

Commit 9824a52

Browse files
authored
Merge pull request #323 from splitio/factory-integration
Factory integration for pluggable storage and tests
2 parents 1763fc2 + 3419e0e commit 9824a52

File tree

14 files changed

+332
-105
lines changed

14 files changed

+332
-105
lines changed

splitio/client/config.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
'localhostRefreshEnabled': False,
5757
'preforkedInitialization': False,
5858
'dataSampling': DEFAULT_DATA_SAMPLING,
59+
'storageWrapper': None,
60+
'storagePrefix': None,
61+
'storageType': None
5962
}
6063

6164

@@ -70,15 +73,25 @@ def _parse_operation_mode(apikey, config):
7073
:rtype: str
7174
"""
7275
if apikey == 'localhost':
76+
_LOGGER.debug('Using Localhost operation mode')
7377
return 'localhost-standalone'
7478

7579
if 'redisHost' in config or 'redisSentinels' in config:
80+
_LOGGER.debug('Using Redis storage operation mode')
7681
return 'redis-consumer'
7782

83+
if 'storageType' in config:
84+
if config.get('storageType').lower() == 'pluggable':
85+
_LOGGER.debug('Using Pluggable storage operation mode')
86+
return 'pluggable'
87+
_LOGGER.warning('You passed an invalid storageType, acceptable value is '
88+
'`pluggable`. Defaulting storage to In-Memory mode.')
89+
90+
_LOGGER.debug('Using In-Memory operation mode')
7891
return 'inmemory-standalone'
7992

8093

81-
def _sanitize_impressions_mode(mode, refresh_rate=None):
94+
def _sanitize_impressions_mode(operation_mode, mode, refresh_rate=None):
8295
"""
8396
Check supplied impressions mode and adjust refresh rate.
8497
@@ -92,10 +105,14 @@ def _sanitize_impressions_mode(mode, refresh_rate=None):
92105
try:
93106
mode = ImpressionsMode(mode.upper())
94107
except (ValueError, AttributeError):
95-
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be '
96-
'one of the following values: `debug`, `none` or `optimized`. '
97-
'Defaulting to `optimized` mode.')
98108
mode = ImpressionsMode.OPTIMIZED
109+
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be ' \
110+
'one of the following values: `debug`, `none` or `optimized`. '
111+
' Defaulting to `optimized` mode.')
112+
113+
if operation_mode == 'pluggable' and mode != ImpressionsMode.DEBUG:
114+
mode = ImpressionsMode.DEBUG
115+
_LOGGER.warning('`pluggable` storageMode only support `debug` impressionMode, adjusting impressionsMode to `debug`. ')
99116

100117
if mode == ImpressionsMode.DEBUG:
101118
refresh_rate = max(1, refresh_rate) if refresh_rate is not None else 60
@@ -121,7 +138,7 @@ def sanitize(apikey, config):
121138
config['operationMode'] = _parse_operation_mode(apikey, config)
122139
processed = DEFAULT_CONFIG.copy()
123140
processed.update(config)
124-
imp_mode, imp_rate = _sanitize_impressions_mode(config.get('impressionsMode'),
141+
imp_mode, imp_rate = _sanitize_impressions_mode(config['operationMode'], config.get('impressionsMode'),
125142
config.get('impressionsRefreshRate'))
126143
processed['impressionsMode'] = imp_mode
127144
processed['impressionsRefreshRate'] = imp_rate

splitio/client/factory.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from splitio.storage.adapters import redis
2525
from splitio.storage.redis import RedisSplitStorage, RedisSegmentStorage, RedisImpressionsStorage, \
2626
RedisEventsStorage, RedisTelemetryStorage
27+
from splitio.storage.pluggable import PluggableEventsStorage, PluggableImpressionsStorage, PluggableSegmentStorage, \
28+
PluggableSplitStorage, PluggableTelemetryStorage
2729

2830
# APIs
2931
from splitio.api.client import HttpClient
@@ -45,7 +47,7 @@
4547

4648
# Synchronizer
4749
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
48-
LocalhostSynchronizer, RedisSynchronizer
50+
LocalhostSynchronizer, RedisSynchronizer, PluggableSynchronizer
4951
from splitio.sync.manager import Manager, RedisManager
5052
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer, LocalhostMode
5153
from splitio.sync.segment import SegmentSynchronizer, LocalSegmentSynchronizer
@@ -512,6 +514,69 @@ def _build_redis_factory(api_key, cfg):
512514
return split_factory
513515

514516

517+
def _build_pluggable_factory(api_key, cfg):
518+
"""Build and return a split factory with pluggable storage."""
519+
sdk_metadata = util.get_metadata(cfg)
520+
if not input_validator.validate_pluggable_adapter(cfg):
521+
raise Exception("Pluggable Adapter validation failed, exiting")
522+
523+
pluggable_adapter = cfg.get('storageWrapper')
524+
storage_prefix = cfg.get('storagePrefix')
525+
storages = {
526+
'splits': PluggableSplitStorage(pluggable_adapter, storage_prefix),
527+
'segments': PluggableSegmentStorage(pluggable_adapter, storage_prefix),
528+
'impressions': PluggableImpressionsStorage(pluggable_adapter, sdk_metadata, storage_prefix),
529+
'events': PluggableEventsStorage(pluggable_adapter, sdk_metadata, storage_prefix),
530+
'telemetry': PluggableTelemetryStorage(pluggable_adapter, sdk_metadata, storage_prefix)
531+
}
532+
telemetry_producer = TelemetryStorageProducer(storages['telemetry'])
533+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
534+
telemetry_init_producer = telemetry_producer.get_telemetry_init_producer()
535+
# Using same class as redis
536+
telemetry_submitter = RedisTelemetrySubmitter(storages['telemetry'])
537+
538+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
539+
clear_filter_task, impressions_count_sync, impressions_count_task, \
540+
imp_strategy = set_classes('PLUGGABLE', cfg['impressionsMode'], pluggable_adapter)
541+
542+
imp_manager = ImpressionsManager(
543+
imp_strategy,
544+
telemetry_runtime_producer,
545+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
546+
)
547+
548+
synchronizer = PluggableSynchronizer()
549+
recorder = StandardRecorder(
550+
imp_manager,
551+
storages['events'],
552+
storages['impressions'],
553+
storages['telemetry']
554+
)
555+
556+
# Using same class as redis for consumer mode only
557+
manager = RedisManager(synchronizer)
558+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
559+
initialization_thread.start()
560+
561+
telemetry_init_producer.record_config(cfg, {})
562+
563+
split_factory = SplitFactory(
564+
api_key,
565+
storages,
566+
cfg['labelsEnabled'],
567+
recorder,
568+
manager,
569+
sdk_ready_flag=None,
570+
telemetry_producer=telemetry_producer,
571+
telemetry_init_producer=telemetry_init_producer
572+
)
573+
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
574+
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
575+
telemetry_submitter.synchronize_config()
576+
577+
return split_factory
578+
579+
515580
def _build_localhost_factory(cfg):
516581
"""Build and return a localhost factory for testing/development purposes."""
517582
telemetry_storage = LocalhostTelemetryStorage()
@@ -610,6 +675,8 @@ def get_factory(api_key, **kwargs):
610675
split_factory = _build_localhost_factory(config)
611676
elif config['operationMode'] == 'redis-consumer':
612677
split_factory = _build_redis_factory(api_key, config)
678+
elif config['operationMode'] == 'pluggable':
679+
split_factory = _build_pluggable_factory(api_key, config)
613680
else:
614681
split_factory = _build_in_memory_factory(
615682
api_key,

splitio/client/input_validator.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -529,21 +529,21 @@ def validate_pluggable_adapter(config):
529529
:return: True if no issue found otherwise False
530530
:rtype: bool
531531
"""
532-
if config.get('storageType') != 'PLUGGABLE':
532+
if config.get('storageType') != 'pluggable':
533533
return True
534534

535535
if config.get('storageWrapper') is None:
536-
_LOGGER.error("Expecting custom storage `wrapper` in options, but no valid wrapper instance was provided.")
536+
_LOGGER.error("Expecting pluggable storage `wrapper` in options, but no valid wrapper instance was provided.")
537537
return False
538538

539539
if config.get('storagePrefix') is not None:
540540
if not isinstance(config.get('storagePrefix'), str):
541-
_LOGGER.error("Custom storage prefix should be string type only")
541+
_LOGGER.error("Pluggable storage prefix should be string type only")
542542
return False
543543

544544
pluggable_adapter = config.get('storageWrapper')
545545
if not isinstance(pluggable_adapter, object):
546-
_LOGGER.error("Custom storage instance is not inherted from object class")
546+
_LOGGER.error("Pluggable storage instance is not inherted from object class")
547547
return False
548548

549549
expected_methods = {'get': 1, 'get_items': 1, 'get_many': 1, 'set': 2, 'push_items': 2,

splitio/engine/impressions/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ def set_classes(storage_mode, impressions_mode, api_adapter):
1515
impressions_count_sync = None
1616
impressions_count_task = None
1717
sender_adapter = None
18-
if storage_mode == 'REDIS':
18+
if storage_mode == 'PLUGGABLE':
19+
api_telemetry_adapter = sender_adapter
20+
api_impressions_adapter = sender_adapter
21+
elif storage_mode == 'REDIS':
1922
sender_adapter = RedisSenderAdapter(api_adapter)
2023
api_telemetry_adapter = sender_adapter
2124
api_impressions_adapter = sender_adapter

splitio/models/telemetry.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,13 @@ class StorageType(Enum):
124124
MEMORY = 'memory'
125125
REDIS = 'redis'
126126
LOCALHOST = 'localhost'
127+
PLUGGABLE = 'pluggable'
127128

128129
class OperationMode(Enum):
129130
"""Storage modes constants"""
130131
MEMORY = 'inmemory'
131132
REDIS = 'redis-consumer'
133+
PLUGGABLE = 'pluggable'
132134

133135
def get_latency_bucket_index(micros):
134136
"""
@@ -874,7 +876,7 @@ def _get_storage_type(self, op_mode):
874876
elif StorageType.REDIS.value in op_mode:
875877
return StorageType.REDIS.value
876878
else:
877-
return StorageType.LOCALHOST.value
879+
return StorageType.PLUGGABLE.value
878880

879881
def _get_refresh_rates(self, config):
880882
"""

splitio/storage/pluggable.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ def record_active_and_redundant_factories(self, active_factory_count, redundant_
771771
"""
772772
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
773773

774-
def record_latency(self, method, latency):
774+
def record_latency(self, method, bucket):
775775
"""
776776
record latency data
777777
@@ -780,7 +780,6 @@ def record_latency(self, method, latency):
780780
:param latency: latency
781781
:type latency: int64
782782
"""
783-
bucket = get_latency_bucket_index(latency)
784783
latency_key = self._telemetry_latencies_key + '::' + self._sdk_metadata + '/' + method.value + '/' + str(bucket)
785784
result = self._pluggable_adapter.increment(latency_key, 1)
786785
self.expire_keys(latency_key, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result)

splitio/storage/redis.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ def record_active_and_redundant_factories(self, active_factory_count, redundant_
649649
"""Record active and redundant factories."""
650650
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
651651

652-
def add_latency_to_pipe(self, method, latency, pipe):
652+
def add_latency_to_pipe(self, method, bucket, pipe):
653653
"""
654654
record latency data
655655
@@ -660,7 +660,6 @@ def add_latency_to_pipe(self, method, latency, pipe):
660660
:param pipe: Redis pipe.
661661
:type pipe: redis.pipe
662662
"""
663-
bucket = get_latency_bucket_index(latency)
664663
pipe.hincrby(self._TELEMETRY_LATENCIES_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
665664
method.value + '/' + str(bucket), 1)
666665

splitio/sync/synchronizer.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,3 +589,69 @@ def shutdown(self, blocking):
589589
:type blocking: bool
590590
"""
591591
self.stop_periodic_fetching()
592+
593+
594+
class PluggableSynchronizer(BaseSynchronizer):
595+
"""Plugable Synchronizer."""
596+
597+
def synchronize_segment(self, segment_name, till):
598+
"""
599+
Synchronize particular segment.
600+
601+
:param segment_name: segment associated
602+
:type segment_name: str
603+
:param till: to fetch
604+
:type till: int
605+
"""
606+
pass
607+
608+
def synchronize_splits(self, till):
609+
"""
610+
Synchronize all splits.
611+
612+
:param till: to fetch
613+
:type till: int
614+
"""
615+
pass
616+
617+
def sync_all(self):
618+
"""Synchronize all split data."""
619+
pass
620+
621+
def start_periodic_fetching(self):
622+
"""Start fetchers for splits and segments."""
623+
pass
624+
625+
def stop_periodic_fetching(self):
626+
"""Stop fetchers for splits and segments."""
627+
pass
628+
629+
def start_periodic_data_recording(self):
630+
"""Start recorders."""
631+
pass
632+
633+
def stop_periodic_data_recording(self, blocking):
634+
"""Stop recorders."""
635+
pass
636+
637+
def kill_split(self, split_name, default_treatment, change_number):
638+
"""
639+
Kill a split locally.
640+
641+
:param split_name: name of the split to perform kill
642+
:type split_name: str
643+
:param default_treatment: name of the default treatment to return
644+
:type default_treatment: str
645+
:param change_number: change_number
646+
:type change_number: int
647+
"""
648+
pass
649+
650+
def shutdown(self, blocking):
651+
"""
652+
Stop tasks.
653+
654+
:param blocking:flag to wait until tasks are stopped
655+
:type blocking: bool
656+
"""
657+
pass

tests/client/test_config.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,48 @@ def test_parse_operation_mode(self):
1313
assert config._parse_operation_mode('some', {}) == 'inmemory-standalone'
1414
assert config._parse_operation_mode('localhost', {}) == 'localhost-standalone'
1515
assert config._parse_operation_mode('some', {'redisHost': 'x'}) == 'redis-consumer'
16+
assert config._parse_operation_mode('some', {'storageType': 'pluggable'}) == 'pluggable'
17+
assert config._parse_operation_mode('some', {'storageType': 'custom2'}) == 'inmemory-standalone'
1618

1719
def test_sanitize_imp_mode(self):
1820
"""Test sanitization of impressions mode."""
19-
mode, rate = config._sanitize_impressions_mode('OPTIMIZED', 1)
21+
mode, rate = config._sanitize_impressions_mode('inmemory-standalone', 'OPTIMIZED', 1)
2022
assert mode == ImpressionsMode.OPTIMIZED
2123
assert rate == 60
2224

23-
mode, rate = config._sanitize_impressions_mode('DEBUG', 1)
25+
mode, rate = config._sanitize_impressions_mode('inmemory-standalone', 'DEBUG', 1)
2426
assert mode == ImpressionsMode.DEBUG
2527
assert rate == 1
2628

27-
mode, rate = config._sanitize_impressions_mode('debug', 1)
29+
mode, rate = config._sanitize_impressions_mode('inmemory-standalone', 'debug', 1)
2830
assert mode == ImpressionsMode.DEBUG
2931
assert rate == 1
3032

31-
mode, rate = config._sanitize_impressions_mode('ANYTHING', 200)
33+
mode, rate = config._sanitize_impressions_mode('inmemory-standalone', 'ANYTHING', 200)
3234
assert mode == ImpressionsMode.OPTIMIZED
3335
assert rate == 200
3436

35-
mode, rate = config._sanitize_impressions_mode(43, -1)
37+
mode, rate = config._sanitize_impressions_mode('pluggable', 'ANYTHING', 200)
38+
assert mode == ImpressionsMode.DEBUG
39+
assert rate == 200
40+
41+
mode, rate = config._sanitize_impressions_mode('pluggable', 'NONE', 200)
42+
assert mode == ImpressionsMode.DEBUG
43+
assert rate == 200
44+
45+
mode, rate = config._sanitize_impressions_mode('pluggable', 'OPTIMIZED', 200)
46+
assert mode == ImpressionsMode.DEBUG
47+
assert rate == 200
48+
49+
mode, rate = config._sanitize_impressions_mode('inmemory-standalone', 43, -1)
3650
assert mode == ImpressionsMode.OPTIMIZED
3751
assert rate == 60
3852

39-
mode, rate = config._sanitize_impressions_mode('OPTIMIZED')
53+
mode, rate = config._sanitize_impressions_mode('inmemory-standalone', 'OPTIMIZED')
4054
assert mode == ImpressionsMode.OPTIMIZED
4155
assert rate == 300
4256

43-
mode, rate = config._sanitize_impressions_mode('DEBUG')
57+
mode, rate = config._sanitize_impressions_mode('inmemory-standalone', 'DEBUG')
4458
assert mode == ImpressionsMode.DEBUG
4559
assert rate == 60
4660

0 commit comments

Comments
 (0)