Skip to content

Commit 8329464

Browse files
authored
Merge pull request #492 from splitio/async-add-iff
added IFF feature to async branch
2 parents 6ca91c0 + 5de6bc2 commit 8329464

23 files changed

+1030
-294
lines changed

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
'pyyaml>=5.4',
2222
'docopt>=0.6.2',
2323
'enum34;python_version<"3.4"',
24-
'bloom-filter2>=2.0.0',
24+
'bloom-filter2>=2.0.0'
2525
]
2626

2727
with open(path.join(path.abspath(path.dirname(__file__)), 'splitio', 'version.py')) as f:
@@ -44,7 +44,7 @@
4444
'uwsgi': ['uwsgi>=2.0.0'],
4545
'cpphash': ['mmh3cffi==0.2.1'],
4646
},
47-
setup_requires=['pytest-runner'],
47+
setup_requires=['pytest-runner', 'pluggy==1.0.0;python_version<"3.7"'],
4848
classifiers=[
4949
'Environment :: Console',
5050
'Intended Audience :: Developers',

splitio/client/client.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,6 @@ def _get_treatment(self, method, key, feature, attributes=None):
292292
result = self._evaluator.eval_with_context(key, bucketing, feature, attributes, ctx)
293293
except Exception as e: # toto narrow this
294294
_LOGGER.error('Error getting treatment for feature flag')
295-
_LOGGER.error(str(e))
296295
_LOGGER.debug('Error: ', exc_info=True)
297296
self._telemetry_evaluation_producer.record_exception(method)
298297
result = self._FAILED_EVAL_RESULT
@@ -382,7 +381,6 @@ def _get_treatments(self, key, features, method, attributes=None):
382381
results = self._evaluator.eval_many_with_context(key, bucketing, features, attributes, ctx)
383382
except Exception as e: # toto narrow this
384383
_LOGGER.error('Error getting treatment for feature flag')
385-
_LOGGER.error(str(e))
386384
_LOGGER.debug('Error: ', exc_info=True)
387385
self._telemetry_evaluation_producer.record_exception(method)
388386
results = {n: self._FAILED_EVAL_RESULT for n in features}
@@ -572,7 +570,6 @@ async def _get_treatment(self, method, key, feature, attributes=None):
572570
result = self._evaluator.eval_with_context(key, bucketing, feature, attributes, ctx)
573571
except Exception as e: # toto narrow this
574572
_LOGGER.error('Error getting treatment for feature flag')
575-
_LOGGER.error(str(e))
576573
_LOGGER.debug('Error: ', exc_info=True)
577574
await self._telemetry_evaluation_producer.record_exception(method)
578575
result = self._FAILED_EVAL_RESULT
@@ -662,7 +659,6 @@ async def _get_treatments(self, key, features, method, attributes=None):
662659
results = self._evaluator.eval_many_with_context(key, bucketing, features, attributes, ctx)
663660
except Exception as e: # toto narrow this
664661
_LOGGER.error('Error getting treatment for feature flag')
665-
_LOGGER.error(str(e))
666662
_LOGGER.debug('Error: ', exc_info=True)
667663
await self._telemetry_evaluation_producer.record_exception(method)
668664
results = {n: self._FAILED_EVAL_RESULT for n in features}

splitio/client/factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ async def block_until_ready(self, timeout=None):
410410
await asyncio.wait_for(asyncio.shield(self._sdk_ready_flag.wait()), timeout)
411411
except asyncio.TimeoutError as e:
412412
_LOGGER.error("Exception initializing SDK")
413-
_LOGGER.error(str(e))
413+
_LOGGER.debug(str(e))
414414
await self._telemetry_init_producer.record_bur_time_out()
415415
raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout)
416416

splitio/engine/telemetry.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
_LOGGER = logging.getLogger(__name__)
77

88
from splitio.storage.inmemmory import InMemoryTelemetryStorage
9-
from splitio.models.telemetry import CounterConstants
9+
from splitio.models.telemetry import CounterConstants, UpdateFromSSE
1010

