Skip to content

Commit 69e6f5a

Browse files
committed
Added telemetery pluggable storage with tests
1 parent a5c221c commit 69e6f5a

File tree

3 files changed

+301
-8
lines changed

3 files changed

+301
-8
lines changed

splitio/models/telemetry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
MAX_LATENCY = 7481828
1818
MAX_LATENCY_BUCKET_COUNT = 23
1919
MAX_STREAMING_EVENTS = 20
20+
MAX_TAGS = 10
2021

2122
class CounterConstants(Enum):
2223
"""Impressions and events counters constants"""

splitio/storage/pluggable.py

Lines changed: 191 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
import logging
44
import json
5+
import threading
56

67
from splitio.models import splits, segments
78
from splitio.models.impressions import Impression
8-
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage
9+
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, MAX_TAGS
10+
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage
911

1012
_LOGGER = logging.getLogger(__name__)
1113

@@ -15,7 +17,14 @@ class PluggableSplitStorage(SplitStorage):
1517
_SPLIT_NAME_LENGTH = 12
1618

1719
def __init__(self, pluggable_adapter, prefix=None):
18-
"""Constructor."""
20+
"""
21+
Class constructor.
22+
23+
:param pluggable_adapter: Storage client or compliant interface.
24+
:type pluggable_adapter: TBD
25+
:param prefix: optional, prefix to storage keys
26+
:type prefix: str
27+
"""
1928
self._pluggable_adapter = pluggable_adapter
2029
self._prefix = "SPLITIO.split.{split_name}"
2130
self._traffic_type_prefix = "SPLITIO.trafficType.{traffic_type_name}"
@@ -302,7 +311,14 @@ class PluggableSegmentStorage(SegmentStorage):
302311
_TILL_LENGTH = 4
303312

304313
def __init__(self, pluggable_adapter, prefix=None):
305-
"""Constructor."""
314+
"""
315+
Class constructor.
316+
317+
:param pluggable_adapter: Storage client or compliant interface.
318+
:type pluggable_adapter: TBD
319+
:param prefix: optional, prefix to storage keys
320+
:type prefix: str
321+
"""
306322
self._pluggable_adapter = pluggable_adapter
307323
self._prefix = "SPLITIO.segment.{segment_name}"
308324
self._segment_till_prefix = "SPLITIO.segment.{segment_name}.till"
@@ -475,6 +491,7 @@ def put(self, segment):
475491

476492

477493
class PluggableImpressionsStorage(ImpressionStorage):
494+
"""Pluggable Impressions storage class."""
478495

479496
IMPRESSIONS_KEY_DEFAULT_TTL = 3600
480497

