Skip to content

Commit 82c1483

Browse files
authored
Merge pull request #331 from splitio/pluggable-mtk-adapter
Added Pluggable support for MTK to sender adapter
2 parents 83822d5 + 01362fa commit 82c1483

File tree

2 files changed

+123
-9
lines changed

2 files changed

+123
-9
lines changed

splitio/engine/impressions/adapters.py

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def record_unique_keys(self, uniques):
7474
:param uniques: unique keys disctionary
7575
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
7676
"""
77-
bulk_mtks = self._uniques_formatter(uniques)
77+
bulk_mtks = _uniques_formatter(uniques)
7878
try:
7979
inserted = self._redis_client.rpush(self.MTK_QUEUE_KEY, *bulk_mtks)
8080
self._expire_keys(self.MTK_QUEUE_KEY, self.MTK_KEY_DEFAULT_TTL, inserted, len(bulk_mtks))
@@ -119,14 +119,82 @@ def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
119119
if total_keys == inserted:
120120
self._redis_client.expire(queue_key, key_default_ttl)
121121

122-
def _uniques_formatter(self, uniques):
122+
class PluggableSenderAdapter(ImpressionsSenderAdapter):
123+
"""In Memory Impressions Sender Adapter class."""
124+
125+
MTK_QUEUE_KEY = 'SPLITIO.uniquekeys'
126+
MTK_KEY_DEFAULT_TTL = 3600
127+
IMP_COUNT_QUEUE_KEY = 'SPLITIO.impressions.count'
128+
IMP_COUNT_KEY_DEFAULT_TTL = 3600
129+
130+
def __init__(self, adapter_client, prefix=None):
123131
"""
124-
Format the unique keys dictionary array to a JSON body
132+
Initialize pluggable sender adapter instance
133+
134+
:param telemtry_http_client: instance of telemetry http api
135+
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
136+
"""
137+
self._adapter_client = adapter_client
138+
self._prefix = ""
139+
if prefix is not None:
140+
self._prefix = prefix + "."
141+
142+
def record_unique_keys(self, uniques):
143+
"""
144+
post the unique keys to storage.
125145
126146
:param uniques: unique keys disctionary
127147
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
148+
"""
149+
bulk_mtks = _uniques_formatter(uniques)
150+
try:
151+
inserted = self._adapter_client.push_items(self.MTK_QUEUE_KEY, *bulk_mtks)
152+
self._expire_keys(self._prefix + self.MTK_QUEUE_KEY, self.MTK_KEY_DEFAULT_TTL, inserted, len(bulk_mtks))
153+
return True
154+
except RedisAdapterException:
155+
_LOGGER.error('Something went wrong when trying to add mtks to storage adapter')
156+
_LOGGER.error('Error: ', exc_info=True)
157+
return False
128158

129-
:return: unique keys JSON array
130-
:rtype: json
159+
def flush_counters(self, to_send):
131160
"""
132-
return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]
161+
post the impression counters to storage.
162+
163+
:param to_send: unique keys disctionary
164+
:type to_send: Dictionary {'feature1': set(), 'feature2': set(), .. }
165+
"""
166+
try:
167+
resulted = 0
168+
for pf_count in to_send:
169+
resulted = self._adapter_client.increment(self._prefix + self.IMP_COUNT_QUEUE_KEY + "." + pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
170+
self._expire_keys(self._prefix + self.IMP_COUNT_QUEUE_KEY + "." + pf_count.feature + "::" + str(pf_count.timeframe),
171+
self.IMP_COUNT_KEY_DEFAULT_TTL, resulted, pf_count.count)
172+
return True
173+
except RedisAdapterException:
174+
_LOGGER.error('Something went wrong when trying to add counters to storage adapter')
175+
_LOGGER.error('Error: ', exc_info=True)
176+
return False
177+
178+
def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
179+
"""
180+
Set expire
181+
182+
:param total_keys: length of keys.
183+
:type total_keys: int
184+
:param inserted: added keys.
185+
:type inserted: int
186+
"""
187+
if total_keys == inserted:
188+
self._adapter_client.expire(queue_key, key_default_ttl)
189+
190+
def _uniques_formatter(uniques):
191+
"""
192+
Format the unique keys dictionary array to a JSON body
193+
194+
:param uniques: unique keys disctionary
195+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
196+
197+
:return: unique keys JSON array
198+
:rtype: json
199+
"""
200+
return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]

tests/engine/test_send_adapters.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
import unittest.mock as mock
22
import ast
3+
import json
4+
import pytest
35

4-
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter
6+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter, PluggableSenderAdapter
7+
from splitio.engine.impressions import adapters
58
from splitio.api.telemetry import TelemetryAPI
69
from splitio.storage.adapters.redis import RedisAdapter
710
from splitio.engine.impressions.manager import Counter
11+
from tests.storage.test_pluggable import StorageMockAdapter
12+
813

914
class InMemorySenderAdapterTests(object):
1015
"""In memory sender adapter test."""
@@ -52,9 +57,8 @@ def test_uniques_formatter(self, mocker):
5257
{'f': 'feature2', 'ks': ['key6', 'key1', 'key10']},
5358
]
5459

55-
sender_adapter = RedisSenderAdapter(mocker.Mock())
5660
for i in range(0,1):
57-
assert(sorted(ast.literal_eval(sender_adapter._uniques_formatter(uniques)[i])["ks"]) == sorted(formatted[i]["ks"]))
61+
assert(sorted(ast.literal_eval(adapters._uniques_formatter(uniques)[i])["ks"]) == sorted(formatted[i]["ks"]))
5862

5963
@mock.patch('splitio.storage.adapters.redis.RedisAdapter.rpush')
6064
def test_record_unique_keys(self, mocker):
@@ -98,3 +102,45 @@ def test_expire_keys(self, mocker):
98102
inserted = 100
99103
sender_adapter._expire_keys(mocker.Mock(), mocker.Mock(), total_keys, inserted)
100104
assert(mocker.called)
105+
106+
class PluggableSenderAdapterTests(object):
107+
"""Pluggable sender adapter test."""
108+
109+
def test_record_unique_keys(self, mocker):
110+
"""Test sending unique keys."""
111+
adapter = StorageMockAdapter()
112+
sender_adapter = PluggableSenderAdapter(adapter)
113+
114+
uniques = {"feature1": set({"key1", "key2", "key3"}),
115+
"feature2": set({"key1", "key6", "key10"}),
116+
}
117+
formatted = [
118+
'{"f": "feature1", "ks": ["key3", "key2", "key1"]}',
119+
'{"f": "feature2", "ks": ["key1", "key10", "key6"]}',
120+
]
121+
122+
sender_adapter.record_unique_keys(uniques)
123+
assert(sorted(json.loads(adapter._keys[sender_adapter.MTK_QUEUE_KEY][0])["ks"]) == sorted(json.loads(formatted[0])["ks"]))
124+
assert(sorted(json.loads(adapter._keys[sender_adapter.MTK_QUEUE_KEY][1])["ks"]) == sorted(json.loads(formatted[1])["ks"]))
125+
assert(json.loads(adapter._keys[sender_adapter.MTK_QUEUE_KEY][0])["f"] == "feature1")
126+
assert(json.loads(adapter._keys[sender_adapter.MTK_QUEUE_KEY][1])["f"] == "feature2")
127+
assert(adapter._expire[sender_adapter.MTK_QUEUE_KEY] == sender_adapter.MTK_KEY_DEFAULT_TTL)
128+
sender_adapter.record_unique_keys(uniques)
129+
assert(adapter._expire[sender_adapter.MTK_QUEUE_KEY] != -1)
130+
131+
def test_flush_counters(self, mocker):
132+
"""Test sending counters."""
133+
adapter = StorageMockAdapter()
134+
sender_adapter = PluggableSenderAdapter(adapter)
135+
136+
counters = [
137+
Counter.CountPerFeature('f1', 123, 2),
138+
Counter.CountPerFeature('f2', 123, 123),
139+
]
140+
141+
sender_adapter.flush_counters(counters)
142+
assert(adapter._keys[sender_adapter.IMP_COUNT_QUEUE_KEY + "." + 'f1::123'] == 2)
143+
assert(adapter._keys[sender_adapter.IMP_COUNT_QUEUE_KEY + "." + 'f2::123'] == 123)
144+
assert(adapter._expire[sender_adapter.IMP_COUNT_QUEUE_KEY + "." + 'f1::123'] == sender_adapter.IMP_COUNT_KEY_DEFAULT_TTL)
145+
sender_adapter.flush_counters(counters)
146+
assert(adapter._expire[sender_adapter.IMP_COUNT_QUEUE_KEY + "." + 'f2::123'] == sender_adapter.IMP_COUNT_KEY_DEFAULT_TTL)

0 commit comments

Comments
 (0)