Skip to content

Commit ab2623b

Browse files
authored
Merge pull request #242 from splitio/task/trhotling
Task/throttling
2 parents 3d5925d + cea74cb commit ab2623b

File tree

4 files changed

+61
-7
lines changed

4 files changed

+61
-7
lines changed

splitio/client/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77

88
_LOGGER = logging.getLogger(__name__)
9+
DEFAULT_DATA_SAMPLING = 1
910

1011

1112
DEFAULT_CONFIG = {
@@ -52,6 +53,7 @@
5253
'machineIp': None,
5354
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
5455
'preforkedInitialization': False,
56+
'dataSampling': DEFAULT_DATA_SAMPLING,
5557
}
5658

5759

splitio/client/factory.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from splitio.client.client import Client
99
from splitio.client import input_validator
1010
from splitio.client.manager import SplitManager
11-
from splitio.client.config import sanitize as sanitize_config
11+
from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
1414
from splitio.engine.impressions import Manager as ImpressionsManager
@@ -53,6 +53,7 @@
5353
_LOGGER = logging.getLogger(__name__)
5454
_INSTANTIATED_FACTORIES = Counter()
5555
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
56+
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10%
5657

5758

5859
class Status(Enum):
@@ -387,12 +388,18 @@ def _build_redis_factory(api_key, cfg):
387388
'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata),
388389
'events': RedisEventsStorage(redis_adapter, sdk_metadata),
389390
}
391+
data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING)
392+
if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED:
393+
_LOGGER.warning("dataSampling cannot be less than %f, defaulting to minimum",
394+
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
395+
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
390396
recorder = PipelinedRecorder(
391397
redis_adapter.pipeline,
392398
ImpressionsManager(cfg['impressionsMode'], False,
393399
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
394400
storages['events'],
395401
storages['impressions'],
402+
data_sampling,
396403
)
397404
return SplitFactory(
398405
api_key,

splitio/recorder/recorder.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
"""Stats Recorder."""
22
import abc
33
import logging
4+
import random
5+
6+
7+
from splitio.client.config import DEFAULT_DATA_SAMPLING
48

59

610
_LOGGER = logging.getLogger(__name__)
@@ -83,7 +87,8 @@ def record_track_stats(self, event):
8387
class PipelinedRecorder(StatsRecorder):
8488
"""PipelinedRecorder class."""
8589

86-
def __init__(self, pipe, impressions_manager, event_storage, impression_storage):
90+
def __init__(self, pipe, impressions_manager, event_storage,
91+
impression_storage, data_sampling=DEFAULT_DATA_SAMPLING):
8792
"""
8893
Class constructor.
8994
@@ -95,11 +100,14 @@ def __init__(self, pipe, impressions_manager, event_storage, impression_storage)
95100
:type event_storage: splitio.storage.EventStorage
96101
:param impression_storage: impression storage instance
97102
:type impression_storage: splitio.storage.redis.RedisImpressionsStorage
103+
:param data_sampling: data sampling factor
104+
:type data_sampling: number
98105
"""
99106
self._make_pipe = pipe
100107
self._impressions_manager = impressions_manager
101108
self._event_sotrage = event_storage
102109
self._impression_storage = impression_storage
110+
self._data_sampling = data_sampling
103111

104112
def record_treatment_stats(self, impressions, latency, operation):
105113
"""
@@ -113,11 +121,21 @@ def record_treatment_stats(self, impressions, latency, operation):
113121
:type operation: str
114122
"""
115123
try:
124+
# TODO @matias.melograno
125+
# Changing logic until TelemetryV2 released to avoid using pipelined operations
126+
# Deprecated Old Telemetry
127+
if self._data_sampling < DEFAULT_DATA_SAMPLING:
128+
rnumber = random.uniform(0, 1)
129+
if self._data_sampling < rnumber:
130+
return
116131
impressions = self._impressions_manager.process_impressions(impressions)
117-
pipe = self._make_pipe()
118-
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
132+
# pipe = self._make_pipe()
133+
# self._impression_storage.add_impressions_to_pipe(impressions, pipe)
119134
# self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe)
120-
result = pipe.execute()
135+
# result = pipe.execute()
136+
# if len(result) == 2:
137+
# self._impression_storage.expire_key(result[0], len(impressions))
138+
result = self._impression_storage.put(impressions)
121139
if len(result) == 2:
122140
self._impression_storage.expire_key(result[0], len(impressions))
123141
except Exception: # pylint: disable=broad-except

tests/recorder/test_recorder.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,35 @@ def test_pipelined_recorder(self, mocker):
3636
impmanager = mocker.Mock(spec=ImpressionsManager)
3737
impmanager.process_impressions.return_value = impressions
3838
event = mocker.Mock(spec=EventStorage)
39-
impression = mocker.Mock(spec=ImpressionPipelinedStorage)
39+
impression = mocker.Mock(spec=ImpressionStorage)
4040
recorder = PipelinedRecorder(redis, impmanager, event, impression)
4141
recorder.record_treatment_stats(impressions, 1, 'some')
42+
assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions
43+
44+
# TODO @matias.melograno Commented until we implement TelemetryV2
45+
# assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions
46+
# assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some'
47+
# assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1
48+
49+
50+
def test_sampled_recorder(self, mocker):
51+
impressions = [
52+
Impression('k1', 'f1', 'on', 'l1', 123, None, None),
53+
Impression('k1', 'f2', 'on', 'l1', 123, None, None)
54+
]
55+
redis = mocker.Mock(spec=RedisAdapter)
56+
impmanager = mocker.Mock(spec=ImpressionsManager)
57+
impmanager.process_impressions.return_value = impressions
58+
event = mocker.Mock(spec=EventStorage)
59+
impression = mocker.Mock(spec=ImpressionStorage)
60+
recorder = PipelinedRecorder(redis, impmanager, event, impression, 0.5)
61+
62+
def put(x):
63+
return
64+
65+
recorder._impression_storage.put.side_effect = put
4266

43-
assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions
67+
for _ in range(100):
68+
recorder.record_treatment_stats(impressions, 1, 'some')
69+
print(recorder._impression_storage.put.call_count)
70+
assert recorder._impression_storage.put.call_count < 80

0 commit comments

Comments
 (0)