Skip to content

Commit 21737af

Browse files
authored
Merge pull request #548 from splitio/impression-toggle-models
updated models and recorder
2 parents ac54e59 + 2bd9704 commit 21737af

File tree

8 files changed

+186
-68
lines changed

8 files changed

+186
-68
lines changed

splitio/engine/evaluator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
6767
'impression': {
6868
'label': label,
6969
'change_number': _change_number
70-
}
70+
},
71+
'track': feature.trackImpressions
7172
}
7273

7374
def _treatment_for_flag(self, flag, key, bucketing, attributes, ctx):

splitio/engine/impressions/impressions.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class ImpressionsMode(Enum):
1111
class Manager(object): # pylint:disable=too-few-public-methods
1212
"""Impression manager."""
1313

14-
def __init__(self, strategy, telemetry_runtime_producer):
14+
def __init__(self, strategy, none_strategy, telemetry_runtime_producer):
1515
"""
1616
Construct a manger to track and forward impressions to the queue.
1717
@@ -23,19 +23,33 @@ def __init__(self, strategy, telemetry_runtime_producer):
2323
"""
2424

2525
self._strategy = strategy
26+
self._none_strategy = none_strategy
2627
self._telemetry_runtime_producer = telemetry_runtime_producer
2728

28-
def process_impressions(self, impressions):
29+
def process_impressions(self, impressions_decorated):
2930
"""
3031
Process impressions.
3132
3233
Impressions are analyzed to see if they've been seen before and counted.
3334
34-
:param impressions: List of impression objects with attributes
35-
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
35+
:param impressions_decorated: List of impression objects with attributes
36+
:type impressions_decorated: list[tuple[splitio.models.impression.ImpressionDecorated, dict]]
3637
3738
:return: processed and deduped impressions.
3839
:rtype: tuple(list[tuple[splitio.models.impression.Impression, dict]], list(int))
3940
"""
40-
for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions(impressions)
41-
return for_log, len(impressions) - len(for_log), for_listener, for_counter, for_unique_keys_tracker
41+
for_listener_all = []
42+
for_log_all = []
43+
for_counter_all = []
44+
for_unique_keys_tracker_all = []
45+
for impression_decorated, att in impressions_decorated:
46+
if not impression_decorated.track:
47+
for_log, for_listener, for_counter, for_unique_keys_tracker = self._none_strategy.process_impressions([(impression_decorated.Impression, att)])
48+
else:
49+
for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions([(impression_decorated.Impression, att)])
50+
for_listener_all.extend(for_listener)
51+
for_log_all.extend(for_log)
52+
for_counter_all.extend(for_counter)
53+
for_unique_keys_tracker_all.extend(for_unique_keys_tracker)
54+
55+
return for_log_all, len(impressions_decorated) - len(for_log_all), for_listener_all, for_counter_all, for_unique_keys_tracker_all

splitio/models/impressions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616
]
1717
)
1818

19+
ImpressionDecorated = namedtuple(
20+
'ImpressionDecorated',
21+
[
22+
'Impression',
23+
'track'
24+
]
25+
)
26+
1927
# pre-python3.7 hack to make previous_time optional
2028
Impression.__new__.__defaults__ = (None,)
2129

splitio/models/splits.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
SplitView = namedtuple(
1212
'SplitView',
13-
['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets']
13+
['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets', 'trackImpressions']
1414
)
1515

1616
_DEFAULT_CONDITIONS_TEMPLATE = {
@@ -73,7 +73,8 @@ def __init__( # pylint: disable=too-many-arguments
7373
traffic_allocation=None,
7474
traffic_allocation_seed=None,
7575
configurations=None,
76-
sets=None
76+
sets=None,
77+
trackImpressions=None
7778
):
7879
"""
7980
Class constructor.
@@ -96,6 +97,8 @@ def __init__( # pylint: disable=too-many-arguments
9697
:type traffic_allocation_seed: int
9798
:pram sets: list of flag sets
9899
:type sets: list
100+
:pram trackImpressions: track impressions flag
101+
:type trackImpressions: boolean
99102
"""
100103
self._name = name
101104
self._seed = seed
@@ -125,6 +128,7 @@ def __init__( # pylint: disable=too-many-arguments
125128

126129
self._configurations = configurations
127130
self._sets = set(sets) if sets is not None else set()
131+
self._trackImpressions = trackImpressions if trackImpressions is not None else True
128132

129133
@property
130134
def name(self):
@@ -186,6 +190,11 @@ def sets(self):
186190
"""Return the flag sets of the split."""
187191
return self._sets
188192

193+
@property
194+
def trackImpressions(self):
195+
"""Return trackImpressions of the split."""
196+
return self._trackImpressions
197+
189198
def get_configurations_for(self, treatment):
190199
"""Return the mapping of treatments to configurations."""
191200
return self._configurations.get(treatment) if self._configurations else None
@@ -214,7 +223,8 @@ def to_json(self):
214223
'algo': self.algo.value,
215224
'conditions': [c.to_json() for c in self.conditions],
216225
'configurations': self._configurations,
217-
'sets': list(self._sets)
226+
'sets': list(self._sets),
227+
'trackImpressions': self._trackImpressions
218228
}
219229

220230
def to_split_view(self):
@@ -232,7 +242,8 @@ def to_split_view(self):
232242
self.change_number,
233243
self._configurations if self._configurations is not None else {},
234244
self._default_treatment,
235-
list(self._sets) if self._sets is not None else []
245+
list(self._sets) if self._sets is not None else [],
246+
self._trackImpressions
236247
)
237248

