@@ -206,7 +206,41 @@ export class ActionManager {
206206 } )
207207 }
208208
209- private async updateActionStatuses (
209+ /**
210+ * Mark actions with the given status.
211+ * @param actions
212+ * @param transaction
213+ * @param status
214+ * @returns updated actions
215+ * @throws error if the update fails
216+ */
217+ private async markActions (
218+ actions : Action [ ] ,
219+ transaction : Transaction ,
220+ status : ActionStatus ,
221+ ) : Promise < Action [ ] > {
222+ const ids = actions . map ( ( action ) => action . id )
223+ const [ , updatedActions ] = await this . models . Action . update (
224+ {
225+ status,
226+ } ,
227+ {
228+ where : { id : ids } ,
229+ returning : true ,
230+ transaction,
231+ } ,
232+ )
233+ return updatedActions
234+ }
235+
236+ /**
237+ * Update the action statuses from the results provided by execution.
238+ *
239+ * @param results
240+ * @param transaction
241+ * @returns updated actions
242+ */
243+ private async updateActionStatusesWithResults (
210244 results : AllocationResult [ ] ,
211245 transaction : Transaction ,
212246 ) : Promise < Action [ ] > {
@@ -255,12 +289,14 @@ export class ActionManager {
255289 protocolNetwork,
256290 } )
257291
258- logger . debug ( 'Begin database transaction for executing approved actions' )
292+ logger . debug ( 'Begin executing approved actions' )
293+ let batchStartTime
294+
259295 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
260- await this . models . Action . sequelize ! . transaction (
296+ const prioritizedActions : Action [ ] = await this . models . Action . sequelize ! . transaction (
261297 { isolationLevel : Transaction . ISOLATION_LEVELS . SERIALIZABLE } ,
262298 async ( transaction ) => {
263- const transactionOpenTime = Date . now ( )
299+ batchStartTime = Date . now ( )
264300 let approvedActions
265301 try {
266302 // Execute already approved actions in the order of type and priority.
@@ -271,10 +307,7 @@ export class ActionManager {
271307 const actionTypePriority = [ 'unallocate' , 'reallocate' , 'allocate' ]
272308 approvedActions = (
273309 await this . models . Action . findAll ( {
274- where : {
275- status : ActionStatus . APPROVED ,
276- protocolNetwork,
277- } ,
310+ where : { status : ActionStatus . APPROVED , protocolNetwork } ,
278311 order : [ [ 'priority' , 'ASC' ] ] ,
279312 transaction,
280313 lock : transaction . LOCK . UPDATE ,
@@ -283,6 +316,16 @@ export class ActionManager {
283316 return actionTypePriority . indexOf ( a . type ) - actionTypePriority . indexOf ( b . type )
284317 } )
285318
319+ const pendingActions = await this . models . Action . findAll ( {
320+ where : { status : ActionStatus . PENDING , protocolNetwork } ,
321+ order : [ [ 'priority' , 'ASC' ] ] ,
322+ transaction,
323+ } )
324+ if ( pendingActions . length > 0 ) {
325+ logger . warn ( `${ pendingActions } Actions found in PENDING state when execution began. Was there a crash? \
326+ These indicate that execution was interrupted and will need to be cleared manually.` )
327+ }
328+
286329 if ( approvedActions . length === 0 ) {
287330 logger . debug ( 'No approved actions were found for this network' )
288331 return [ ]
@@ -295,34 +338,67 @@ export class ActionManager {
295338 logger . error ( 'Failed to query approved actions for network' , { error } )
296339 return [ ]
297340 }
298- try {
299- logger . debug ( 'Executing batch action' , {
300- approvedActions ,
301- startTimeMs : Date . now ( ) - transactionOpenTime ,
302- } )
341+ // mark all approved actions as PENDING, this serves as a lock on other processing of them
342+ await this . markActions ( approvedActions , transaction , ActionStatus . PENDING )
343+ return prioritizedActions
344+ } ,
345+ )
303346
304- // This will return all results if successful, if failed it will return the failed actions
305- const allocationManager =
306- this . allocationManagers [ network . specification . networkIdentifier ]
307- const results = await allocationManager . executeBatch ( approvedActions )
347+ try {
348+ logger . debug ( 'Executing batch action' , {
349+ prioritizedActions,
350+ startTimeMs : Date . now ( ) - batchStartTime ,
351+ } )
352+
353+ const allocationManager =
354+ this . allocationManagers [ network . specification . networkIdentifier ]
355+
356+ let results
357+ try {
358+ // This will return all results if successful, if failed it will return the failed actions
359+ results = await allocationManager . executeBatch ( prioritizedActions )
360+ logger . debug ( 'Completed batch action execution' , {
361+ results,
362+ endTimeMs : Date . now ( ) - batchStartTime ,
363+ } )
364+ } catch ( error ) {
365+ // Release the actions from the PENDING state. This means they will be retried again on the next batch execution.
366+ logger . error (
367+ `Error raised during executeBatch, releasing ${ prioritizedActions . length } actions from PENDING state. \
368+ These will be attempted again on the next batch.` ,
369+ error ,
370+ )
371+ await this . models . Action . sequelize ! . transaction (
372+ { isolationLevel : Transaction . ISOLATION_LEVELS . SERIALIZABLE } ,
373+ async ( transaction ) => {
374+ return await this . markActions (
375+ prioritizedActions ,
376+ transaction ,
377+ ActionStatus . APPROVED ,
378+ )
379+ } ,
380+ )
381+ return [ ]
382+ }
308383
309- logger . debug ( 'Completed batch action execution' , {
310- results,
311- endTimeMs : Date . now ( ) - transactionOpenTime ,
312- } )
313- updatedActions = await this . updateActionStatuses ( results , transaction )
384+ // Happy path: execution went well (success or failure but no exceptions). Update the actions with the results.
385+ updatedActions = await this . models . Action . sequelize ! . transaction (
386+ { isolationLevel : Transaction . ISOLATION_LEVELS . SERIALIZABLE } ,
387+ async ( transaction ) => {
388+ return await this . updateActionStatusesWithResults ( results , transaction )
389+ } ,
390+ )
314391
315- logger . debug ( 'Updated action statuses' , {
316- updatedActions,
317- updatedTimeMs : Date . now ( ) - transactionOpenTime ,
318- } )
319- } catch ( error ) {
320- logger . error ( `Failed to execute batch tx on staking contract: ${ error } ` )
321- throw indexerError ( IndexerErrorCode . IE072 , error )
322- }
323- } ,
324- )
325- logger . debug ( 'End database transaction for executing approved actions' )
392+ logger . debug ( 'Updated action statuses' , {
393+ updatedActions,
394+ updatedTimeMs : Date . now ( ) - batchStartTime ,
395+ } )
396+ } catch ( error ) {
397+ logger . error ( `Failed to execute batch tx on staking contract: ${ error } ` )
398+ throw indexerError ( IndexerErrorCode . IE072 , error )
399+ }
400+
401+ logger . debug ( 'End executing approved actions' )
326402 return updatedActions
327403 }
328404
0 commit comments