Skip to content

Commit 1346d92

Browse files
committed
Changed event and telemetry to use pipe
1 parent fb0d244 commit 1346d92

File tree

2 files changed

+76
-40
lines changed

2 files changed

+76
-40
lines changed

splitio/recorder/recorder.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,6 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
126126
:type operation: str
127127
"""
128128
try:
129-
# TODO @matias.melograno
130-
# Changing logic until TelemetryV2 released to avoid using pipelined operations
131-
# Deprecated Old Telemetry
132129
if self._data_sampling < DEFAULT_DATA_SAMPLING:
133130
rnumber = random.uniform(0, 1)
134131
if self._data_sampling < rnumber:
@@ -138,11 +135,12 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
138135
return
139136
pipe = self._make_pipe()
140137
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
138+
if method_name is not None:
139+
self._telemetry_redis_storage.add_latency_to_pipe(method_name[4:], latency, pipe)
141140
result = pipe.execute()
142141
if len(result) == 2:
143142
self._impression_storage.expire_key(result[0], len(impressions))
144-
if method_name is not None:
145-
self._telemetry_redis_storage.record_latency(method_name[4:], latency)
143+
self._telemetry_redis_storage.expire_latency_keys(result[1], latency)
146144
except Exception: # pylint: disable=broad-except
147145
_LOGGER.error('Error recording impressions')
148146
_LOGGER.debug('Error: ', exc_info=True)
@@ -154,5 +152,10 @@ def record_track_stats(self, event, latency):
154152
:param event: events tracked
155153
:type event: splitio.models.events.EventWrapper
156154
"""
157-
self._telemetry_redis_storage.record_latency(MethodExceptionsAndLatencies.TRACK.value, latency)
158-
return self._event_sotrage.put(event)
155+
pipe = self._make_pipe()
156+
rc = self._event_sotrage.add_events_to_pipe(event, pipe)
157+
self._telemetry_redis_storage.add_latency_to_pipe(MethodExceptionsAndLatencies.TRACK.value, latency, pipe)
158+
result = pipe.execute()
159+
self._event_sotrage.expire_keys(result[0], len(event))
160+
self._telemetry_redis_storage.expire_latency_keys(result[1], latency)
161+
return rc

splitio/storage/redis.py

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,6 @@ def expire_key(self, total_keys, inserted):
421421
:type inserted: int
422422
"""
423423
if total_keys == inserted:
424-
_LOGGER.debug("SET EXPIRE KEY FOR QUEUE")
425424
self._redis.expire(self.IMPRESSIONS_QUEUE_KEY, self.IMPRESSIONS_KEY_DEFAULT_TTL)
426425

427426
def add_impressions_to_pipe(self, impressions, pipe):
@@ -475,7 +474,8 @@ def clear(self):
475474
class RedisEventsStorage(EventStorage):
476475
"""Redis based event storage class."""
477476

478-
_KEY_TEMPLATE = 'SPLITIO.events'
477+
_EVENTS_KEY_TEMPLATE = 'SPLITIO.events'
478+
_EVENTS_KEY_DEFAULT_TTL = 3600
479479

480480
def __init__(self, redis_client, sdk_metadata):
481481
"""
@@ -489,6 +489,38 @@ def __init__(self, redis_client, sdk_metadata):
489489
self._redis = redis_client
490490
self._sdk_metadata = sdk_metadata
491491

