Skip to content

Commit c3802c6

Browse files
committed
Fixed adding telemetry latency to bucket
1 parent 69e6f5a commit c3802c6

File tree

4 files changed

+43
-31
lines changed

4 files changed

+43
-31
lines changed

splitio/storage/pluggable.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from splitio.models import splits, segments
88
from splitio.models.impressions import Impression
9-
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, MAX_TAGS
9+
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, MAX_TAGS, get_latency_bucket_index
1010
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage
1111

1212
_LOGGER = logging.getLogger(__name__)
@@ -780,18 +780,10 @@ def record_latency(self, method, latency):
780780
:param latency: latency
781781
:type latency: int64
782782
"""
783-
self._method_latencies.add_latency(method, latency)
784-
latencies = self._method_latencies.pop_all()['methodLatencies']
785-
values = latencies[method.value]
786-
total_keys = 0
787-
bucket_number = 0
788-
for bucket in values:
789-
if bucket > 0:
790-
latency_key = self._telemetry_latencies_key + '::' + self._sdk_metadata + '/' + method.value + '/' + str(bucket_number)
791-
result = self._pluggable_adapter.increment(latency_key, bucket)
792-
self.expire_keys(latency_key, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result)
793-
total_keys += 1
794-
bucket_number = bucket_number + 0
783+
bucket = get_latency_bucket_index(latency)
784+
latency_key = self._telemetry_latencies_key + '::' + self._sdk_metadata + '/' + method.value + '/' + str(bucket)
785+
result = self._pluggable_adapter.increment(latency_key, 1)
786+
self.expire_keys(latency_key, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result)
795787

796788
def record_exception(self, method):
797789
"""

splitio/storage/redis.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from splitio.models.impressions import Impression
77
from splitio.models import splits, segments
8-
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig
8+
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, get_latency_bucket_index
99
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \
1010
ImpressionPipelinedStorage, TelemetryStorage
1111
from splitio.storage.adapters.redis import RedisAdapterException
@@ -660,17 +660,9 @@ def add_latency_to_pipe(self, method, latency, pipe):
660660
:param pipe: Redis pipe.
661661
:type pipe: redis.pipe
662662
"""
663-
self._method_latencies.add_latency(method, latency)
664-
latencies = self._method_latencies.pop_all()['methodLatencies']
665-
values = latencies[method.value]
666-
total_keys = 0
667-
bucket_number = 0
668-
for bucket in values:
669-
if bucket > 0:
670-
pipe.hincrby(self._TELEMETRY_LATENCIES_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
671-
method.value + '/' + str(bucket_number), bucket)
672-
total_keys += 1
673-
bucket_number = bucket_number + 0
663+
bucket = get_latency_bucket_index(latency)
664+
pipe.hincrby(self._TELEMETRY_LATENCIES_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
665+
method.value + '/' + str(bucket), 1)
674666

675667
def record_latency(self, method, latency):
676668
"""

tests/storage/test_pluggable.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,11 +599,30 @@ def expire_keys_mock(*args, **kwargs):
599599
assert(args[1] == self.pluggable_telemetry_storage._TELEMETRY_KEY_DEFAULT_TTL)
600600
assert(args[2] == 1)
601601
assert(args[3] == 1)
602-
603602
self.pluggable_telemetry_storage.expire_keys = expire_keys_mock
603+
# should increment bucket 0
604604
self.pluggable_telemetry_storage.record_latency(MethodExceptionsAndLatencies.TREATMENT, 10)
605605
assert(self.mock_adapter._keys[self.pluggable_telemetry_storage._telemetry_latencies_key + '::python-1.1.1/hostname/ip/treatment/0'] == 1)
606606

607+
def expire_keys_mock2(*args, **kwargs):
608+
assert(args[0] == self.pluggable_telemetry_storage._telemetry_latencies_key + '::python-1.1.1/hostname/ip/treatment/3')
609+
assert(args[1] == self.pluggable_telemetry_storage._TELEMETRY_KEY_DEFAULT_TTL)
610+
assert(args[2] == 1)
611+
assert(args[3] == 1)
612+
self.pluggable_telemetry_storage.expire_keys = expire_keys_mock2
613+
# should increment bucket 3
614+
self.pluggable_telemetry_storage.record_latency(MethodExceptionsAndLatencies.TREATMENT, 2260)
615+
616+
def expire_keys_mock3(*args, **kwargs):
617+
assert(args[0] == self.pluggable_telemetry_storage._telemetry_latencies_key + '::python-1.1.1/hostname/ip/treatment/3')
618+
assert(args[1] == self.pluggable_telemetry_storage._TELEMETRY_KEY_DEFAULT_TTL)
619+
assert(args[2] == 1)
620+
assert(args[3] == 2)
621+
self.pluggable_telemetry_storage.expire_keys = expire_keys_mock3
622+
# should increment bucket 3
623+
self.pluggable_telemetry_storage.record_latency(MethodExceptionsAndLatencies.TREATMENT, 3280)
624+
assert(self.mock_adapter._keys[self.pluggable_telemetry_storage._telemetry_latencies_key + '::python-1.1.1/hostname/ip/treatment/3'] == 2)
625+
607626
def test_record_exception(self):
608627
def expire_keys_mock(*args, **kwargs):
609628
assert(args[0] == self.pluggable_telemetry_storage._telemetry_exceptions_key + '::python-1.1.1/hostname/ip/treatment')

tests/storage/test_redis.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,18 +430,27 @@ def test_record_active_and_redundant_factories(self, mocker):
430430
assert (redis_telemetry._tel_config._redundant_factory_count == redundant_factory_count)
431431

432432
def test_add_latency_to_pipe(self, mocker):
433-
def _mocked_hincrby(*args, **kwargs):
434-
assert(args[1] == RedisTelemetryStorage._TELEMETRY_LATENCIES_KEY)
435-
assert(args[2][-11:] == 'treatment/0')
436-
assert(args[3] == 1)
437-
438433
adapter = build({})
439434
metadata = SdkMetadata('python-1.1.1', 'hostname', 'ip')
440435
redis_telemetry = RedisTelemetryStorage(adapter, metadata)
441436
pipe = adapter._decorated.pipeline()
437+
438+
def _mocked_hincrby(*args, **kwargs):
439+
assert(args[1] == RedisTelemetryStorage._TELEMETRY_LATENCIES_KEY)
440+
assert(args[2][-11:] == 'treatment/0')
441+
assert(args[3] == 1)
442+
# should increment bucket 0
442443
with mock.patch('redis.client.Pipeline.hincrby', _mocked_hincrby):
443444
redis_telemetry.add_latency_to_pipe(MethodExceptionsAndLatencies.TREATMENT, 20, pipe)
444445

446+
def _mocked_hincrby2(*args, **kwargs):
447+
assert(args[1] == RedisTelemetryStorage._TELEMETRY_LATENCIES_KEY)
448+
assert(args[2][-11:] == 'treatment/3')
449+
assert(args[3] == 1)
450+
# should increment bucket 3
451+
with mock.patch('redis.client.Pipeline.hincrby', _mocked_hincrby2):
452+
redis_telemetry.add_latency_to_pipe(MethodExceptionsAndLatencies.TREATMENT, 2260, pipe)
453+
445454
def test_record_exception(self, mocker):
446455
def _mocked_hincrby(*args, **kwargs):
447456
assert(args[1] == RedisTelemetryStorage._TELEMETRY_EXCEPTIONS_KEY)

0 commit comments

Comments
 (0)