-
Notifications
You must be signed in to change notification settings - Fork 55
chore: Use FDv1 DataSystem in the ldclient #345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
9c2fa14
fbdd98b
8579305
ff845d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,20 +2,15 @@ | |
| require "ldclient-rb/impl/broadcaster" | ||
| require "ldclient-rb/impl/context" | ||
| require "ldclient-rb/impl/data_source" | ||
| require "ldclient-rb/impl/data_source/null_processor" | ||
| require "ldclient-rb/impl/data_source/polling" | ||
| require "ldclient-rb/impl/data_source/requestor" | ||
| require "ldclient-rb/impl/data_source/stream" | ||
| require "ldclient-rb/impl/data_store" | ||
| require "ldclient-rb/impl/data_system/fdv1" | ||
| require "ldclient-rb/impl/diagnostic_events" | ||
| require "ldclient-rb/impl/evaluation_with_hook_result" | ||
| require "ldclient-rb/impl/evaluator" | ||
| require "ldclient-rb/impl/flag_tracker" | ||
| require "ldclient-rb/impl/migrations/tracker" | ||
| require "ldclient-rb/impl/store_client_wrapper" | ||
| require "ldclient-rb/impl/util" | ||
| require "ldclient-rb/events" | ||
| require "ldclient-rb/in_memory_store" | ||
| require "concurrent" | ||
| require "concurrent/atomics" | ||
| require "digest/sha1" | ||
|
|
@@ -57,11 +52,12 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) | |
| # Note that sdk_key is normally a required parameter, and a nil value would cause the SDK to | ||
| # fail in most configurations. However, there are some configurations where it would be OK | ||
| # (offline = true, *or* we are using LDD mode or the file data source and events are disabled | ||
| # so we're not connecting to any LD services) so rather than try to check for all of those | ||
| # up front, we will let the constructors for the data source implementations implement this | ||
| # fail-fast as appropriate, and just check here for the part regarding events. | ||
| if !config.offline? && config.send_events | ||
| raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? | ||
| # so we're not connecting to any LD services). | ||
| if !config.offline? && sdk_key.nil? | ||
| # SDK key can be nil only if using LDD or custom data source with events disabled | ||
| if config.send_events || (!config.use_ldd? && config.data_source.nil?) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This conditional doesn't make sense to me. The SDK key is required if:
If we aren't in daemon mode, then we should have a data source, and that's when an SDK key is required. If there isn't a data source, what is the key for? |
||
| raise ArgumentError, "sdk_key must not be nil" | ||
| end | ||
| end | ||
|
|
||
| @sdk_key = sdk_key | ||
|
|
@@ -89,9 +85,10 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) | |
| # @param wait_for_sec [Float] maximum time (in seconds) to wait for initialization | ||
| # | ||
| def postfork(wait_for_sec = 5) | ||
| @data_source = nil | ||
| @data_system = nil | ||
| @event_processor = nil | ||
| @big_segment_store_manager = nil | ||
| @flag_tracker = nil | ||
|
|
||
| start_up(wait_for_sec) | ||
| end | ||
|
|
@@ -102,32 +99,22 @@ def postfork(wait_for_sec = 5) | |
|
|
||
| @hooks = Concurrent::Array.new(@config.hooks + plugin_hooks) | ||
|
|
||
| @shared_executor = Concurrent::SingleThreadExecutor.new | ||
|
|
||
| data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) | ||
| store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster) | ||
|
|
||
| # We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add | ||
| # some necessary logic around updates. Unfortunately, we have code elsewhere that accesses | ||
| # the feature store through the Config object, so we need to make a new Config that uses | ||
| # the wrapped store. | ||
| @store = Impl::FeatureStoreClientWrapper.new(@config.feature_store, store_sink, @config.logger) | ||
| updated_config = @config.clone | ||
| updated_config.instance_variable_set(:@feature_store, @store) | ||
| @config = updated_config | ||
|
|
||
| @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink) | ||
| # Initialize the data system (FDv1 for now, will support FDv2 in the future) | ||
| # Note: FDv1 will update @config.feature_store to use its wrapped store | ||
| @data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config) | ||
|
|
||
| # Components not managed by data system | ||
| @big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger) | ||
| @big_segment_store_status_provider = @big_segment_store_manager.status_provider | ||
|
|
||
| get_flag = lambda { |key| @store.get(Impl::DataStore::FEATURES, key) } | ||
| get_segment = lambda { |key| @store.get(Impl::DataStore::SEGMENTS, key) } | ||
| get_flag = lambda { |key| @data_system.store.get(Impl::DataStore::FEATURES, key) } | ||
| get_segment = lambda { |key| @data_system.store.get(Impl::DataStore::SEGMENTS, key) } | ||
| get_big_segments_membership = lambda { |key| @big_segment_store_manager.get_context_membership(key) } | ||
| @evaluator = LaunchDarkly::Impl::Evaluator.new(get_flag, get_segment, get_big_segments_membership, @config.logger) | ||
|
|
||
| if !@config.offline? && @config.send_events && !@config.diagnostic_opt_out? | ||
| diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(@sdk_key)) | ||
| @data_system.set_diagnostic_accumulator(diagnostic_accumulator) | ||
| else | ||
| diagnostic_accumulator = nil | ||
| end | ||
|
|
@@ -138,38 +125,14 @@ def postfork(wait_for_sec = 5) | |
| @event_processor = EventProcessor.new(@sdk_key, @config, nil, diagnostic_accumulator) | ||
| end | ||
|
|
||
| if @config.use_ldd? | ||
| @config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" } | ||
| @data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new | ||
| return # requestor and update processor are not used in this mode | ||
| end | ||
|
|
||
| flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) | ||
| @flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) }) | ||
|
|
||
| data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) | ||
|
|
||
| # Make the update sink available on the config so that our data source factory can access the sink with a shared executor. | ||
| @config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster) | ||
|
|
||
| @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink) | ||
|
|
||
| data_source_or_factory = @config.data_source || self.method(:create_default_data_source) | ||
| if data_source_or_factory.respond_to? :call | ||
| # Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in | ||
| # which case they take three parameters. This will be changed in the future to use a less awkware mechanism. | ||
| if data_source_or_factory.arity == 3 | ||
| @data_source = data_source_or_factory.call(@sdk_key, @config, diagnostic_accumulator) | ||
| else | ||
| @data_source = data_source_or_factory.call(@sdk_key, @config) | ||
| end | ||
| else | ||
| @data_source = data_source_or_factory | ||
| end | ||
| # Create the flag tracker using the broadcaster from the data system | ||
| eval_fn = lambda { |key, context| variation(key, context, nil) } | ||
| @flag_tracker = Impl::FlagTracker.new(@data_system.flag_change_broadcaster, eval_fn) | ||
|
|
||
| register_plugins(environment_metadata) | ||
|
|
||
| ready = @data_source.start | ||
| # Start the data system | ||
| ready = @data_system.start | ||
|
|
||
| return unless wait_for_sec > 0 | ||
|
|
||
|
|
@@ -180,7 +143,7 @@ def postfork(wait_for_sec = 5) | |
| ok = ready.wait(wait_for_sec) | ||
| if !ok | ||
| @config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" } | ||
| elsif !@data_source.initialized? | ||
| elsif !initialized? | ||
| @config.logger.error { "[LDClient] LaunchDarkly client initialization failed" } | ||
| end | ||
| end | ||
|
|
@@ -295,7 +258,7 @@ def secure_mode_hash(context) | |
| # @return [Boolean] true if the client has been initialized | ||
| # | ||
| def initialized? | ||
| @config.offline? || @config.use_ldd? || @data_source.initialized? | ||
| @data_system.data_availability == @data_system.target_availability | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure these lines are equivalent given our previous discussions about this? I can't remember exactly where we landed on that. |
||
| end | ||
|
|
||
| # | ||
|
|
@@ -601,7 +564,7 @@ def all_flags_state(context, options={}) | |
| return FeatureFlagsState.new(false) if @config.offline? | ||
|
|
||
| unless initialized? | ||
| if @store.initialized? | ||
| if @data_system.store.initialized? | ||
| @config.logger.warn { "Called all_flags_state before client initialization; using last known values from data store" } | ||
| else | ||
| @config.logger.warn { "Called all_flags_state before client initialization. Data store not available; returning empty state" } | ||
|
|
@@ -616,7 +579,7 @@ def all_flags_state(context, options={}) | |
| end | ||
|
|
||
| begin | ||
| features = @store.all(Impl::DataStore::FEATURES) | ||
| features = @data_system.store.all(Impl::DataStore::FEATURES) | ||
| rescue => exn | ||
| Impl::Util.log_exception(@config.logger, "Unable to read flags for all_flags_state", exn) | ||
| return FeatureFlagsState.new(false) | ||
|
|
@@ -663,11 +626,9 @@ def all_flags_state(context, options={}) | |
| # @return [void] | ||
| def close | ||
| @config.logger.info { "[LDClient] Closing LaunchDarkly client..." } | ||
| @data_source.stop | ||
| @data_system.stop | ||
| @event_processor.stop | ||
| @big_segment_store_manager.stop | ||
| @store.stop | ||
| @shared_executor.shutdown | ||
| end | ||
|
|
||
| # | ||
|
|
@@ -690,7 +651,9 @@ def close | |
| # | ||
| # @return [LaunchDarkly::Interfaces::DataStore::StatusProvider] | ||
| # | ||
| attr_reader :data_store_status_provider | ||
| def data_store_status_provider | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could also make these delegators if you wanted to tighten the syntax up some. It would be something like: require 'forwardable'
# ...
extend Forwardable
def_delegators :@data_system, :data_store_status_provider, :data_source_status_provider |
||
| @data_system.data_store_status_provider | ||
| end | ||
|
|
||
| # | ||
| # Returns an interface for tracking the status of the data source. | ||
|
|
@@ -703,7 +666,9 @@ def close | |
| # | ||
| # @return [LaunchDarkly::Interfaces::DataSource::StatusProvider] | ||
| # | ||
| attr_reader :data_source_status_provider | ||
| def data_source_status_provider | ||
| @data_system.data_source_status_provider | ||
| end | ||
|
|
||
| # | ||
| # Returns an interface for tracking changes in feature flag configurations. | ||
|
|
@@ -712,23 +677,8 @@ def close | |
| # requesting notifications about feature flag changes using an event | ||
| # listener model. | ||
| # | ||
| attr_reader :flag_tracker | ||
|
|
||
| private | ||
|
|
||
| def create_default_data_source(sdk_key, config, diagnostic_accumulator) | ||
| if config.offline? | ||
| return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new | ||
| end | ||
| raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key | ||
| if config.stream? | ||
| Impl::DataSource::StreamProcessor.new(sdk_key, config, diagnostic_accumulator) | ||
| else | ||
| config.logger.info { "Disabling streaming API" } | ||
| config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } | ||
| requestor = Impl::DataSource::Requestor.new(sdk_key, config) | ||
| Impl::DataSource::PollingProcessor.new(config, requestor) | ||
| end | ||
| def flag_tracker | ||
| @flag_tracker | ||
| end | ||
|
|
||
| # | ||
|
|
@@ -738,7 +688,7 @@ def create_default_data_source(sdk_key, config, diagnostic_accumulator) | |
| # | ||
| # @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>] | ||
| # | ||
| def variation_with_flag(key, context, default) | ||
| private def variation_with_flag(key, context, default) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The file used a mix of per method private and all methods after the declaration on line 717 of the original file. Moving to defining per method. |
||
| evaluate_internal(key, context, default, false) | ||
| end | ||
|
|
||
|
|
@@ -750,7 +700,7 @@ def variation_with_flag(key, context, default) | |
| # | ||
| # @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>] | ||
| # | ||
| def evaluate_internal(key, context, default, with_reasons) | ||
| private def evaluate_internal(key, context, default, with_reasons) | ||
| if @config.offline? | ||
| return Evaluator.error_result(EvaluationReason::ERROR_CLIENT_NOT_READY, default), nil, nil | ||
| end | ||
|
|
@@ -768,7 +718,7 @@ def evaluate_internal(key, context, default, with_reasons) | |
| end | ||
|
|
||
| unless initialized? | ||
| if @store.initialized? | ||
| if @data_system.store.initialized? | ||
| @config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" } | ||
| else | ||
| @config.logger.error { "[LDClient] Client has not finished initializing; feature store unavailable, returning default value" } | ||
|
|
@@ -779,7 +729,7 @@ def evaluate_internal(key, context, default, with_reasons) | |
| end | ||
|
|
||
| begin | ||
| feature = @store.get(Impl::DataStore::FEATURES, key) | ||
| feature = @data_system.store.get(Impl::DataStore::FEATURES, key) | ||
| rescue | ||
| # Ignored | ||
| end | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.