@@ -486,6 +503,8 @@ def __init__(self, pluggable_adapter, sdk_metadata, prefix=None):
486503
:type pluggable_adapter: TBD
487504
:param sdk_metadata: SDK & Machine information.
488505
:type sdk_metadata: splitio.client.util.SdkMetadata
506+
:param prefix: optional, prefix to storage keys
507+
:type prefix: str
489508
"""
490509
self._pluggable_adapter = pluggable_adapter
491510
self._sdk_metadata = {
@@ -573,18 +592,20 @@ def clear(self):
573592

574593

575594
class PluggableEventsStorage(EventStorage):
576-
"""Redis based event storage class."""
595+
"""Pluggable Event storage class."""
577596

578597
_EVENTS_KEY_DEFAULT_TTL = 3600
579598

580599
def __init__(self, pluggable_adapter, sdk_metadata, prefix=None):
581600
"""
582601
Class constructor.
583602
584-
:param redis_client: Redis client or compliant interface.
585-
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
603+
:param pluggable_adapter: Storage client or compliant interface.
604+
:type pluggable_adapter: TBD
586605
:param sdk_metadata: SDK & Machine information.
587606
:type sdk_metadata: splitio.client.util.SdkMetadata
607+
:param prefix: optional, prefix to storage keys
608+
:type prefix: str
588609
"""
589610
self._pluggable_adapter = pluggable_adapter
590611
self._sdk_metadata = {
@@ -657,3 +678,167 @@ def clear(self):
657678
Clear data.
658679
"""
659680
raise NotImplementedError('Not supported for redis.')
681+
682+
683+
class PluggableTelemetryStorage(TelemetryStorage):
684+
"""Pluggable telemetry storage class."""
685+
686+
_TELEMETRY_KEY_DEFAULT_TTL = 3600
687+
688+
def __init__(self, pluggable_adapter, sdk_metadata, prefix=None):
689+
"""
690+
Class constructor.
691+
692+
:param pluggable_adapter: Storage client or compliant interface.
693+
:type pluggable_adapter: TBD
694+
:param sdk_metadata: SDK & Machine information.
695+
:type sdk_metadata: splitio.client.util.SdkMetadata
696+
:param prefix: optional, prefix to storage keys
697+
:type prefix: str
698+
"""
699+
self._lock = threading.RLock()
700+
self._reset_config_tags()
701+
self._pluggable_adapter = pluggable_adapter
702+
self._sdk_metadata = sdk_metadata.sdk_version + '/' + sdk_metadata.instance_name + '/' + sdk_metadata.instance_ip
703+
self._method_latencies = MethodLatencies()
704+
self._method_exceptions = MethodExceptions()
705+
self._tel_config = TelemetryConfig()
706+
self._telemetry_config_key = 'SPLITIO.telemetry.init'
707+
self._telemetry_latencies_key = 'SPLITIO.telemetry.latencies'
708+
self._telemetry_exceptions_key = 'SPLITIO.telemetry.exceptions'
709+
if prefix is not None:
710+
self._telemetry_config_key = prefix + "." + self._telemetry_config_key
711+
self._telemetry_latencies_key = prefix + "." + self._telemetry_latencies_key
712+
self._telemetry_exceptions_key = prefix + "." + self._telemetry_exceptions_key
713+
714+
def _reset_config_tags(self):
715+
"""Reset config tags."""
716+
with self._lock:
717+
self._config_tags = []
718+
719+
def add_config_tag(self, tag):
720+
"""
721+
Record tag string.
722+
723+
:param tag: tag to be added
724+
:type tag: str
725+
"""
726+
with self._lock:
727+
if len(self._config_tags) < MAX_TAGS:
728+
self._config_tags.append(tag)
729+
730+
def record_config(self, config, extra_config):
731+
"""
732+
initilize telemetry objects
733+
734+
:param config: factory configuration parameters
735+
:type config: Dict
736+
:param extra_config: any extra configs
737+
:type extra_config: Dict
738+
"""
739+
self._tel_config.record_config(config, extra_config)
740+
741+
def pop_config_tags(self):
742+
"""Get and reset configs."""
743+
with self._lock:
744+
tags = self._config_tags
745+
self._reset_config_tags()
746+
return tags
747+
748+
def push_config_stats(self):
749+
"""push config stats to storage."""
750+
self._pluggable_adapter.set(self._telemetry_config_key + "::" + self._sdk_metadata, str(self._format_config_stats()))
751+
752+
def _format_config_stats(self):
753+
"""format only selected config stats to json"""
754+
config_stats = self._tel_config.get_stats()
755+
return json.dumps({
756+
'aF': config_stats['aF'],
757+
'rF': config_stats['rF'],
758+
'sT': config_stats['sT'],
759+
'oM': config_stats['oM'],
760+
't': self.pop_config_tags()
761+
})
762+
763+
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
764+
"""
765+
Record active and redundant factories.
766+
767+
:param active_factory_count: active factory count
768+
:type active_factory_count: int
769+
:param redundant_factory_count: redundant factory count
770+
:type redundant_factory_count: int
771+
"""
772+
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
773+
774+
def record_latency(self, method, latency):
775+
"""
776+
record latency data
777+
778+
:param method: method name
779+
:type method: string
780+
:param latency: latency
781+
:type latency: int64
782+
"""
783+
self._method_latencies.add_latency(method, latency)
784+
latencies = self._method_latencies.pop_all()['methodLatencies']
785+
values = latencies[method.value]
786+
total_keys = 0
787+
bucket_number = 0
788+
for bucket in values:
789+
if bucket > 0:
790+
latency_key = self._telemetry_latencies_key + '::' + self._sdk_metadata + '/' + method.value + '/' + str(bucket_number)
791+
result = self._pluggable_adapter.increment(latency_key, bucket)
792+
self.expire_keys(latency_key, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result)
793+
total_keys += 1
794+
bucket_number = bucket_number + 0
795+
796+
def record_exception(self, method):
797+
"""
798+
record an exception
799+
800+
:param method: method name
801+
:type method: string
802+
"""
803+
except_key = self._telemetry_exceptions_key + "::" + self._sdk_metadata + '/' + method.value
804+
result = self._pluggable_adapter.increment(except_key, 1)
805+
self.expire_keys(except_key, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result)
806+
807+
def record_not_ready_usage(self):
808+
"""Not implemented"""
809+
pass
810+
811+
def record_bur_time_out(self):
812+
"""Not implemented"""
813+
pass
814+
815+
def record_impression_stats(self, data_type, count):
816+
"""Not implemented"""
817+
pass
818+
819+
def expire_latency_keys(self, total_keys, inserted):
820+
"""
821+
Set expire ttl for a latency key in storage
822+
823+
:param total_keys: length of keys.
824+
:type total_keys: int
825+
:param inserted: added keys.
826+
:type inserted: int
827+
"""
828+
self.expire_keys(self._telemetry_latencies_key, self._TELEMETRY_KEY_DEFAULT_TTL, total_keys, inserted)
829+
830+
def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
831+
"""
832+
Set expire ttl for a key in storage if total keys equal inserted
833+
834+
:param queue_keys: key to be set
835+
:type queue_keys: str
836+
:param ey_default_ttl: ttl value
837+
:type ey_default_ttl: int
838+
:param total_keys: length of keys.
839+
:type total_keys: int
840+
:param inserted: added keys.
841+
:type inserted: int
842+
"""
843+
if total_keys == inserted:
844+
self._pluggable_adapter.expire(queue_key, key_default_ttl)

