@@ -327,8 +327,17 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
327327 }
328328 imp_counter = ImpressionsCounter () if cfg ['impressionsMode' ] != ImpressionsMode .DEBUG else None
329329
330+ unique_keys_synchronizer = None
331+ clear_filter_sync = None
332+ unique_keys_task = None
333+ clear_filter_task = None
330334 if cfg ['impressionsMode' ] == ImpressionsMode .NONE :
331335 imp_strategy = StrategyNoneMode (imp_counter )
336+ clear_filter_sync = ClearFilterSynchronizer (imp_strategy ._unique_keys_tracker )
337+ unique_keys_synchronizer = UniqueKeysSynchronizer (InMemorySenderAdapter (apis ['telemetry' ]), imp_strategy ._unique_keys_tracker )
338+ unique_keys_task = UniqueKeysSyncTask (unique_keys_synchronizer .SendAll )
339+ clear_filter_task = ClearFilterSyncTask (clear_filter_sync .clearAll )
340+ imp_strategy ._unique_keys_tracker .set_queue_full_hook (unique_keys_task .flush )
332341 elif cfg ['impressionsMode' ] == ImpressionsMode .DEBUG :
333342 imp_strategy = StrategyDebugMode ()
334343 else :
@@ -344,49 +353,29 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
344353 ImpressionSynchronizer (apis ['impressions' ], storages ['impressions' ],
345354 cfg ['impressionsBulkSize' ]),
346355 EventSynchronizer (apis ['events' ], storages ['events' ], cfg ['eventsBulkSize' ]),
347- ImpressionsCountSynchronizer (apis ['impressions' ], imp_manager ),
356+ ImpressionsCountSynchronizer (apis ['impressions' ], imp_counter ),
357+ unique_keys_synchronizer ,
358+ clear_filter_sync
348359 )
349360
350- if cfg ['impressionsMode' ] == ImpressionsMode .NONE :
351- synchronizers .set_none_sync (
352- UniqueKeysSynchronizer (InMemorySenderAdapter (apis ['telemetry' ]), imp_strategy ._unique_keys_tracker ),
353- ClearFilterSynchronizer (imp_strategy ._unique_keys_tracker )
354- )
355- tasks = SplitTasks (
356- SplitSynchronizationTask (
357- synchronizers .split_sync .synchronize_splits ,
358- cfg ['featuresRefreshRate' ],
359- ),
360- SegmentSynchronizationTask (
361- synchronizers .segment_sync .synchronize_segments ,
362- cfg ['segmentsRefreshRate' ],
363- ),
364- ImpressionsSyncTask (
365- synchronizers .impressions_sync .synchronize_impressions ,
366- cfg ['impressionsRefreshRate' ],
367- ),
368- EventsSyncTask (synchronizers .events_sync .synchronize_events , cfg ['eventsPushRate' ]),
369- ImpressionsCountSyncTask (synchronizers .impressions_count_sync .synchronize_counters ),
370- UniqueKeysSyncTask (synchronizers .unique_keys_sync .SendAll ),
371- ClearFilterSyncTask (synchronizers .clear_filter_sync .clearAll )
372- )
373- else :
374- tasks = SplitTasks (
375- SplitSynchronizationTask (
376- synchronizers .split_sync .synchronize_splits ,
377- cfg ['featuresRefreshRate' ],
378- ),
379- SegmentSynchronizationTask (
380- synchronizers .segment_sync .synchronize_segments ,
381- cfg ['segmentsRefreshRate' ],
382- ),
383- ImpressionsSyncTask (
384- synchronizers .impressions_sync .synchronize_impressions ,
385- cfg ['impressionsRefreshRate' ],
386- ),
387- EventsSyncTask (synchronizers .events_sync .synchronize_events , cfg ['eventsPushRate' ]),
388- ImpressionsCountSyncTask (synchronizers .impressions_count_sync .synchronize_counters ),
389- )
361+ tasks = SplitTasks (
362+ SplitSynchronizationTask (
363+ synchronizers .split_sync .synchronize_splits ,
364+ cfg ['featuresRefreshRate' ],
365+ ),
366+ SegmentSynchronizationTask (
367+ synchronizers .segment_sync .synchronize_segments ,
368+ cfg ['segmentsRefreshRate' ],
369+ ),
370+ ImpressionsSyncTask (
371+ synchronizers .impressions_sync .synchronize_impressions ,
372+ cfg ['impressionsRefreshRate' ],
373+ ),
374+ EventsSyncTask (synchronizers .events_sync .synchronize_events , cfg ['eventsPushRate' ]),
375+ ImpressionsCountSyncTask (synchronizers .impressions_count_sync .synchronize_counters ),
376+ unique_keys_task ,
377+ clear_filter_task
378+ )
390379
391380 synchronizer = Synchronizer (synchronizers , tasks )
392381
@@ -398,8 +387,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
398387
399388 storages ['events' ].set_queue_full_hook (tasks .events_task .flush )
400389 storages ['impressions' ].set_queue_full_hook (tasks .impressions_task .flush )
401- if cfg ['impressionsMode' ] == ImpressionsMode .NONE :
402- imp_strategy ._unique_keys_tracker .set_queue_full_hook (tasks ._unique_keys_task .flush )
403390
404391 recorder = StandardRecorder (
405392 imp_manager ,
0 commit comments