Skip to content

Commit 57ca6ac

Browse files
committed
feat: Add support for data store status monitoring (#252)
The client instance will now provide access to a `data_store_status_provider`. This provider allows developers to retrieve the data store status of the SDK on demand, or asynchronously by registering listeners.
1 parent 4df1762 commit 57ca6ac

19 files changed

+621
-20
lines changed

ldclient/client.py

Lines changed: 121 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
This submodule contains the client class that provides most of the SDK functionality.
33
"""
44

5-
from typing import Optional, Any, Dict, Mapping, Union, Tuple
5+
from typing import Optional, Any, Dict, Mapping, Union, Tuple, Callable
66

77
from .impl import AnyNum
88

@@ -21,52 +21,142 @@
2121
from ldclient.impl.datasource.polling import PollingUpdateProcessor
2222
from ldclient.impl.datasource.streaming import StreamingUpdateProcessor
2323
from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl, DataSourceStatusProviderImpl
24+
from ldclient.impl.datastore.status import DataStoreUpdateSinkImpl, DataStoreStatusProviderImpl
2425
from ldclient.impl.evaluator import Evaluator, error_reason
2526
from ldclient.impl.events.diagnostics import create_diagnostic_id, _DiagnosticAccumulator
2627
from ldclient.impl.events.event_processor import DefaultEventProcessor
2728
from ldclient.impl.events.types import EventFactory
2829
from ldclient.impl.model.feature_flag import FeatureFlag
2930
from ldclient.impl.listeners import Listeners
31+
from ldclient.impl.rwlock import ReadWriteLock
3032
from ldclient.impl.stubs import NullEventProcessor, NullUpdateProcessor
3133
from ldclient.impl.util import check_uwsgi, log
32-
from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureRequester, FeatureStore, FlagTracker
34+
from ldclient.impl.repeating_task import RepeatingTask
35+
from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureStore, FlagTracker, DataStoreUpdateSink, DataStoreStatus, DataStoreStatusProvider
3336
from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind
34-
from ldclient.feature_store import FeatureStore
3537
from ldclient.migrations import Stage, OpTracker
3638
from ldclient.impl.flag_tracker import FlagTrackerImpl
3739

3840
from threading import Lock
3941

4042

4143

44+
4245
class _FeatureStoreClientWrapper(FeatureStore):
4346
"""Provides additional behavior that the client requires before or after feature store operations.
44-
Currently this just means sorting the data set for init(). In the future we may also use this
45-
to provide an update listener capability.
47+
Currently this just means sorting the data set for init() and dealing with data store status listeners.
4648
"""
4749

48-
def __init__(self, store: FeatureStore):
50+
def __init__(self, store: FeatureStore, store_update_sink: DataStoreUpdateSink):
4951
self.store = store
52+
self.__store_update_sink = store_update_sink
53+
self.__monitoring_enabled = self.is_monitoring_enabled()
54+
55+
# Covers the following variables
56+
self.__lock = ReadWriteLock()
57+
self.__last_available = True
58+
self.__poller: Optional[RepeatingTask] = None
5059

5160
def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]):
52-
return self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data))
61+
return self.__wrapper(lambda: self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data)))
5362

5463
def get(self, kind, key, callback):
55-
return self.store.get(kind, key, callback)
64+
return self.__wrapper(lambda: self.store.get(kind, key, callback))
5665

5766
def all(self, kind, callback):
58-
return self.store.all(kind, callback)
67+
return self.__wrapper(lambda: self.store.all(kind, callback))
5968

6069
def delete(self, kind, key, version):
61-
return self.store.delete(kind, key, version)
70+
return self.__wrapper(lambda: self.store.delete(kind, key, version))
6271

6372
def upsert(self, kind, item):
64-
return self.store.upsert(kind, item)
73+
return self.__wrapper(lambda: self.store.upsert(kind, item))
6574

6675
@property
6776
def initialized(self) -> bool:
6877
return self.store.initialized
6978

79+
def __wrapper(self, fn: Callable):
80+
try:
81+
return fn()
82+
except BaseException:
83+
if self.__monitoring_enabled:
84+
self.__update_availability(False)
85+
raise
86+
87+
def __update_availability(self, available: bool):
88+
try:
89+
self.__lock.lock()
90+
if available == self.__last_available:
91+
return
92+
self.__last_available = available
93+
finally:
94+
self.__lock.unlock()
95+
96+
status = DataStoreStatus(available, False)
97+
98+
if available:
99+
log.warn("Persistent store is available again")
100+
101+
self.__store_update_sink.update_status(status)
102+
103+
if available:
104+
try:
105+
self.__lock.lock()
106+
if self.__poller is not None:
107+
self.__poller.stop()
108+
self.__poller = None
109+
finally:
110+
self.__lock.unlock()
111+
112+
return
113+
114+
log.warn("Detected persistent store unavailability; updates will be cached until it recovers")
115+
task = RepeatingTask(0.5, 0, self.__check_availability)
116+
117+
self.__lock.lock()
118+
self.__poller = task
119+
self.__poller.start()
120+
self.__lock.unlock()
121+
122+
def __check_availability(self):
123+
try:
124+
if self.store.available:
125+
self.__update_availability(True)
126+
except BaseException as e:
127+
log.error("Unexpected error from data store status function: %s", e)
128+
129+
def is_monitoring_enabled(self) -> bool:
130+
"""
131+
This methods determines whether the wrapped store can support enabling monitoring.
132+
133+
The wrapped store must provide a monitoring_enabled method, which must
134+
be true. But this alone is not sufficient.
135+
136+
Because this class wraps all interactions with a provided store, it can
137+
technically "monitor" any store. However, monitoring also requires that
138+
we notify listeners when the store is available again.
139+
140+
We determine this by checking the store's `available?` method, so this
141+
is also a requirement for monitoring support.
142+
143+
These extra checks won't be necessary once `available` becomes a part
144+
of the core interface requirements and this class no longer wraps every
145+
feature store.
146+
"""
147+
148+
if not hasattr(self.store, 'is_monitoring_enabled'):
149+
return False
150+
151+
if not hasattr(self.store, 'is_available'):
152+
return False
153+
154+
monitoring_enabled = getattr(self.store, 'is_monitoring_enabled')
155+
if not callable(monitoring_enabled):
156+
return False
157+
158+
return monitoring_enabled()
159+
70160

71161
def _get_store_item(store, kind: VersionedDataKind, key: str) -> Any:
72162
# This decorator around store.get provides backward compatibility with any custom data
@@ -102,7 +192,11 @@ def __init__(self, config: Config, start_wait: float=5):
102192
self._event_factory_default = EventFactory(False)
103193
self._event_factory_with_reasons = EventFactory(True)
104194

105-
store = _FeatureStoreClientWrapper(self._config.feature_store)
195+
data_store_listeners = Listeners()
196+
store_sink = DataStoreUpdateSinkImpl(data_store_listeners)
197+
store = _FeatureStoreClientWrapper(self._config.feature_store, store_sink)
198+
199+
self.__data_store_status_provider = DataStoreStatusProviderImpl(store, store_sink)
106200

107201
data_source_listeners = Listeners()
108202
flag_change_listeners = Listeners()
@@ -515,6 +609,21 @@ def data_source_status_provider(self) -> DataSourceStatusProvider:
515609
"""
516610
return self.__data_source_status_provider
517611

612+
@property
613+
def data_store_status_provider(self) -> DataStoreStatusProvider:
614+
"""
615+
Returns an interface for tracking the status of a persistent data store.
616+
617+
The provider has methods for checking whether the data store is (as far
618+
as the SDK knows) currently operational, tracking changes in this
619+
status, and getting cache statistics. These are only relevant for a
620+
persistent data store; if you are using an in-memory data store, then
621+
this method will return a stub object that provides no information.
622+
623+
:return: The data store status provider
624+
"""
625+
return self.__data_store_status_provider
626+
518627
@property
519628
def flag_tracker(self) -> FlagTracker:
520629
"""

ldclient/feature_store.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ def __init__(self):
7878
self._initialized = False
7979
self._items = defaultdict(dict)
8080

81+
def is_monitoring_enabled(self) -> bool:
82+
return False
83+
84+
def is_available(self) -> bool:
85+
return True
86+
8187
def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any]=lambda x: x) -> Any:
8288
"""
8389
"""

ldclient/feature_store_helpers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,21 @@ def __init__(self, core: FeatureStoreCore, cache_config: CacheConfig):
3232
:param cache_config: the caching parameters
3333
"""
3434
self._core = core
35+
self.__has_available_method = callable(getattr(core, 'is_available', None))
36+
3537
if cache_config.enabled:
3638
self._cache = ExpiringDict(max_len=cache_config.capacity, max_age_seconds=cache_config.expiration)
3739
else:
3840
self._cache = None
3941
self._inited = False
4042

43+
def is_monitoring_enabled(self) -> bool:
44+
return self.__has_available_method
45+
46+
def is_available(self) -> bool:
47+
# We know is_available exists since we are checking __has_available_method
48+
return self._core.is_available() if self.__has_available_method else False # type: ignore
49+
4150
def init(self, all_encoded_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]):
4251
"""
4352
"""

