11import { ISegmentChangesFetcher } from '../fetchers/types' ;
22import { ISegmentsCacheBase } from '../../../storages/types' ;
33import { IReadinessManager } from '../../../readiness/types' ;
4- import { MaybeThenable } from '../../../dtos/types' ;
5- import { findIndex } from '../../../utils/lang' ;
64import { SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants' ;
75import { ILogger } from '../../../logger/types' ;
86import { LOG_PREFIX_INSTANTIATION , LOG_PREFIX_SYNC_SEGMENTS } from '../../../logger/constants' ;
9- import { thenable } from '../../../utils/promise/thenable' ;
107
118type ISegmentChangesUpdater = ( fetchOnlyNew ?: boolean , segmentName ?: string , noCache ?: boolean , till ?: number ) => Promise < boolean >
129
@@ -30,31 +27,22 @@ export function segmentChangesUpdaterFactory(
3027
3128 let readyOnAlreadyExistentState = true ;
3229
33- function updateSegment ( segmentName : string , noCache ?: boolean , till ?: number , fetchOnlyNew ?: boolean ) {
30+ function updateSegment ( segmentName : string , noCache ?: boolean , till ?: number , fetchOnlyNew ?: boolean ) : Promise < boolean > {
3431 log . debug ( `${ LOG_PREFIX_SYNC_SEGMENTS } Processing segment ${ segmentName } ` ) ;
3532 let sincePromise = Promise . resolve ( segments . getChangeNumber ( segmentName ) ) ;
3633
3734 return sincePromise . then ( since => {
3835 // if fetchOnlyNew flag, avoid processing already fetched segments
39- if ( fetchOnlyNew && since !== - 1 ) return - 1 ;
40-
41- return segmentChangesFetcher ( since , segmentName , noCache , till ) . then ( function ( changes ) {
42- let changeNumber = - 1 ;
43- const results : MaybeThenable < boolean | void > [ ] = [ ] ;
44- changes . forEach ( x => {
45- if ( x . added . length > 0 ) results . push ( segments . addToSegment ( segmentName , x . added ) ) ;
46- if ( x . removed . length > 0 ) results . push ( segments . removeFromSegment ( segmentName , x . removed ) ) ;
47- if ( x . added . length > 0 || x . removed . length > 0 ) {
48- results . push ( segments . setChangeNumber ( segmentName , x . till ) ) ;
49- changeNumber = x . till ;
50- }
51-
52- log . debug ( `${ LOG_PREFIX_SYNC_SEGMENTS } Processed ${ segmentName } with till = ${ x . till } . Added: ${ x . added . length } . Removed: ${ x . removed . length } ` ) ;
36+ return fetchOnlyNew && since !== - 1 ?
37+ false :
38+ segmentChangesFetcher ( since , segmentName , noCache , till ) . then ( ( changes ) => {
39+ return Promise . all ( changes . map ( x => {
40+ log . debug ( `${ LOG_PREFIX_SYNC_SEGMENTS } Processing ${ segmentName } with till = ${ x . till } . Added: ${ x . added . length } . Removed: ${ x . removed . length } ` ) ;
41+ return segments . update ( x . name , x . added , x . removed , x . till ) ;
42+ } ) ) . then ( ( updates ) => {
43+ return updates . some ( update => update ) ;
44+ } ) ;
5345 } ) ;
54- // If at least one storage operation result is a promise, join all in a single promise.
55- if ( results . some ( result => thenable ( result ) ) ) return Promise . all ( results ) . then ( ( ) => changeNumber ) ;
56- return changeNumber ;
57- } ) ;
5846 } ) ;
5947 }
6048 /**
@@ -75,16 +63,12 @@ export function segmentChangesUpdaterFactory(
7563 let segmentsPromise = Promise . resolve ( segmentName ? [ segmentName ] : segments . getRegisteredSegments ( ) ) ;
7664
7765 return segmentsPromise . then ( segmentNames => {
78- // Async fetchers are collected here.
79- const updaters : Promise < number > [ ] = [ ] ;
80-
81- for ( let index = 0 ; index < segmentNames . length ; index ++ ) {
82- updaters . push ( updateSegment ( segmentNames [ index ] , noCache , till , fetchOnlyNew ) ) ;
83- }
66+ // Async fetchers
67+ const updaters = segmentNames . map ( segmentName => updateSegment ( segmentName , noCache , till , fetchOnlyNew ) ) ;
8468
8569 return Promise . all ( updaters ) . then ( shouldUpdateFlags => {
8670 // if at least one segment fetch succeeded, mark segments ready
87- if ( findIndex ( shouldUpdateFlags , v => v !== - 1 ) !== - 1 || readyOnAlreadyExistentState ) {
71+ if ( shouldUpdateFlags . some ( update => update ) || readyOnAlreadyExistentState ) {
8872 readyOnAlreadyExistentState = false ;
8973 if ( readiness ) readiness . segments . emit ( SDK_SEGMENTS_ARRIVED ) ;
9074 }
0 commit comments