1111
class TelemetryStorageProducerBase(object):
1212
"""Telemetry storage producer base class."""
@@ -212,6 +212,9 @@ def record_session_length(self, session):
212212
"""Record session length."""
213213
self._telemetry_storage.record_session_length(session)
214214

215+
def record_update_from_sse(self, event):
216+
"""Record update from sse."""
217+
self._telemetry_storage.record_update_from_sse(event)
215218

216219
class TelemetryRuntimeProducerAsync(object):
217220
"""Telemetry runtime producer async class."""
@@ -260,6 +263,9 @@ async def record_session_length(self, session):
260263
"""Record session length."""
261264
await self._telemetry_storage.record_session_length(session)
262265

266+
async def record_update_from_sse(self, event):
267+
"""Record update from sse."""
268+
await self._telemetry_storage.record_update_from_sse(event)
263269

264270
class TelemetryStorageConsumerBase(object):
265271
"""Telemetry storage consumer base class."""
@@ -539,6 +545,10 @@ def pop_streaming_events(self):
539545
"""Get and reset streaming events."""
540546
return self._telemetry_storage.pop_streaming_events()
541547

548+
def pop_update_from_sse(self, event):
549+
"""Get and reset update from sse."""
550+
return self._telemetry_storage.pop_update_from_sse(event)
551+
542552
def get_session_length(self):
543553
"""Get session length"""
544554
return self._telemetry_storage.get_session_length()
@@ -561,6 +571,7 @@ def pop_formatted_stats(self):
561571
'eQ': self.get_events_stats(CounterConstants.EVENTS_QUEUED),
562572
'eD': self.get_events_stats(CounterConstants.EVENTS_DROPPED),
563573
'lS': self._last_synchronization_to_json(last_synchronization),
574+
'ufs': {event.value: self.pop_update_from_sse(event) for event in UpdateFromSSE},
564575
't': self.pop_tags(),
565576
'hE': self._http_errors_to_json(http_errors),
566577
'hL': self._http_latencies_to_json(http_latencies),
@@ -615,6 +626,10 @@ async def pop_streaming_events(self):
615626
"""Get and reset streaming events."""
616627
return await self._telemetry_storage.pop_streaming_events()
617628

629+
async def pop_update_from_sse(self, event):
630+
"""Get and reset update from sse."""
631+
return await self._telemetry_storage.pop_update_from_sse(event)
632+
618633
async def get_session_length(self):
619634
"""Get session length"""
620635
return await self._telemetry_storage.get_session_length()
@@ -636,6 +651,7 @@ async def pop_formatted_stats(self):
636651
'iDr': await self.get_impressions_stats(CounterConstants.IMPRESSIONS_DROPPED),
637652
'eQ': await self.get_events_stats(CounterConstants.EVENTS_QUEUED),
638653
'eD': await self.get_events_stats(CounterConstants.EVENTS_DROPPED),
654+
'ufs': {event.value: await self.pop_update_from_sse(event) for event in UpdateFromSSE},
639655
'lS': self._last_synchronization_to_json(last_synchronization),
640656
't': await self.pop_tags(),
641657
'hE': self._http_errors_to_json(http_errors['httpErrors']),

splitio/models/telemetry.py

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ class OperationMode(Enum):
133133
CONSUMER = 'consumer'
134134
PARTIAL_CONSUMER = 'partial_consumer'
135135

