@@ -29,6 +29,8 @@ import {
2929} from './types' ;
3030import enumList from './enum' ;
3131
32+ const delay = time => new Promise ( resolve => setTimeout ( resolve , time * 1000 ) ) ;
33+
3234export default class StepFunctionsOfflinePlugin implements Plugin {
3335 private location : string ;
3436
@@ -277,32 +279,29 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
277279 return { handler : handlerName , filePath } ;
278280 }
279281
280- buildStepWorkFlow ( ) : Promise < void | Callback > {
282+ buildStepWorkFlow ( ) : ReturnType < StepFunctionsOfflinePlugin [ 'process' ] > {
281283 this . cliLog ( 'Building StepWorkFlow' ) ;
282284 if ( ! this . stateDefinition ) throw new Error ( 'Missing state definition' ) ;
283- this . contextObject = this . createContextObject ( this . stateDefinition . States , false ) ;
285+ const event = this . loadedEventFile ?? { } ;
286+ if ( ! this . stateDefinition ?. StartAt ) {
287+ throw new Error ( 'Missing `startAt` in definition' ) ;
288+ }
289+ this . contextObject = this . createContextObject (
290+ this . stateDefinition . States ,
291+ this . stateDefinition . StartAt ,
292+ event ,
293+ false
294+ ) ;
284295 this . states = this . stateDefinition . States ;
285-
286- return Promise . resolve ( ) . then ( ( ) => {
287- if ( ! this . stateDefinition ?. StartAt ) {
288- throw new Error ( 'Missing `startAt` in definition' ) ;
289- }
290- // if (!this.loadedEventFile) throw new Error('Was unable to load event file');
291- return this . process (
292- this . states [ this . stateDefinition . StartAt ] ,
293- this . stateDefinition . StartAt ,
294- this . loadedEventFile ?? { } ,
295- false
296- ) ;
297- } ) ;
296+ return this . process ( this . states [ this . stateDefinition . StartAt ] , this . stateDefinition . StartAt , event , false ) ;
298297 }
299298
300299 async buildSubStepWorkFlow (
301300 stateDefinition : StateMachine ,
302301 event : Event
303302 ) : Promise < ReturnType < StepFunctionsOfflinePlugin [ 'process' ] > > {
304303 this . cliLog ( 'Building Iterator StepWorkFlow' ) ;
305- this . subContextObject = this . createContextObject ( stateDefinition . States , true ) ;
304+ this . subContextObject = this . createContextObject ( stateDefinition . States , stateDefinition . StartAt , event , true ) ;
306305
307306 if ( ! stateDefinition . States ) return ;
308307 const state = stateDefinition . States [ stateDefinition . StartAt ] ;
@@ -363,16 +362,20 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
363362 if ( ! mod ) return ;
364363 let res ;
365364 let err ;
366- const done = ( error , val ) => {
367- res = val ;
368- err = error ;
369- } ;
370- const functionRes = await mod ( event , contextObject , done ) ;
371- if ( functionRes ) res = functionRes ;
372365 try {
373- if ( typeof res === 'string' ) res = JSON . parse ( res ) ;
374- } catch ( err ) { }
375- if ( res ) return contextObject . done ( err , res || { } ) ;
366+ const done = ( e , val ) => {
367+ res = val ;
368+ err = e ;
369+ } ;
370+ const functionRes = await mod ( event , contextObject , done ) ;
371+ if ( functionRes ) res = functionRes ;
372+ try {
373+ if ( typeof res === 'string' ) res = JSON . parse ( res ) ;
374+ } catch ( err ) { }
375+ } catch ( error ) {
376+ err = error ;
377+ }
378+ return contextObject . done ( err , res || { } ) ;
376379 } ) ;
377380 }
378381 return func ( event , contextObject , contextObject . done ) ;
@@ -514,14 +517,11 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
514517 // works with parameter: seconds, timestamp, timestampPath, secondsPath;
515518 return {
516519 waitState : true ,
517- f : event => {
520+ f : async event => {
518521 const waitTimer = this . _waitState ( event , currentState , stateName ) ;
519522 this . cliLog ( `Wait function ${ stateName } - please wait ${ waitTimer } seconds` ) ;
520- return ( arg1 , arg2 , cb ) => {
521- setTimeout ( ( ) => {
522- cb ( null , event ) ;
523- } , waitTimer * 1000 ) ;
524- } ;
523+ await delay ( waitTimer ) ;
524+ return ( e , context , done ) => done ( null , e || event ) ;
525525 } ,
526526 } ;
527527 }
@@ -657,12 +657,43 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
657657 return waitTimer ;
658658 }
659659
660- createContextObject ( states : StateMachine [ 'States' ] , isSubContext : boolean ) : ContextObject {
660+ createContextObject (
661+ states : StateMachine [ 'States' ] ,
662+ name : string ,
663+ originalEvent : Event ,
664+ isSubContext : boolean
665+ ) : ContextObject {
666+ let attempt = 0 ;
661667 const cb = ( err : Maybe < Error > , result ?: Event ) => {
662- if ( err ) {
668+ if ( ! notEmpty ( this . currentState ) ) return ;
669+ if ( err && ! isType ( 'Task' ) < Task > ( this . currentState ) ) {
663670 throw `Error in function "${ this . currentStateName } ": ${ JSON . stringify ( err ) } ` ;
664671 }
665- if ( ! notEmpty ( this . currentState ) ) return ;
672+ if ( err && isType ( 'Task' ) < Task > ( this . currentState ) ) {
673+ const matchingError = ( this . currentState . Retry ?? [ ] ) . find ( condition =>
674+ condition . ErrorEquals . includes ( 'HandledError' )
675+ ) ;
676+ if ( ! matchingError ) throw `Error in function "${ this . currentStateName } ": ${ JSON . stringify ( err ) } ` ;
677+ attempt += 1 ;
678+ if ( attempt < ( matchingError . MaxAttempts ?? 0 ) ) {
679+ if ( matchingError . IntervalSeconds !== undefined && matchingError . IntervalSeconds !== 0 ) {
680+ const backoffRate = matchingError ?. BackoffRate ?? 2 ;
681+ const fullDelay =
682+ attempt === 1
683+ ? matchingError . IntervalSeconds
684+ : matchingError . IntervalSeconds * ( attempt - 1 ) * backoffRate ;
685+ console . log ( `Delaying ${ fullDelay } seconds for execution #${ attempt + 1 } of state ${ name } ` ) ;
686+ return delay ( fullDelay ) . then ( ( ) => this . process ( states [ name ] , name , originalEvent , isSubContext ) ) ;
687+ }
688+ return this . process ( states [ name ] , name , originalEvent , isSubContext ) ;
689+ }
690+ const newErr = `Error in function "${ this . currentStateName } " after ${ attempt } attempts: ${ JSON . stringify (
691+ this . currentState
692+ ) } - ${ JSON . stringify ( err ) } `;
693+ attempt = 0 ;
694+ throw newErr ;
695+ }
696+ attempt = 0 ;
666697 this . executionLog ( `~~~~~~~~~~~~~~~~~~~~~~~~~~~ ${ this . currentStateName } finished ~~~~~~~~~~~~~~~~~~~~~~~~~~~` ) ;
667698 let state = states ;
668699 if ( ! isNotCompletedState ( this . currentState ) ) return ;
@@ -680,6 +711,7 @@ export default class StepFunctionsOfflinePlugin implements Plugin {
680711 } ;
681712
682713 return {
714+ attempt,
683715 cb,
684716 done : cb ,
685717 succeed : result => cb ( null , result ) ,
0 commit comments