tests/storage/test_pluggable.py

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
from splitio.models.segments import Segment
77
from splitio.models.impressions import Impression
88
from splitio.models.events import Event, EventWrapper
9-
from splitio.storage.pluggable import PluggableSplitStorage, PluggableSegmentStorage, PluggableImpressionsStorage, PluggableEventsStorage
9+
from splitio.storage.pluggable import PluggableSplitStorage, PluggableSegmentStorage, PluggableImpressionsStorage, PluggableEventsStorage, PluggableTelemetryStorage
1010
from splitio.client.util import get_metadata, SdkMetadata
11+
from splitio.models.telemetry import MAX_TAGS, MethodExceptionsAndLatencies, OperationMode
1112

1213
from tests.integration import splits_json
1314
import pytest
@@ -432,7 +433,7 @@ def mock_expire(impressions_queue_key, ttl):
432433

433434

434435
class PluggableEventsStorageTests(object):
435-
"""In memory events storage test cases."""
436+
"""Pluggable events storage test cases."""
436437

437438
def setup_method(self):
438439
"""Prepare storages with test data."""
@@ -526,3 +527,109 @@ def mock_expire(impressions_event_key, ttl):
526527
assert(self.expired_called)
527528
assert(self.key == "myprefix.SPLITIO.events")
528529
assert(self.ttl == self.pluggable_events_storage._EVENTS_KEY_DEFAULT_TTL)
530+
531+
class PluggableTelemetryStorageTests(object):
532+
"""Pluggable telemetry storage test cases."""
533+
534+
def setup_method(self):
535+
"""Prepare storages with test data."""
536+
self.mock_adapter = StorageMockAdapter()
537+
self.sdk_metadata = SdkMetadata('python-1.1.1', 'hostname', 'ip')
538+
self.pluggable_telemetry_storage = PluggableTelemetryStorage(self.mock_adapter, self.sdk_metadata, 'myprefix')
539+
540+
def test_init(self):
541+
assert(self.pluggable_telemetry_storage._telemetry_config_key == 'myprefix.SPLITIO.telemetry.init')
542+
assert(self.pluggable_telemetry_storage._telemetry_latencies_key == 'myprefix.SPLITIO.telemetry.latencies')
543+
assert(self.pluggable_telemetry_storage._telemetry_exceptions_key == 'myprefix.SPLITIO.telemetry.exceptions')
544+
assert(self.pluggable_telemetry_storage._sdk_metadata == self.sdk_metadata.sdk_version + '/' + self.sdk_metadata.instance_name + '/' + self.sdk_metadata.instance_ip)
545+
assert(self.pluggable_telemetry_storage._config_tags == [])
546+
547+
pluggable2 = PluggableTelemetryStorage(self.mock_adapter, self.sdk_metadata)
548+
assert(pluggable2._telemetry_config_key == 'SPLITIO.telemetry.init')
549+
assert(pluggable2._telemetry_latencies_key == 'SPLITIO.telemetry.latencies')
550+
assert(pluggable2._telemetry_exceptions_key == 'SPLITIO.telemetry.exceptions')
551+
552+
def test_reset_config_tags(self):
553+
self.pluggable_telemetry_storage._config_tags = ['a']
554+
self.pluggable_telemetry_storage._reset_config_tags()
555+
assert(self.pluggable_telemetry_storage._config_tags == [])
556+
557+
def test_add_config_tag(self):
558+
self.pluggable_telemetry_storage.add_config_tag('q')
559+
assert(self.pluggable_telemetry_storage._config_tags == ['q'])
560+
561+
self.pluggable_telemetry_storage._config_tags = []
562+
for i in range(0, 20):
563+
self.pluggable_telemetry_storage.add_config_tag('q' + str(i))
564+
assert(len(self.pluggable_telemetry_storage._config_tags) == MAX_TAGS)
565+
assert(self.pluggable_telemetry_storage._config_tags == ['q' + str(i) for i in range(0, MAX_TAGS)])
566+
567+
def test_record_config(self):
568+
self.config = {}
569+
self.extra_config = {}
570+
def record_config_mock(config, extra_config):
571+
self.config = config
572+
self.extra_config = extra_config
573+
574+
self.pluggable_telemetry_storage.record_config = record_config_mock
575+
self.pluggable_telemetry_storage.record_config({'item': 'value'}, {'item2': 'value2'})
576+
assert(self.config == {'item': 'value'})
577+
assert(self.extra_config == {'item2': 'value2'})
578+
579+
def test_pop_config_tags(self):
580+
self.pluggable_telemetry_storage._config_tags = ['a']
581+
self.pluggable_telemetry_storage.pop_config_tags()
582+
assert(self.pluggable_telemetry_storage._config_tags == [])
583+
584+
def test_record_active_and_redundant_factories(self):
585+
self.active_factory_count = 0
586+
self.redundant_factory_count = 0
587+
def record_active_and_redundant_factories_mock(active_factory_count, redundant_factory_count):
588+
self.active_factory_count = active_factory_count
589+
self.redundant_factory_count = redundant_factory_count
590+
591+
self.pluggable_telemetry_storage.record_active_and_redundant_factories = record_active_and_redundant_factories_mock
592+
self.pluggable_telemetry_storage.record_active_and_redundant_factories(2, 1)
593+
assert(self.active_factory_count == 2)
594+
assert(self.redundant_factory_count == 1)
595+
596+
def test_record_latency(self):
597+
def expire_keys_mock(*args, **kwargs):
598+
assert(args[0] == self.pluggable_telemetry_storage._telemetry_latencies_key + '::python-1.1.1/hostname/ip/treatment/0')
599+
assert(args[1] == self.pluggable_telemetry_storage._TELEMETRY_KEY_DEFAULT_TTL)
600+
assert(args[2] == 1)
601+
assert(args[3] == 1)
602+
603+
self.pluggable_telemetry_storage.expire_keys = expire_keys_mock
604+
self.pluggable_telemetry_storage.record_latency(MethodExceptionsAndLatencies.TREATMENT, 10)
605+
assert(self.mock_adapter._keys[self.pluggable_telemetry_storage._telemetry_latencies_key + '::python-1.1.1/hostname/ip/treatment/0'] == 1)
606+
607+
def test_record_exception(self):
608+
def expire_keys_mock(*args, **kwargs):
609+
assert(args[0] == self.pluggable_telemetry_storage._telemetry_exceptions_key + '::python-1.1.1/hostname/ip/treatment')
610+
assert(args[1] == self.pluggable_telemetry_storage._TELEMETRY_KEY_DEFAULT_TTL)
611+
assert(args[2] == 1)
612+
assert(args[3] == 1)
613+
614+
self.pluggable_telemetry_storage.expire_keys = expire_keys_mock
615+
self.pluggable_telemetry_storage.record_exception(MethodExceptionsAndLatencies.TREATMENT)
616+
assert(self.mock_adapter._keys[self.pluggable_telemetry_storage._telemetry_exceptions_key + '::python-1.1.1/hostname/ip/treatment'] == 1)
617+
618+
def test_push_config_stats(self):
619+
self.pluggable_telemetry_storage.record_config(
620+
{'operationMode': 'inmemory',
621+
'streamingEnabled': True,
622+
'impressionsQueueSize': 100,
623+
'eventsQueueSize': 200,
624+
'impressionsMode': 'DEBUG',''
625+
'impressionListener': None,
626+
'featuresRefreshRate': 30,
627+
'segmentsRefreshRate': 30,
628+
'impressionsRefreshRate': 60,
629+
'eventsPushRate': 60,
630+
'metricsRefreshRate': 10,
631+
}, {}
632+
)
633+
self.pluggable_telemetry_storage.record_active_and_redundant_factories(2, 1)
634+
self.pluggable_telemetry_storage.push_config_stats()
635+
assert(self.mock_adapter._keys[self.pluggable_telemetry_storage._telemetry_config_key + "::" + self.pluggable_telemetry_storage._sdk_metadata] == '{"aF": 2, "rF": 1, "sT": "memory", "oM": 0, "t": []}')

0 commit comments

Comments
 (0)