492+
def add_events_to_pipe(self, events, pipe):
493+
"""
494+
Add put operation to pipeline
495+
496+
:param impressions: List of one or more impressions to store.
497+
:type impressions: list
498+
:param pipe: Redis pipe.
499+
:type pipe: redis.pipe
500+
"""
501+
bulk_events = self._wrap_events(events)
502+
pipe.rpush(self._EVENTS_KEY_TEMPLATE, *bulk_events)
503+
504+
def _wrap_events(self, events):
505+
return [
506+
json.dumps({
507+
'e': {
508+
'key': e.event.key,
509+
'trafficTypeName': e.event.traffic_type_name,
510+
'eventTypeId': e.event.event_type_id,
511+
'value': e.event.value,
512+
'timestamp': e.event.timestamp,
513+
'properties': e.event.properties,
514+
},
515+
'm': {
516+
's': self._sdk_metadata.sdk_version,
517+
'n': self._sdk_metadata.instance_name,
518+
'i': self._sdk_metadata.instance_ip,
519+
}
520+
})
521+
for e in events
522+
]
523+
492524
def put(self, events):
493525
"""
494526
Add an event to the redis storage.
@@ -499,25 +531,8 @@ def put(self, events):
499531
:return: Whether the event has been added or not.
500532
:rtype: bool
501533
"""
502-
key = self._KEY_TEMPLATE
503-
to_store = [
504-
json.dumps({
505-
'e': {
506-
'key': e.event.key,
507-
'trafficTypeName': e.event.traffic_type_name,
508-
'eventTypeId': e.event.event_type_id,
509-
'value': e.event.value,
510-
'timestamp': e.event.timestamp,
511-
'properties': e.event.properties,
512-
},
513-
'm': {
514-
's': self._sdk_metadata.sdk_version,
515-
'n': self._sdk_metadata.instance_name,
516-
'i': self._sdk_metadata.instance_ip,
517-
}
518-
})
519-
for e in events
520-
]
534+
key = self._EVENTS_KEY_TEMPLATE
535+
to_store = self._wrap_events(events)
521536
try:
522537
self._redis.rpush(key, *to_store)
523538
return True
@@ -541,12 +556,24 @@ def clear(self):
541556
"""
542557
raise NotImplementedError('Not supported for redis.')
543558

559+
def expire_keys(self, total_keys, inserted):
560+
"""
561+
Set expire
562+
563+
:param total_keys: length of keys.
564+
:type total_keys: int
565+
:param inserted: added keys.
566+
:type inserted: int
567+
"""
568+
if total_keys == inserted:
569+
self._redis.expire(self._EVENTS_KEY_TEMPLATE, self._EVENTS_KEY_DEFAULT_TTL)
570+
544571
class RedisTelemetryStorage(TelemetryStorage):
545572
"""Redis based telemetry storage class."""
546573

547-
TELEMETRY_LATENCIES_KEY = 'SPLITIO.telemetry.latencies'
548-
TELEMETRY_EXCEPTIONS_KEY = 'SPLITIO.telemetry.exceptions'
549-
TELEMETRY_KEY_DEFAULT_TTL = 3600
574+
_TELEMETRY_LATENCIES_KEY = 'SPLITIO.telemetry.latencies'
575+
_TELEMETRY_EXCEPTIONS_KEY = 'SPLITIO.telemetry.exceptions'
576+
_TELEMETRY_KEY_DEFAULT_TTL = 3600
550577

551578
def __init__(self, redis_client, sdk_metadata):
552579
"""
@@ -565,7 +592,7 @@ def __init__(self, redis_client, sdk_metadata):
565592
self.host_ip = get_ip()
566593
self.host_name = get_hostname()
567594
self._make_pipe = redis_client.pipeline
568-
595+
569596
def record_config(self, config, extra_config):
570597
"""
571598
initilize telemetry objects
@@ -580,7 +607,7 @@ def record_active_and_redundant_factories(self, active_factory_count, redundant_
580607
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
581608
self._redis_client.record_init(self._tel_config.get_stats())
582609

583-
def record_latency(self, method, latency):
610+
def add_latency_to_pipe(self, method, latency, pipe):
584611
"""
585612
record latency data
586613
@@ -594,17 +621,20 @@ def record_latency(self, method, latency):
594621
self._method_latencies.add_latency(get_method_constant(method), latency)
595622
latencies = self._method_latencies.pop_all()['methodLatencies']
596623
values = latencies[method]
597-
pipe = self._make_pipe()
598624
total_keys = 0
599625
bucket_number = 0
600626
for bucket in values:
601627
if bucket > 0:
602-
pipe.hincrby(self.TELEMETRY_LATENCIES_KEY, 'python-' + __version__ + '/' + self.host_name+ '/' + self.host_ip + '/' +
628+
pipe.hincrby(self._TELEMETRY_LATENCIES_KEY, 'python-' + __version__ + '/' + self.host_name+ '/' + self.host_ip + '/' +
603629
method + '/' + str(bucket_number), bucket)
604630
total_keys += 1
605631
bucket_number = bucket_number + 0
606-
result = pipe.execute()
607-
self._expire_keys(self.TELEMETRY_LATENCIES_KEY, self.TELEMETRY_KEY_DEFAULT_TTL, total_keys, result[0])
632+
633+
def record_latency(self, method, latency):
634+
"""
635+
Not implemented
636+
"""
637+
raise NotImplementedError('Only redis pipe is used.')
608638

609639
def record_exception(self, method):
610640
"""
@@ -614,10 +644,10 @@ def record_exception(self, method):
614644
:type method: string
615645
"""
616646
pipe = self._make_pipe()
617-
pipe.hincrby(self.TELEMETRY_EXCEPTIONS_KEY, 'python-' + __version__ + '/' + self.host_name+ '/' + self.host_ip + '/' +
647+
pipe.hincrby(self._TELEMETRY_EXCEPTIONS_KEY, 'python-' + __version__ + '/' + self.host_name+ '/' + self.host_ip + '/' +
618648
method.value, 1)
619649
result = pipe.execute()
620-
self._expire_keys(self.TELEMETRY_EXCEPTIONS_KEY, self.TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
650+
self._expire_keys(self._TELEMETRY_EXCEPTIONS_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
621651

622652
def record_not_ready_usage(self):
623653
"""
@@ -636,7 +666,10 @@ def record_bur_time_out(self):
636666
def record_impression_stats(self, data_type, count):
637667
pass
638668

639-
def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
669+
def expire_latency_keys(self, total_keys, inserted):
670+
self.expire_keys(self._TELEMETRY_LATENCIES_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, total_keys, inserted)
671+
672+
def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
640673
"""
641674
Set expire
642675

0 commit comments

Comments
 (0)