Skip to content

Commit 17097c9

Browse files
committed
Removed imp count sync and task in Debug mode
1 parent 7eeb605 commit 17097c9

File tree

3 files changed

+48
-47
lines changed

3 files changed

+48
-47
lines changed

splitio/client/factory.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -330,17 +330,24 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
330330
clear_filter_sync = None
331331
unique_keys_task = None
332332
clear_filter_task = None
333+
impressions_count_sync = None
334+
impressions_count_task = None
335+
333336
if cfg['impressionsMode'] == ImpressionsMode.NONE:
334337
imp_strategy = StrategyNoneMode(imp_counter)
335338
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
336339
unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(apis['telemetry']), imp_strategy.get_unique_keys_tracker())
337340
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
338341
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
339342
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
343+
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
344+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
340345
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
341346
imp_strategy = StrategyDebugMode()
342347
else:
343348
imp_strategy = StrategyOptimizedMode(imp_counter)
349+
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
350+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
344351

345352
imp_manager = ImpressionsManager(
346353
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
@@ -352,7 +359,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
352359
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
353360
cfg['impressionsBulkSize']),
354361
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
355-
ImpressionsCountSynchronizer(apis['impressions'], imp_counter),
362+
impressions_count_sync,
356363
unique_keys_synchronizer,
357364
clear_filter_sync
358365
)
@@ -371,7 +378,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
371378
cfg['impressionsRefreshRate'],
372379
),
373380
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
374-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
381+
impressions_count_task,
375382
unique_keys_task,
376383
clear_filter_task
377384
)
@@ -425,36 +432,44 @@ def _build_redis_factory(api_key, cfg):
425432
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
426433
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
427434

428-
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
429435
unique_keys_synchronizer = None
430436
clear_filter_sync = None
431437
unique_keys_task = None
432438
clear_filter_task = None
439+
impressions_count_sync = None
440+
impressions_count_task = None
433441
redis_sender_adapter = RedisSenderAdapter(redis_adapter)
442+
434443
if cfg['impressionsMode'] == ImpressionsMode.NONE:
444+
imp_counter = ImpressionsCounter()
435445
imp_strategy = StrategyNoneMode(imp_counter)
436446
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
437447
unique_keys_synchronizer = UniqueKeysSynchronizer(redis_sender_adapter, imp_strategy.get_unique_keys_tracker())
438448
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
439449
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
440450
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
451+
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
452+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
441453
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
442454
imp_strategy = StrategyDebugMode()
443455
else:
456+
imp_counter = ImpressionsCounter()
444457
imp_strategy = StrategyOptimizedMode(imp_counter)
458+
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
459+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
445460

446461
imp_manager = ImpressionsManager(
447462
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
448463
imp_strategy)
449464

450465
synchronizers = SplitSynchronizers(None, None, None, None,
451-
ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter),
466+
impressions_count_sync,
452467
unique_keys_synchronizer,
453468
clear_filter_sync
454469
)
455470

456471
tasks = SplitTasks(None, None, None, None,
457-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
472+
impressions_count_task,
458473
unique_keys_task,
459474
clear_filter_task
460475
)

splitio/sync/manager.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ def _streaming_feedback_handler(self):
132132
class RedisManager(object): # pylint:disable=too-many-instance-attributes
133133
"""Manager Class."""
134134

135-
_CENTINEL_EVENT = object()
136-
137135
def __init__(self, synchronizer): # pylint:disable=too-many-arguments
138136
"""
139137
Construct Manager.

splitio/sync/synchronizer.py

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,17 @@ def __init__(self, split_synchronizers, split_tasks):
223223
"""
224224
self._split_synchronizers = split_synchronizers
225225
self._split_tasks = split_tasks
226+
self._periodic_data_recording_tasks = [
227+
self._split_tasks.impressions_task,
228+
self._split_tasks.events_task
229+
]
230+
if self._split_tasks.impressions_count_task:
231+
self._periodic_data_recording_tasks.append(self._split_tasks.impressions_count_task)
232+
if self._split_tasks.unique_keys_task is not None:
233+
self._periodic_data_recording_tasks.append(self._split_tasks.unique_keys_task)
234+
if self._split_tasks.clear_filter_task is not None:
235+
self._periodic_data_recording_tasks.append(self._split_tasks.clear_filter_task)
236+
226237