238249
def local_kill(self, default_treatment, change_number):
@@ -288,5 +299,6 @@ def from_raw(raw_split):
288299
traffic_allocation=raw_split.get('trafficAllocation'),
289300
traffic_allocation_seed=raw_split.get('trafficAllocationSeed'),
290301
configurations=raw_split.get('configurations'),
291-
sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else []
302+
sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else [],
303+
trackImpressions=raw_split.get('trackImpressions') if raw_split.get('trackImpressions') is not None else True
292304
)

splitio/recorder/recorder.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
151151
self._telemetry_evaluation_producer = telemetry_evaluation_producer
152152
self._telemetry_runtime_producer = telemetry_runtime_producer
153153

154-
def record_treatment_stats(self, impressions, latency, operation, method_name):
154+
def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
155155
"""
156156
Record stats for treatment evaluation.
157157
@@ -165,7 +165,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
165165
try:
166166
if method_name is not None:
167167
self._telemetry_evaluation_producer.record_latency(operation, latency)
168-
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
168+
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
169169
if deduped > 0:
170170
self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped)
171171
self._impression_storage.put(impressions)
@@ -210,7 +210,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
210210
self._telemetry_evaluation_producer = telemetry_evaluation_producer
211211
self._telemetry_runtime_producer = telemetry_runtime_producer
212212

213-
async def record_treatment_stats(self, impressions, latency, operation, method_name):
213+
async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
214214
"""
215215
Record stats for treatment evaluation.
216216
@@ -224,7 +224,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
224224
try:
225225
if method_name is not None:
226226
await self._telemetry_evaluation_producer.record_latency(operation, latency)
227-
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
227+
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
228228
if deduped > 0:
229229
await self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped)
230230

@@ -277,7 +277,7 @@ def __init__(self, pipe, impressions_manager, event_storage,
277277
self._data_sampling = data_sampling
278278
self._telemetry_redis_storage = telemetry_redis_storage
279279

280-
def record_treatment_stats(self, impressions, latency, operation, method_name):
280+
def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
281281
"""
282282
Record stats for treatment evaluation.
283283
@@ -294,7 +294,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
294294
if self._data_sampling < rnumber:
295295
return
296296

297-
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
297+
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
298298
if impressions:
299299
pipe = self._make_pipe()
300300
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
@@ -367,7 +367,7 @@ def __init__(self, pipe, impressions_manager, event_storage,
367367
self._data_sampling = data_sampling
368368
self._telemetry_redis_storage = telemetry_redis_storage
369369

370-
async def record_treatment_stats(self, impressions, latency, operation, method_name):
370+
async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
371371
"""
372372
Record stats for treatment evaluation.
373373
@@ -384,7 +384,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
384384
if self._data_sampling < rnumber:
385385
return
386386

387-
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
387+
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
388388
if impressions:
389389
pipe = self._make_pipe()
390390
self._impression_storage.add_impressions_to_pipe(impressions, pipe)

tests/engine/test_evaluator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def test_evaluate_treatment_ok(self, mocker):
5252
assert result['impression']['change_number'] == 123
5353
assert result['impression']['label'] == 'some_label'
5454
assert mocked_split.get_configurations_for.mock_calls == [mocker.call('on')]
55+
assert result['track'] == mocked_split.trackImpressions
5556

5657

5758
def test_evaluate_treatment_ok_no_config(self, mocker):

0 commit comments

Comments
 (0)