136+
class UpdateFromSSE(Enum):
137+
"""Update from sse constants"""
138+
SPLIT_UPDATE = 'sp'
139+
136140
def get_latency_bucket_index(micros):
137141
"""
138142
Find the bucket index for a measured latency.
@@ -856,6 +860,7 @@ def _reset_all(self):
856860
self._auth_rejections = 0
857861
self._token_refreshes = 0
858862
self._session_length = 0
863+
self._update_from_sse = {}
859864

860865
@abc.abstractmethod
861866
def record_impressions_value(self, resource, value):
@@ -959,22 +964,42 @@ def record_events_value(self, resource, value):
959964
else:
960965
return
961966

967+
def record_update_from_sse(self, event):
968+
"""
969+
Increment the update from sse resource by one.
970+
"""
971+
with self._lock:
972+
if event.value not in self._update_from_sse:
973+
self._update_from_sse[event.value] = 0
974+
self._update_from_sse[event.value] += 1
975+
962976
def record_auth_rejections(self):
963977
"""
964-
Increament the auth rejection resource by one.
978+
Increment the auth rejection resource by one.
965979
966980
"""
967981
with self._lock:
968982
self._auth_rejections += 1
969983

970984
def record_token_refreshes(self):
971985
"""
972-
Increament the token refreshes resource by one.
986+
Increment the token refreshes resource by one.
973987
974988
"""
975989
with self._lock:
976990
self._token_refreshes += 1
977991

992+
def pop_update_from_sse(self, event):
993+
"""
994+
Pop update from sse
995+
:return: update from sse value
996+
:rtype: int
997+
"""
998+
with self._lock:
999+
update_from_sse = self._update_from_sse[event.value]
1000+
self._update_from_sse[event.value] = 0
1001+
return update_from_sse
1002+
9781003
def record_session_length(self, session):
9791004
"""
9801005
Set the session length value
@@ -1094,22 +1119,42 @@ async def record_events_value(self, resource, value):
10941119
else:
10951120
return
10961121

1122+
async def record_update_from_sse(self, event):
1123+
"""
1124+
Increment the update from sse resource by one.
1125+
"""
1126+
async with self._lock:
1127+
if event.value not in self._update_from_sse:
1128+
self._update_from_sse[event.value] = 0
1129+
self._update_from_sse[event.value] += 1
1130+
10971131
async def record_auth_rejections(self):
10981132
"""
1099-
Increament the auth rejection resource by one.
1133+
Increment the auth rejection resource by one.
11001134
11011135
"""
11021136
async with self._lock:
11031137
self._auth_rejections += 1
11041138

11051139
async def record_token_refreshes(self):
11061140
"""
1107-
Increament the token refreshes resource by one.
1141+
Increment the token refreshes resource by one.
11081142
11091143
"""
11101144
async with self._lock:
11111145
self._token_refreshes += 1
11121146

