Skip to content

Commit 31b9107

Browse files
authored
Merge pull request #216 from splitio/feature/pipeline
Feature/pipeline
2 parents f725995 + ac9532f commit 31b9107

31 files changed

+1249
-352
lines changed

CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
8.4.0 (Jan 6, 2021)
2+
- Added RecordStats to support pipelined recording in Redis when a treatment call is made.
3+
- Added hooks support for preforked servers.
4+
15
8.3.1 (Nov 20, 2020)
26
- Fixed error handling when split server fails, so that it doesn't bring streaming down.
37
- Added SDK Metadata headers to split & segments API clients

splitio/client/client.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
2525
_METRIC_GET_TREATMENT_WITH_CONFIG = 'sdk.getTreatmentWithConfig'
2626
_METRIC_GET_TREATMENTS_WITH_CONFIG = 'sdk.getTreatmentsWithConfig'
2727

28-
def __init__(self, factory, impressions_manager, labels_enabled=True):
28+
def __init__(self, factory, recorder, labels_enabled=True):
2929
"""
3030
Construct a Client instance.
3131
@@ -35,21 +35,18 @@ def __init__(self, factory, impressions_manager, labels_enabled=True):
3535
:param labels_enabled: Whether to store labels on impressions
3636
:type labels_enabled: bool
3737
38-
:param impressions_manager: impression manager instance
39-
:type impressions_manager: splitio.engine.impressions.Manager
38+
:param recorder: recorder instance
39+
:type recorder: splitio.recorder.StatsRecorder
4040
4141
:rtype: Client
4242
"""
4343
self._factory = factory
4444
self._labels_enabled = labels_enabled
45-
self._impressions_manager = impressions_manager
46-
45+
self._recorder = recorder
4746
self._splitter = Splitter()
4847
self._split_storage = factory._get_storage('splits') # pylint: disable=protected-access
4948
self._segment_storage = factory._get_storage('segments') # pylint: disable=protected-access
50-
self._impressions_storage = factory._get_storage('impressions') # pylint: disable=protected-access
5149
self._events_storage = factory._get_storage('events') # pylint: disable=protected-access
52-
self._telemetry_storage = factory._get_storage('telemetry') # pylint: disable=protected-access
5350
self._evaluator = Evaluator(self._split_storage, self._segment_storage, self._splitter)
5451

5552
def destroy(self):
@@ -93,6 +90,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
9390
if self.destroyed:
9491
_LOGGER.error("Client has already been destroyed - no calls possible")
9592
return CONTROL, None
93+
if self._factory._waiting_fork():
94+
_LOGGER.error("Client is not ready - no calls possible")
95+
return CONTROL, None
9696

9797
start = int(round(time.time() * 1000))
9898

@@ -146,6 +146,9 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
146146
if self.destroyed:
147147
_LOGGER.error("Client has already been destroyed - no calls possible")
148148
return input_validator.generate_control_treatments(features, method_name)
149+
if self._factory._waiting_fork():
150+
_LOGGER.error("Client is not ready - no calls possible")
151+
return input_validator.generate_control_treatments(features, method_name)
149152

150153
start = int(round(time.time() * 1000))
151154

@@ -341,13 +344,9 @@ def _record_stats(self, impressions, start, operation):
341344
:param operation: operation performed.
342345
:type operation: str
343346
"""
344-
try:
345-
end = int(round(time.time() * 1000))
346-
self._impressions_manager.track(impressions)
347-
self._telemetry_storage.inc_latency(operation, get_latency_bucket_index(end - start))
348-
except Exception: # pylint: disable=broad-except
349-
_LOGGER.error('Error recording impressions and metrics')
350-
_LOGGER.debug('Error: ', exc_info=True)
347+
end = int(round(time.time() * 1000))
348+
self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start),
349+
operation)
351350

352351
def track(self, key, traffic_type, event_type, value=None, properties=None):
353352
"""
@@ -370,6 +369,9 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
370369
if self.destroyed:
371370
_LOGGER.error("Client has already been destroyed - no calls possible")
372371
return False
372+
if self._factory._waiting_fork():
373+
_LOGGER.error("Client is not ready - no calls possible")
374+
return False
373375

