@@ -74,7 +74,7 @@ def record_unique_keys(self, uniques):
7474 :param uniques: unique keys disctionary
7575 :type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
7676 """
77- bulk_mtks = self . _uniques_formatter (uniques )
77+ bulk_mtks = _uniques_formatter (uniques )
7878 try :
7979 inserted = self ._redis_client .rpush (self .MTK_QUEUE_KEY , * bulk_mtks )
8080 self ._expire_keys (self .MTK_QUEUE_KEY , self .MTK_KEY_DEFAULT_TTL , inserted , len (bulk_mtks ))
@@ -119,14 +119,79 @@ def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
119119 if total_keys == inserted :
120120 self ._redis_client .expire (queue_key , key_default_ttl )
121121
122- def _uniques_formatter (self , uniques ):
122+ class PluggableSenderAdapter (ImpressionsSenderAdapter ):
123+ """In Memory Impressions Sender Adapter class."""
124+
125+ MTK_QUEUE_KEY = 'SPLITIO.uniquekeys'
126+ MTK_KEY_DEFAULT_TTL = 3600
127+ IMP_COUNT_QUEUE_KEY = 'SPLITIO.impressions.count'
128+ IMP_COUNT_KEY_DEFAULT_TTL = 3600
129+
130+ def __init__ (self , adapter_client ):
123131 """
124- Format the unique keys dictionary array to a JSON body
132+ Initialize pluggable sender adapter instance
133+
134+ :param telemtry_http_client: instance of telemetry http api
135+ :type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
136+ """
137+ self ._adapter_client = adapter_client
138+
139+ def record_unique_keys (self , uniques ):
140+ """
141+ post the unique keys to storage.
125142
126143 :param uniques: unique keys disctionary
127144 :type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
145+ """
146+ bulk_mtks = _uniques_formatter (uniques )
147+ try :
148+ inserted = self ._adapter_client .push_items (self .MTK_QUEUE_KEY , * bulk_mtks )
149+ self ._expire_keys (self .MTK_QUEUE_KEY , self .MTK_KEY_DEFAULT_TTL , inserted , len (bulk_mtks ))
150+ return True
151+ except RedisAdapterException :
152+ _LOGGER .error ('Something went wrong when trying to add mtks to storage adapter' )
153+ _LOGGER .error ('Error: ' , exc_info = True )
154+ return False
128155
129- :return: unique keys JSON array
130- :rtype: json
156+ def flush_counters (self , to_send ):
131157 """
132- return [json .dumps ({'f' : feature , 'ks' : list (keys )}) for feature , keys in uniques .items ()]
158+ post the impression counters to storage.
159+
160+ :param to_send: unique keys disctionary
161+ :type to_send: Dictionary {'feature1': set(), 'feature2': set(), .. }
162+ """
163+ try :
164+ resulted = 0
165+ for pf_count in to_send :
166+ resulted = self ._adapter_client .increment (self .IMP_COUNT_QUEUE_KEY + "." + pf_count .feature + "::" + str (pf_count .timeframe ), pf_count .count )
167+ self ._expire_keys (self .IMP_COUNT_QUEUE_KEY + "." + pf_count .feature + "::" + str (pf_count .timeframe ),
168+ self .IMP_COUNT_KEY_DEFAULT_TTL , resulted , pf_count .count )
169+ return True
170+ except RedisAdapterException :
171+ _LOGGER .error ('Something went wrong when trying to add counters to storage adapter' )
172+ _LOGGER .error ('Error: ' , exc_info = True )
173+ return False
174+
175+ def _expire_keys (self , queue_key , key_default_ttl , total_keys , inserted ):
176+ """
177+ Set expire
178+
179+ :param total_keys: length of keys.
180+ :type total_keys: int
181+ :param inserted: added keys.
182+ :type inserted: int
183+ """
184+ if total_keys == inserted :
185+ self ._adapter_client .expire (queue_key , key_default_ttl )
186+
187+ def _uniques_formatter (uniques ):
188+ """
189+ Format the unique keys dictionary array to a JSON body
190+
191+ :param uniques: unique keys disctionary
192+ :type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
193+
194+ :return: unique keys JSON array
195+ :rtype: json
196+ """
197+ return [json .dumps ({'f' : feature , 'ks' : list (keys )}) for feature , keys in uniques .items ()]
0 commit comments