1147+
async def pop_update_from_sse(self, event):
1148+
"""
1149+
Pop update from sse
1150+
:return: update from sse value
1151+
:rtype: int
1152+
"""
1153+
async with self._lock:
1154+
update_from_sse = self._update_from_sse[event.value]
1155+
self._update_from_sse[event.value] = 0
1156+
return update_from_sse
1157+
11131158
async def record_session_length(self, session):
11141159
"""
11151160
Set the session length value

splitio/push/manager.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
"""Push subsystem manager class and helpers."""
2-
32
import logging
43
from threading import Timer
54
import abc
@@ -67,7 +66,7 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
6766
"""
6867
self._auth_api = auth_api
6968
self._feedback_loop = feedback_loop
70-
self._processor = MessageProcessor(synchronizer)
69+
self._processor = MessageProcessor(synchronizer, telemetry_runtime_producer)
7170
self._status_tracker = PushStatusTracker(telemetry_runtime_producer)
7271
self._event_handlers = {
7372
EventType.MESSAGE: self._handle_message,
@@ -300,7 +299,7 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
300299
"""
301300
self._auth_api = auth_api
302301
self._feedback_loop = feedback_loop
303-
self._processor = MessageProcessorAsync(synchronizer)
302+
self._processor = MessageProcessorAsync(synchronizer, telemetry_runtime_producer)
304303
self._status_tracker = PushStatusTrackerAsync(telemetry_runtime_producer)
305304
self._event_handlers = {
306305
EventType.MESSAGE: self._handle_message,

splitio/push/parser.py

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ def __str__(self):
277277

278278

279279
class BaseUpdate(BaseMessage, metaclass=abc.ABCMeta):
280-
"""Split data update notification."""
280+
"""Feature flag data update notification."""
281281

282282
def __init__(self, channel, timestamp, change_number):
283283
"""
@@ -324,11 +324,14 @@ def change_number(self):
324324

325325

326326
class SplitChangeUpdate(BaseUpdate):
327-
"""Split Change notification."""
327+
"""Feature flag Change notification."""
328328

329-
def __init__(self, channel, timestamp, change_number):
329+
def __init__(self, channel, timestamp, change_number, previous_change_number, feature_flag_definition, compression):
330330
"""Class constructor."""
331331
BaseUpdate.__init__(self, channel, timestamp, change_number)
332+
self._previous_change_number = previous_change_number
333+
self._feature_flag_definition = feature_flag_definition
334+
self._compression = compression
332335

333336
@property
334337
def update_type(self): # pylint:disable=no-self-use
@@ -340,18 +343,45 @@ def update_type(self): # pylint:disable=no-self-use
340343
"""
341344
return UpdateType.SPLIT_UPDATE
342345

346+
@property
347+
def previous_change_number(self): # pylint:disable=no-self-use
348+
"""
349+
Return previous change number
350+
:returns: The previous change number
351+
:rtype: int
352+
"""
353+
return self._previous_change_number
354+
355+
@property
356+
def feature_flag_definition(self): # pylint:disable=no-self-use
357+
"""
358+
Return feature flag definition
359+
:returns: The new feature flag definition
360+
:rtype: str
361+
"""
362+
return self._feature_flag_definition
363+
364+
@property
365+
def compression(self): # pylint:disable=no-self-use
366+
"""
367+
Return previous compression type
368+
:returns: The compression type
369+
:rtype: int
370+
"""
371+
return self._compression
372+
343373
def __str__(self):
344374
"""Return string representation."""
345375
return "SplitChange - changeNumber=%d" % (self.change_number)
346376

347377

348378
class SplitKillUpdate(BaseUpdate):
349-
"""Split Kill notification."""
379+
"""Feature flag Kill notification."""
350380

351-
def __init__(self, channel, timestamp, change_number, split_name, default_treatment): # pylint:disable=too-many-arguments
381+
def __init__(self, channel, timestamp, change_number, feature_flag_name, default_treatment): # pylint:disable=too-many-arguments
352382
"""Class constructor."""
353383
BaseUpdate.__init__(self, channel, timestamp, change_number)
354-
self._split_name = split_name
384+
self._feature_flag_name = feature_flag_name
355385
self._default_treatment = default_treatment
356386

357387
@property
@@ -365,14 +395,14 @@ def update_type(self): # pylint:disable=no-self-use
365395
return UpdateType.SPLIT_KILL
366396

367397
@property
368-
def split_name(self):
398+
def feature_flag_name(self):
369399
"""
370-
Return the name of the killed split.
400+
Return the name of the killed feature flag.
371401
372-
:returns: name of the killed split
402+
:returns: name of the killed feature flag
373403
:rtype: str
374404
"""
375-
return self._split_name
405+
return self._feature_flag_name
376406

377407
@property
378408
def default_treatment(self):
@@ -387,7 +417,7 @@ def default_treatment(self):
387417
def __str__(self):
388418
"""Return string representation."""
389419
return "SplitKill - changeNumber=%d, name=%s, defaultTreatment=%s" % \
390-
(self.change_number, self.split_name, self.default_treatment)
420+
(self.change_number, self.feature_flag_name, self.default_treatment)
391421

392422

393423
class SegmentChangeUpdate(BaseUpdate):
@@ -471,9 +501,9 @@ def _parse_update(channel, timestamp, data):
471501
"""
472502
update_type = UpdateType(data['type'])
473503
change_number = data['changeNumber']
474-
if update_type == UpdateType.SPLIT_UPDATE:
475-
return SplitChangeUpdate(channel, timestamp, change_number)
476-
elif update_type == UpdateType.SPLIT_KILL:
504+
if update_type == UpdateType.SPLIT_UPDATE and change_number is not None:
505+
return SplitChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
506+
elif update_type == UpdateType.SPLIT_KILL and change_number is not None:
477507
return SplitKillUpdate(channel, timestamp, change_number,
478508
data['splitName'], data['defaultTreatment'])
479509
elif update_type == UpdateType.SEGMENT_UPDATE:

0 commit comments

Comments
 (0)