374376
key = input_validator.validate_track_key(key)
375377
event_type = input_validator.validate_event_type(event_type)
@@ -395,7 +397,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
395397
timestamp=utctime_ms(),
396398
properties=properties,
397399
)
398-
return self._events_storage.put([EventWrapper(
400+
return self._recorder.record_track_stats([EventWrapper(
399401
event=event,
400402
size=size,
401403
)])

splitio/client/config.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
'IPAddressesEnabled': True,
3030
'impressionsMode': 'OPTIMIZED',
3131
'impressionListener': None,
32-
'redisLocalCacheEnabled': False,
32+
'redisLocalCacheEnabled': True,
3333
'redisLocalCacheTTL': 5,
3434
'redisHost': 'localhost',
3535
'redisPort': 6379,
@@ -55,7 +55,8 @@
5555
'redisMaxConnections': None,
5656
'machineName': None,
5757
'machineIp': None,
58-
'splitFile': os.path.join(os.path.expanduser('~'), '.split')
58+
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
59+
'preforkedInitialization': False,
5960
}
6061

6162

splitio/client/factory.py

Lines changed: 102 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
from splitio.api.telemetry import TelemetryAPI
3636
from splitio.api.auth import AuthAPI
3737

38-
3938
# Tasks
4039
from splitio.tasks.split_sync import SplitSynchronizationTask
4140
from splitio.tasks.segment_sync import SegmentSynchronizationTask
@@ -53,6 +52,9 @@
5352
from splitio.sync.event import EventSynchronizer
5453
from splitio.sync.telemetry import TelemetrySynchronizer
5554

55+
# Recorder
56+
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder
57+
5658
# Localhost stuff
5759
from splitio.client.localhost import LocalhostEventsStorage, LocalhostImpressionsStorage, \
5860
LocalhostTelemetryStorage
@@ -69,6 +71,7 @@ class Status(Enum):
6971
NOT_INITIALIZED = 'NOT_INITIALIZED'
7072
READY = 'READY'
7173
DESTROYED = 'DESTROYED'
74+
WAITING_FORK = 'WAITING_FORK'
7275

7376

7477
class TimeoutException(Exception):
@@ -85,9 +88,10 @@ def __init__( # pylint: disable=too-many-arguments
8588
apikey,
8689
storages,
8790
labels_enabled,
88-
impressions_manager,
91+
recorder,
8992
sync_manager=None,
9093
sdk_ready_flag=None,
94+
preforked_initialization=False,
9195
):
9296
"""
9397
Class constructor.
@@ -102,20 +106,31 @@ def __init__( # pylint: disable=too-many-arguments
102106
:type sync_manager: splitio.sync.manager.Manager
103107
:param sdk_ready_flag: Event to set when the sdk is ready.
104108
:type sdk_ready_flag: threading.Event
105-
:param impression_manager: Impressions manager instance
106-
:type impression_listener: ImpressionsManager
109+
:param recorder: StatsRecorder instance
110+
:type recorder: StatsRecorder
111+
:param preforked_initialization: Whether the factory should be instantiated in preforked mode.
112+
:type preforked_initialization: bool
107113
"""
108114
self._apikey = apikey
109115
self._storages = storages
110116
self._labels_enabled = labels_enabled
111117
self._sync_manager = sync_manager
112118
self._sdk_internal_ready_flag = sdk_ready_flag
113-
self._sdk_ready_flag = threading.Event()
114-
self._impressions_manager = impressions_manager
119+
self._recorder = recorder
120+
self._preforked_initialization = preforked_initialization
121+
self._start_status_updater()
115122

123+
def _start_status_updater(self):
124+
"""
125+
Set the factory status according to how it was initialized.
126+
"""
127+
if self._preforked_initialization:
128+
self._status = Status.WAITING_FORK
129+
return
116130
# If we have a ready flag, it means we have sync tasks that need to finish
117131
# before the SDK client becomes ready.
118132
if self._sdk_internal_ready_flag is not None:
133+
self._sdk_ready_flag = threading.Event()
119134
self._status = Status.NOT_INITIALIZED
120135
# add a listener that updates the status to READY once the flag is set.
121136
ready_updater = threading.Thread(target=self._update_status_when_ready,
@@ -150,7 +165,7 @@ def client(self):
150165
This client is only a set of references to structures held by the factory.
151166
Creating one is a fast operation and is safe to do anywhere.
152167
"""
153-
return Client(self, self._impressions_manager, self._labels_enabled)
168+
return Client(self, self._recorder, self._labels_enabled)
154169

155170
def manager(self):
156171
"""
@@ -230,6 +245,38 @@ def destroyed(self):
230245
"""
231246
return self._status == Status.DESTROYED
232247

248+
def _waiting_fork(self):
249+
"""
250+
Return whether the factory is waiting to be recreated by forking or not.
251+
252+
:return: True if the factory is waiting to be recreated by forking. False otherwise.
253+
:rtype: bool
254+
"""
255+
return self._status == Status.WAITING_FORK
256+
257+
def resume(self):
258+
"""
259+
Function in charge of starting periodic/realtime synchronization after a fork.
260+
"""
261+
if not self._waiting_fork():
262+
_LOGGER.warning('Cannot call resume')
263+
return
264+
self._sync_manager.recreate()
265+
sdk_ready_flag = threading.Event()
266+
self._sdk_internal_ready_flag = sdk_ready_flag
267+
self._sync_manager._ready_flag = sdk_ready_flag
268+
self._get_storage('telemetry').clear()
269+
self._get_storage('impressions').clear()
270+
self._get_storage('events').clear()
271+
initialization_thread = threading.Thread(
272+
target=self._sync_manager.start,
273+
name="SDKInitializer",
274+
)
275+
initialization_thread.setDaemon(True)
276+
initialization_thread.start()
277+
self._preforked_initialization = False # reset for status updater
278+
self._start_status_updater()
279+
233280

234281
def _wrap_impression_listener(listener, metadata):
235282
"""
@@ -280,7 +327,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
280327
}
281328

282329
imp_manager = ImpressionsManager(
283-
storages['impressions'].put,
284330
cfg['impressionsMode'],
285331
True,
286332
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
@@ -318,19 +364,34 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
318364

319365
synchronizer = Synchronizer(synchronizers, tasks)
320366

321-
sdk_ready_flag = threading.Event()
367+
preforked_initialization = cfg.get('preforkedInitialization', False)
368+
369+
sdk_ready_flag = threading.Event() if not preforked_initialization else None
322370
manager = Manager(sdk_ready_flag, synchronizer, apis['auth'], cfg['streamingEnabled'],
323371
streaming_api_base_url)
324372

373+
storages['events'].set_queue_full_hook(tasks.events_task.flush)
374+
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
375+
376+
recorder = StandardRecorder(
377+
imp_manager,
378+
storages['telemetry'],
379+
storages['events'],
380+
storages['impressions'],
381+
)
382+
383+
if preforked_initialization:
384+
synchronizer.sync_all()
385+
synchronizer._split_synchronizers._segment_sync.shutdown()
386+
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
387+
recorder, manager, preforked_initialization=preforked_initialization)
388+
325389
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
326390
initialization_thread.setDaemon(True)
327391
initialization_thread.start()
328392

329-
storages['events'].set_queue_full_hook(tasks.events_task.flush)
330-
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
331-
332393
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
333-
imp_manager, manager, sdk_ready_flag)
394+
recorder, manager, sdk_ready_flag)
334395

