@@ -29,6 +29,8 @@ import {
2929} from './types' ;
3030import enumList from './enum' ;
3131
32+ const delay = time => new Promise ( resolve => setTimeout ( resolve , time * 1000 ) ) ;
33+
3234export default class StepFunctionsOfflinePlugin implements Plugin {
3335 private location : string ;
3436
@@ -277,32 +279,29 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
277279 return { handler : handlerName , filePath } ;
278280 }
279281
280- buildStepWorkFlow ( ) : Promise < void | Callback > {
282+ buildStepWorkFlow ( ) : ReturnType < StepFunctionsOfflinePlugin [ 'process' ] > {
281283 this . cliLog ( 'Building StepWorkFlow' ) ;
282284 if ( ! this . stateDefinition ) throw new Error ( 'Missing state definition' ) ;
283- this . contextObject = this . createContextObject ( this . stateDefinition . States , false ) ;
285+ const event = this . loadedEventFile ?? { } ;
286+ if ( ! this . stateDefinition ?. StartAt ) {
287+ throw new Error ( 'Missing `startAt` in definition' ) ;
288+ }
289+ this . contextObject = this . createContextObject (
290+ this . stateDefinition . States ,
291+ this . stateDefinition . StartAt ,
292+ event ,
293+ false
294+ ) ;
284295 this . states = this . stateDefinition . States ;
285-
286- return Promise . resolve ( ) . then ( ( ) => {
287- if ( ! this . stateDefinition ?. StartAt ) {
288- throw new Error ( 'Missing `startAt` in definition' ) ;
289- }
290- // if (!this.loadedEventFile) throw new Error('Was unable to load event file');
291- return this . process (
292- this . states [ this . stateDefinition . StartAt ] ,
293- this . stateDefinition . StartAt ,
294- this . loadedEventFile ?? { } ,
295- false
296- ) ;
297- } ) ;
296+ return this . process ( this . states [ this . stateDefinition . StartAt ] , this . stateDefinition . StartAt , event , false ) ;
298297 }
299298
300299 async buildSubStepWorkFlow (
301300 stateDefinition : StateMachine ,
302301 event : Event
303302 ) : Promise < ReturnType < StepFunctionsOfflinePlugin [ 'process' ] > > {
304303 this . cliLog ( 'Building Iterator StepWorkFlow' ) ;
305- this . subContextObject = this . createContextObject ( stateDefinition . States , true ) ;
304+ this . subContextObject = this . createContextObject ( stateDefinition . States , stateDefinition . StartAt , event , true ) ;
306305
307306 if ( ! stateDefinition . States ) return ;
308307 const state = stateDefinition . States [ stateDefinition . StartAt ] ;
@@ -363,16 +362,20 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
363362 if ( ! mod ) return ;
364363 let res ;
365364 let err ;
366- const done = ( error , val ) => {
367- res = val ;
368- err = error ;
369- } ;
370- const functionRes = await mod ( event , contextObject , done ) ;
371- if ( functionRes ) res = functionRes ;
372365 try {
373- if ( typeof res === 'string' ) res = JSON . parse ( res ) ;
374- } catch ( err ) { }
375- if ( res ) return contextObject . done ( err , res || { } ) ;
366+ const done = ( e , val ) => {
367+ res = val ;
368+ err = e ;
369+ } ;
370+ const functionRes = await mod ( event , contextObject , done ) ;
371+ if ( functionRes ) res = functionRes ;
372+ try {
373+ if ( typeof res === 'string' ) res = JSON . parse ( res ) ;
374+ } catch ( err ) { }
375+ } catch ( error ) {
376+ err = error ;
377+ }
378+ return contextObject . done ( err , res || { } ) ;
376379 } ) ;
377380 }
378381 return func ( event , contextObject , contextObject . done ) ;
@@ -657,12 +660,43 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
657660 return waitTimer ;
658661 }
659662
660- createContextObject ( states : StateMachine [ 'States' ] , isSubContext : boolean ) : ContextObject {
663+ createContextObject (
664+ states : StateMachine [ 'States' ] ,
665+ name : string ,
666+ originalEvent : Event ,
667+ isSubContext : boolean
668+ ) : ContextObject {
669+ let attempt = 0 ;
661670 const cb = ( err : Maybe < Error > , result ?: Event ) => {
662- if ( err ) {
671+ if ( ! notEmpty ( this . currentState ) ) return ;
672+ if ( err && ! isType ( 'Task' ) < Task > ( this . currentState ) ) {
663673 throw `Error in function "${ this . currentStateName } ": ${ JSON . stringify ( err ) } ` ;
664674 }
665- if ( ! notEmpty ( this . currentState ) ) return ;
675+ if ( err && isType ( 'Task' ) < Task > ( this . currentState ) ) {
676+ const matchingError = ( this . currentState . Retry ?? [ ] ) . find ( condition =>
677+ condition . ErrorEquals . includes ( 'HandledError' )
678+ ) ;
679+ if ( ! matchingError ) throw `Error in function "${ this . currentStateName } ": ${ JSON . stringify ( err ) } ` ;
680+ attempt += 1 ;
681+ if ( attempt < ( matchingError . MaxAttempts ?? 0 ) ) {
682+ if ( matchingError . IntervalSeconds !== undefined && matchingError . IntervalSeconds !== 0 ) {
683+ const backoffRate = matchingError ?. BackoffRate ?? 2 ;
684+ const fullDelay =
685+ attempt === 1
686+ ? matchingError . IntervalSeconds
687+ : matchingError . IntervalSeconds * ( attempt - 1 ) * backoffRate ;
688+ console . log ( `Delaying ${ fullDelay } seconds for execution #${ attempt + 1 } of state ${ name } ` ) ;
689+ return delay ( fullDelay ) . then ( ( ) => this . process ( states [ name ] , name , originalEvent , isSubContext ) ) ;
690+ }
691+ return this . process ( states [ name ] , name , originalEvent , isSubContext ) ;
692+ }
693+ const newErr = `Error in function "${ this . currentStateName } " after ${ attempt } attempts: ${ JSON . stringify (
694+ this . currentState
695+ ) } - ${ JSON . stringify ( err ) } `;
696+ attempt = 0 ;
697+ throw newErr ;
698+ }
699+ attempt = 0 ;
666700 this . executionLog ( `~~~~~~~~~~~~~~~~~~~~~~~~~~~ ${ this . currentStateName } finished ~~~~~~~~~~~~~~~~~~~~~~~~~~~` ) ;
667701 let state = states ;
668702 if ( ! isNotCompletedState ( this . currentState ) ) return ;
@@ -680,6 +714,7 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
680714 } ;
681715
682716 return {
717+ attempt,
683718 cb,
684719 done : cb ,
685720 succeed : result => cb ( null , result ) ,
0 commit comments