11"""Stats Recorder."""
22import abc
33import logging
4+ import random
45
56
67_LOGGER = logging .getLogger (__name__ )
8+ _MIN_THROTLING = 1
79
810
911class StatsRecorder (object , metaclass = abc .ABCMeta ):
@@ -87,7 +89,7 @@ def record_track_stats(self, event):
8789class PipelinedRecorder (StatsRecorder ):
8890 """PipelinedRecorder class."""
8991
90- def __init__ (self , pipe , impressions_manager , telemetry_storage , event_storage , impression_storage ):
92+ def __init__ (self , pipe , impressions_manager , telemetry_storage , event_storage , impression_storage , data_throtling ):
9193 """
9294 Class constructor.
9395
@@ -107,6 +109,7 @@ def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage,
107109 self ._telemetry_storage = telemetry_storage
108110 self ._event_sotrage = event_storage
109111 self ._impression_storage = impression_storage
112+ self ._data_trothling = data_throtling
110113
111114 def record_treatment_stats (self , impressions , latency , operation ):
112115 """
@@ -120,15 +123,22 @@ def record_treatment_stats(self, impressions, latency, operation):
120123 :type operation: str
121124 """
122125 try :
126+ # Changing logic until TelemetryV2 released to avoid using pipelined operations
127+ # Deprecated Old Telemetry
128+ if self ._data_trothling < _MIN_THROTLING :
129+ rnumber = random .uniform (0 , 1 )
130+ if self ._data_trothling > rnumber :
131+ return
123132 impressions = self ._impressions_manager .process_impressions (impressions )
124- pipe = self ._make_pipe ()
125- self ._impression_storage .add_impressions_to_pipe (impressions , pipe )
126- self ._telemetry_storage .add_latency_to_pipe (operation , latency , pipe )
127- result = pipe .execute ()
128- if len (result ) == 2 :
129- self ._impression_storage .expire_key (result [0 ], len (impressions ))
133+ self ._impression_storage .put (impressions )
134+ # pipe = self._make_pipe()
135+ # self._impression_storage.add_impressions_to_pipe(impressions, pipe)
136+ # self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe)
137+ # result = pipe.execute()
138+ # if len(result) == 2:
139+ # self._impression_storage.expire_key(result[0], len(impressions))
130140 except Exception : # pylint: disable=broad-except
131- _LOGGER .error ('Error recording impressions and metrics ' )
141+ _LOGGER .error ('Error recording impressions' )
132142 _LOGGER .debug ('Error: ' , exc_info = True )
133143
134144 def record_track_stats (self , event ):
0 commit comments