|
24 | 24 | from splitio.storage.adapters import redis |
25 | 25 | from splitio.storage.redis import RedisSplitStorage, RedisSegmentStorage, RedisImpressionsStorage, \ |
26 | 26 | RedisEventsStorage, RedisTelemetryStorage |
| 27 | +from splitio.storage.pluggable import PluggableEventsStorage, PluggableImpressionsStorage, PluggableSegmentStorage, \ |
| 28 | + PluggableSplitStorage, PluggableTelemetryStorage |
27 | 29 |
|
28 | 30 | # APIs |
29 | 31 | from splitio.api.client import HttpClient |
|
45 | 47 |
|
46 | 48 | # Synchronizer |
47 | 49 | from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \ |
48 | | - LocalhostSynchronizer, RedisSynchronizer |
| 50 | + LocalhostSynchronizer, RedisSynchronizer, PluggableSynchronizer |
49 | 51 | from splitio.sync.manager import Manager, RedisManager |
50 | 52 | from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer, LocalhostMode |
51 | 53 | from splitio.sync.segment import SegmentSynchronizer, LocalSegmentSynchronizer |
@@ -512,6 +514,84 @@ def _build_redis_factory(api_key, cfg): |
512 | 514 | return split_factory |
513 | 515 |
|
514 | 516 |
|
| 517 | +def _build_pluggable_factory(api_key, cfg): |
| 518 | + """Build and return a split factory with pluggable storage.""" |
| 519 | + sdk_metadata = util.get_metadata(cfg) |
| 520 | + if not input_validator.validate_pluggable_adapter(cfg): |
| 521 | + raise Exception("Pluggable Adapter validation failed, exiting") |
| 522 | + |
| 523 | + pluggable_adapter = cfg.get('storageWrapper') |
| 524 | + storage_prefix = cfg.get('storagePrefix') |
| 525 | + storages = { |
| 526 | + 'splits': PluggableSplitStorage(pluggable_adapter, storage_prefix), |
| 527 | + 'segments': PluggableSegmentStorage(pluggable_adapter, storage_prefix), |
| 528 | + 'impressions': PluggableImpressionsStorage(pluggable_adapter, sdk_metadata, storage_prefix), |
| 529 | + 'events': PluggableEventsStorage(pluggable_adapter, sdk_metadata, storage_prefix), |
| 530 | + 'telemetry': PluggableTelemetryStorage(pluggable_adapter, sdk_metadata, storage_prefix) |
| 531 | + } |
| 532 | + telemetry_producer = TelemetryStorageProducer(storages['telemetry']) |
| 533 | + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() |
| 534 | + telemetry_init_producer = telemetry_producer.get_telemetry_init_producer() |
| 535 | + # Using same class as redis |
| 536 | + telemetry_submitter = RedisTelemetrySubmitter(storages['telemetry']) |
| 537 | + |
| 538 | + unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \ |
| 539 | + clear_filter_task, impressions_count_sync, impressions_count_task, \ |
| 540 | + imp_strategy = set_classes('PLUGGABLE', cfg['impressionsMode'], pluggable_adapter, storage_prefix) |
| 541 | + |
| 542 | + imp_manager = ImpressionsManager( |
| 543 | + imp_strategy, |
| 544 | + telemetry_runtime_producer, |
| 545 | + _wrap_impression_listener(cfg['impressionListener'], sdk_metadata), |
| 546 | + ) |
| 547 | + |
| 548 | + synchronizers = SplitSynchronizers(None, None, None, None, |
| 549 | + impressions_count_sync, |
| 550 | + None, |
| 551 | + unique_keys_synchronizer, |
| 552 | + clear_filter_sync |
| 553 | + ) |
| 554 | + |
| 555 | + tasks = SplitTasks(None, None, None, None, |
| 556 | + impressions_count_task, |
| 557 | + None, |
| 558 | + unique_keys_task, |
| 559 | + clear_filter_task |
| 560 | + ) |
| 561 | + |
| 562 | + # Using same class as redis for consumer mode only |
| 563 | + synchronizer = RedisSynchronizer(synchronizers, tasks) |
| 564 | + recorder = StandardRecorder( |
| 565 | + imp_manager, |
| 566 | + storages['events'], |
| 567 | + storages['impressions'], |
| 568 | + storages['telemetry'] |
| 569 | + ) |
| 570 | + |
| 571 | + # Using same class as redis for consumer mode only |
| 572 | + manager = RedisManager(synchronizer) |
| 573 | + initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True) |
| 574 | + initialization_thread.start() |
| 575 | + |
| 576 | + telemetry_init_producer.record_config(cfg, {}) |
| 577 | + |
| 578 | + split_factory = SplitFactory( |
| 579 | + api_key, |
| 580 | + storages, |
| 581 | + cfg['labelsEnabled'], |
| 582 | + recorder, |
| 583 | + manager, |
| 584 | + sdk_ready_flag=None, |
| 585 | + telemetry_producer=telemetry_producer, |
| 586 | + telemetry_init_producer=telemetry_init_producer |
| 587 | + ) |
| 588 | + redundant_factory_count, active_factory_count = _get_active_and_redundant_count() |
| 589 | + storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count) |
| 590 | + telemetry_submitter.synchronize_config() |
| 591 | + |
| 592 | + return split_factory |
| 593 | + |
| 594 | + |
515 | 595 | def _build_localhost_factory(cfg): |
516 | 596 | """Build and return a localhost factory for testing/development purposes.""" |
517 | 597 | telemetry_storage = LocalhostTelemetryStorage() |
@@ -606,10 +686,12 @@ def get_factory(api_key, **kwargs): |
606 | 686 |
|
607 | 687 | config = sanitize_config(api_key, kwargs.get('config', {})) |
608 | 688 |
|
609 | | - if config['operationMode'] == 'localhost-standalone': |
| 689 | + if config['operationMode'] == 'localhost': |
610 | 690 | split_factory = _build_localhost_factory(config) |
611 | | - elif config['operationMode'] == 'redis-consumer': |
| 691 | + elif config['storageType'] == 'redis': |
612 | 692 | split_factory = _build_redis_factory(api_key, config) |
| 693 | + elif config['storageType'] == 'pluggable': |
| 694 | + split_factory = _build_pluggable_factory(api_key, config) |
613 | 695 | else: |
614 | 696 | split_factory = _build_in_memory_factory( |
615 | 697 | api_key, |
|
0 commit comments