227238
def _synchronize_segments(self):
228239
_LOGGER.debug('Starting segments synchronization')
@@ -322,13 +333,8 @@ def stop_periodic_fetching(self):
322333
def start_periodic_data_recording(self):
323334
"""Start recorders."""
324335
_LOGGER.debug('Starting periodic data recording')
325-
self._split_tasks.impressions_task.start()
326-
self._split_tasks.events_task.start()
327-
self._split_tasks.impressions_count_task.start()
328-
if self._split_tasks.unique_keys_task is not None:
329-
self._split_tasks.unique_keys_task.start()
330-
if self._split_tasks.clear_filter_task is not None:
331-
self._split_tasks.clear_filter_task.start()
336+
for task in self._periodic_data_recording_tasks:
337+
task.start()
332338

333339
def stop_periodic_data_recording(self, blocking):
334340
"""
@@ -340,28 +346,15 @@ def stop_periodic_data_recording(self, blocking):
340346
_LOGGER.debug('Stopping periodic data recording')
341347
if blocking:
342348
events = []
343-
tasks = [self._split_tasks.impressions_task,
344-
self._split_tasks.events_task,
345-
self._split_tasks.impressions_count_task]
346-
if self._split_tasks.unique_keys_task is not None:
347-
tasks.append(self._split_tasks.unique_keys_task)
348-
if self._split_tasks.clear_filter_task is not None:
349-
tasks.append(self._split_tasks.clear_filter_task)
350-
351-
for task in tasks:
349+
for task in self._periodic_data_recording_tasks:
352350
stop_event = threading.Event()
353351
task.stop(stop_event)
354352
events.append(stop_event)
355353
if all(event.wait() for event in events):
356354
_LOGGER.debug('all tasks finished successfully.')
357355
else:
358-
self._split_tasks.impressions_task.stop()
359-
self._split_tasks.events_task.stop()
360-
self._split_tasks.impressions_count_task.stop()
361-
if self._split_tasks.unique_keys_task is not None:
362-
self._split_tasks.unique_keys_task.stop()
363-
if self._split_tasks.clear_filter_task is not None:
364-
self._split_tasks.clear_filter_task.stop()
356+
for task in self._periodic_data_recording_tasks:
357+
task.stop()
365358

366359
def kill_split(self, split_name, default_treatment, change_number):
367360
"""
@@ -390,7 +383,13 @@ def __init__(self, split_synchronizers, split_tasks):
390383
:type split_tasks: splitio.sync.synchronizer.SplitTasks
391384
"""
392385
self._split_synchronizers = split_synchronizers
393-
self._split_tasks = split_tasks
386+
self._tasks = []
387+
if split_tasks.impressions_count_task is not None:
388+
self._tasks.append(split_tasks.impressions_count_task)
389+
if split_tasks.unique_keys_task is not None:
390+
self._tasks.append(split_tasks.unique_keys_task)
391+
if split_tasks.clear_filter_task is not None:
392+
self._tasks.append(split_tasks.clear_filter_task)
394393

395394
def sync_all(self):
396395
"""
@@ -411,11 +410,8 @@ def shutdown(self, blocking):
411410
def start_periodic_data_recording(self):
412411
"""Start recorders."""
413412
_LOGGER.debug('Starting periodic data recording')
414-
self._split_tasks.impressions_count_task.start()
415-
if self._split_tasks.unique_keys_task is not None:
416-
self._split_tasks.unique_keys_task.start()
417-
if self._split_tasks.clear_filter_task is not None:
418-
self._split_tasks.clear_filter_task.start()
413+
for task in self._tasks:
414+
task.start()
419415

420416
def stop_periodic_data_recording(self, blocking):
421417
"""
@@ -427,23 +423,15 @@ def stop_periodic_data_recording(self, blocking):
427423
_LOGGER.debug('Stopping periodic data recording')
428424
if blocking:
429425
events = []
430-
tasks = [self._split_tasks.impressions_count_task]
431-
if self._split_tasks.unique_keys_task is not None:
432-
tasks.append(self._split_tasks.unique_keys_task)
433-
if self._split_tasks.clear_filter_task is not None:
434-
tasks.append(self._split_tasks.clear_filter_task)
435-
for task in tasks:
426+
for task in self._tasks:
436427
stop_event = threading.Event()
437428
task.stop(stop_event)
438429
events.append(stop_event)
439430
if all(event.wait() for event in events):
440431
_LOGGER.debug('all tasks finished successfully.')
441432
else:
442-
self._split_tasks.impressions_count_task.stop()
443-
if self._split_tasks.unique_keys_task is not None:
444-
self._split_tasks.unique_keys_task.stop()
445-
if self._split_tasks.clear_filter_task is not None:
446-
self._split_tasks.clear_filter_task.stop()
433+
for task in self._tasks:
434+
task.stop()
447435

448436
def kill_split(self, split_name, default_treatment, change_number):
449437
"""Kill a split locally."""

0 commit comments

Comments
 (0)