Skip to content

Commit b4ced4f

Browse files
author
Matias Melograno
committed
data_throttlong for imp
1 parent 6b485b4 commit b4ced4f

File tree

4 files changed

+27
-11
lines changed

4 files changed

+27
-11
lines changed

splitio/client/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77

88
_LOGGER = logging.getLogger(__name__)
9+
DEFAULT_DATA_THROTTLING = 1
910

1011

1112
DEFAULT_CONFIG = {
@@ -52,7 +53,7 @@
5253
'machineIp': None,
5354
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
5455
'preforkedInitialization': False,
55-
'dataThrotling': 1,
56+
'dataThrottling': DEFAULT_DATA_THROTTLING,
5657
}
5758

5859

splitio/client/factory.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from splitio.client.client import Client
99
from splitio.client import input_validator
1010
from splitio.client.manager import SplitManager
11-
from splitio.client.config import sanitize as sanitize_config
11+
from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_THROTTLING
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
1414
from splitio.engine.impressions import Manager as ImpressionsManager
@@ -57,6 +57,7 @@
5757
_LOGGER = logging.getLogger(__name__)
5858
_INSTANTIATED_FACTORIES = Counter()
5959
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
60+
_MIN_DEFAULT_DATA_THROTTLING_ALLOWED = 0.1 # 10%
6061

6162

6263
class Status(Enum):
@@ -401,13 +402,19 @@ def _build_redis_factory(api_key, cfg):
401402
'events': RedisEventsStorage(redis_adapter, sdk_metadata),
402403
'telemetry': RedisTelemetryStorage(redis_adapter, sdk_metadata)
403404
}
405+
data_throttling = cfg.get('dataThrottling', DEFAULT_DATA_THROTTLING)
406+
if data_throttling < _MIN_DEFAULT_DATA_THROTTLING_ALLOWED:
407+
_LOGGER.warning("dataThrottling cannot be less than %f, defaulting to minimum",
408+
_MIN_DEFAULT_DATA_THROTTLING_ALLOWED)
409+
data_throttling = _MIN_DEFAULT_DATA_THROTTLING_ALLOWED
404410
recorder = PipelinedRecorder(
405411
redis_adapter.pipeline,
406412
ImpressionsManager(cfg['impressionsMode'], False,
407413
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
408414
storages['telemetry'],
409415
storages['events'],
410416
storages['impressions'],
417+
data_throttling,
411418
)
412419
return SplitFactory(
413420
api_key,

splitio/recorder/recorder.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import random
55

66

7+
from splitio.client.config import DEFAULT_DATA_THROTTLING
8+
9+
710
_LOGGER = logging.getLogger(__name__)
8-
_MIN_THROTLING = 1
911

1012

1113
class StatsRecorder(object, metaclass=abc.ABCMeta):
@@ -89,7 +91,8 @@ def record_track_stats(self, event):
8991
class PipelinedRecorder(StatsRecorder):
9092
"""PipelinedRecorder class."""
9193

92-
def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, impression_storage, data_throtling):
94+
def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage,
95+
impression_storage, data_throttling=DEFAULT_DATA_THROTTLING):
9396
"""
9497
Class constructor.
9598
@@ -103,13 +106,15 @@ def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage,
103106
:type event_storage: splitio.storage.EventStorage
104107
:param impression_storage: impression storage instance
105108
:type impression_storage: splitio.storage.redis.RedisImpressionsStorage
109+
:param data_throttling: data throttling factor
110+
:type data_throttling: number
106111
"""
107112
self._make_pipe = pipe
108113
self._impressions_manager = impressions_manager
109114
self._telemetry_storage = telemetry_storage
110115
self._event_sotrage = event_storage
111116
self._impression_storage = impression_storage
112-
self._data_trothling = data_throtling
117+
self._data_throttling = data_throttling
113118

114119
def record_treatment_stats(self, impressions, latency, operation):
115120
"""
@@ -123,11 +128,12 @@ def record_treatment_stats(self, impressions, latency, operation):
123128
:type operation: str
124129
"""
125130
try:
131+
# TODO @matias.melograno
126132
# Changing logic until TelemetryV2 released to avoid using pipelined operations
127133
# Deprecated Old Telemetry
128-
if self._data_trothling < _MIN_THROTLING:
134+
if self._data_throttling < DEFAULT_DATA_THROTTLING:
129135
rnumber = random.uniform(0, 1)
130-
if self._data_trothling > rnumber:
136+
if self._data_throttling > rnumber:
131137
return
132138
impressions = self._impressions_manager.process_impressions(impressions)
133139
self._impression_storage.put(impressions)

tests/recorder/test_recorder.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ def test_pipelined_recorder(self, mocker):
3939
impmanager.process_impressions.return_value = impressions
4040
telemetry = mocker.Mock(spec=TelemetryPipelinedStorage)
4141
event = mocker.Mock(spec=EventStorage)
42-
impression = mocker.Mock(spec=ImpressionPipelinedStorage)
42+
impression = mocker.Mock(spec=ImpressionStorage)
4343
recorder = PipelinedRecorder(redis, impmanager, telemetry, event, impression)
4444
recorder.record_treatment_stats(impressions, 1, 'some')
4545

46-
assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions
47-
assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some'
48-
assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1
46+
# TODO @matias.melograno Commented until we implement TelemetryV2
47+
# assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions
48+
# assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some'
49+
# assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1
50+
assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions

0 commit comments

Comments
 (0)