ldclient/impl/datasource/status.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ def __compute_changed_items_for_full_data_set(self, old_data: Mapping[VersionedD
147147

148148

149149
class DataSourceStatusProviderImpl(DataSourceStatusProvider):
150-
def __init__(self, listeners: Listeners, updates_sink: DataSourceUpdateSinkImpl):
150+
def __init__(self, listeners: Listeners, update_sink: DataSourceUpdateSinkImpl):
151151
self.__listeners = listeners
152-
self.__updates_sink = updates_sink
152+
self.__update_sink = update_sink
153153

154154
@property
155155
def status(self) -> DataSourceStatus:
156-
return self.__updates_sink.status
156+
return self.__update_sink.status
157157

158158
def add_listener(self, listener: Callable[[DataSourceStatus], None]):
159159
self.__listeners.add(listener)

ldclient/impl/datastore/__init__.py

Whitespace-only changes.

ldclient/impl/datastore/status.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from __future__ import annotations
2+
from typing import Callable, TYPE_CHECKING
3+
from copy import copy
4+
5+
from ldclient.interfaces import DataStoreStatusProvider, DataStoreStatus, DataStoreUpdateSink
6+
from ldclient.impl.listeners import Listeners
7+
from ldclient.impl.rwlock import ReadWriteLock
8+
9+
if TYPE_CHECKING:
10+
from ldclient.client import _FeatureStoreClientWrapper
11+
12+
13+
class DataStoreUpdateSinkImpl(DataStoreUpdateSink):
14+
def __init__(self, listeners: Listeners):
15+
self.__listeners = listeners
16+
17+
self.__lock = ReadWriteLock()
18+
self.__status = DataStoreStatus(True, False)
19+
20+
@property
21+
def listeners(self) -> Listeners:
22+
return self.__listeners
23+
24+
def status(self) -> DataStoreStatus:
25+
self.__lock.rlock()
26+
status = copy(self.__status)
27+
self.__lock.runlock()
28+
29+
return status
30+
31+
def update_status(self, status: DataStoreStatus):
32+
self.__lock.lock()
33+
old_value, self.__status = self.__status, status
34+
self.__lock.unlock()
35+
36+
if old_value != status:
37+
self.__listeners.notify(status)
38+
39+
40+
class DataStoreStatusProviderImpl(DataStoreStatusProvider):
41+
def __init__(self, store: _FeatureStoreClientWrapper, update_sink: DataStoreUpdateSinkImpl):
42+
self.__store = store
43+
self.__update_sink = update_sink
44+
45+
@property
46+
def status(self) -> DataStoreStatus:
47+
return self.__update_sink.status()
48+
49+
def is_monitoring_enabled(self) -> bool:
50+
return self.__store.is_monitoring_enabled()
51+
52+
def add_listener(self, listener: Callable[[DataStoreStatus], None]):
53+
self.__update_sink.listeners.add(listener)
54+
55+
def remove_listener(self, listener: Callable[[DataStoreStatus], None]):
56+
self.__update_sink.listeners.remove(listener)

ldclient/impl/integrations/consul/consul_feature_store.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ def __init__(self, host, port, prefix, consul_opts):
4545
self._prefix = ("launchdarkly" if prefix is None else prefix) + "/"
4646
self._client = consul.Consul(**opts)
4747

48+
def is_available(self) -> bool:
49+
try:
50+
self._client.kv.get(self._inited_key())
51+
return True
52+
except BaseException:
53+
return False
54+
4855
def init_internal(self, all_data):
4956
# Start by reading the existing keys; we will later delete any of these that weren't in all_data.
5057
index, keys = self._client.kv.get(self._prefix, recurse=True, keys=True)

ldclient/impl/integrations/dynamodb/dynamodb_feature_store.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ def __init__(self, table_name, prefix, dynamodb_opts):
5353
self._prefix = (prefix + ":") if prefix else ""
5454
self._client = boto3.client('dynamodb', **dynamodb_opts)
5555

56+
def is_available(self) -> bool:
57+
try:
58+
inited_key = self._inited_key()
59+
self._get_item_by_keys(inited_key, inited_key)
60+
return True
61+
except BaseException:
62+
return False
63+
5664
def init_internal(self, all_data):
5765
# Start by reading the existing keys; we will later delete any of these that weren't in all_data.
5866
unused_old_keys = self._read_existing_keys(all_data.keys())

ldclient/impl/integrations/redis/redis_feature_store.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ def __init__(self, url, prefix, redis_opts: Dict[str, Any]):
2424
self.test_update_hook = None # exposed for testing
2525
log.info("Started RedisFeatureStore connected to URL: " + redact_password(url) + " using prefix: " + self._prefix)
2626

27+
def is_available(self) -> bool:
28+
try:
29+
self.initialized_internal()
30+
return True
31+
except BaseException:
32+
return False
33+
2734
def _items_key(self, kind):
2835
return "{0}:{1}".format(self._prefix, kind.namespace)
2936

0 commit comments

Comments
 (0)