335396

336397
def _build_redis_factory(api_key, cfg):
@@ -346,12 +407,19 @@ def _build_redis_factory(api_key, cfg):
346407
'events': RedisEventsStorage(redis_adapter, sdk_metadata),
347408
'telemetry': RedisTelemetryStorage(redis_adapter, sdk_metadata)
348409
}
410+
recorder = PipelinedRecorder(
411+
redis_adapter.pipeline,
412+
ImpressionsManager(cfg['impressionsMode'], False,
413+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
414+
storages['telemetry'],
415+
storages['events'],
416+
storages['impressions'],
417+
)
349418
return SplitFactory(
350419
api_key,
351420
storages,
352421
cfg['labelsEnabled'],
353-
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], False,
354-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
422+
recorder,
355423
)
356424

357425

@@ -366,12 +434,22 @@ def _build_uwsgi_factory(api_key, cfg):
366434
'events': UWSGIEventStorage(uwsgi_adapter),
367435
'telemetry': UWSGITelemetryStorage(uwsgi_adapter)
368436
}
437+
recorder = StandardRecorder(
438+
ImpressionsManager(cfg['impressionsMode'], True,
439+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
440+
storages['telemetry'],
441+
storages['events'],
442+
storages['impressions'],
443+
)
444+
_LOGGER.warning(
445+
"Beware: uwsgi-cache based operation mode is soon to be deprecated. Please consider " +
446+
"redis if you need a centralized point of syncrhonization, or in-memory (with preforking " +
447+
"support enabled) if running uwsgi with a master and several http workers)")
369448
return SplitFactory(
370449
api_key,
371450
storages,
372451
cfg['labelsEnabled'],
373-
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True,
374-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
452+
recorder,
375453
)
376454

377455

@@ -401,12 +479,17 @@ def _build_localhost_factory(cfg):
401479
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
402480
manager = Manager(ready_event, synchronizer, None, False)
403481
manager.start()
404-
482+
recorder = StandardRecorder(
483+
ImpressionsManager(cfg['impressionsMode'], True, None),
484+
storages['telemetry'],
485+
storages['events'],
486+
storages['impressions'],
487+
)
405488
return SplitFactory(
406489
'localhost',
407490
storages,
408491
False,
409-
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, None),
492+
recorder,
410493
manager,
411494
ready_event
412495
)

splitio/client/localhost.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
3030
"""Accept any arguments and do nothing."""
3131
pass
3232

33+
def clear(self, *_, **__): # pylint: disable=arguments-differ
34+
"""Accept any arguments and do nothing."""
35+
pass
36+
3337

3438
class LocalhostEventsStorage(EventStorage):
3539
"""Events storage that doesn't cache anything."""
@@ -42,6 +46,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
4246
"""Accept any arguments and do nothing."""
4347
pass
4448

49+
def clear(self, *_, **__): # pylint: disable=arguments-differ
50+
"""Accept any arguments and do nothing."""
51+
pass
52+
4553

4654
class LocalhostTelemetryStorage(TelemetryStorage):
4755
"""Telemetry storage that doesn't cache anything."""
@@ -69,3 +77,7 @@ def pop_counters(self, *_, **__): # pylint: disable=arguments-differ
6977
def pop_gauges(self, *_, **__): # pylint: disable=arguments-differ
7078
"""Accept any arguments and do nothing."""
7179
pass
80+
81+
def clear(self, *_, **__): # pylint: disable=arguments-differ
82+
"""Accept any arguments and do nothing."""
83+
pass

0 commit comments

Comments
 (0)