11import createDebugger from 'debug' ;
22import { Pulse } from '.' ;
3- import { Job } from '../job' ;
43
54const debug = createDebugger ( 'pulse:resumeOnRestart' ) ;
65
@@ -19,48 +18,19 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
1918
2019 if ( this . _collection && this . _resumeOnRestart ) {
2120 const now = new Date ( ) ;
22-
23- // Non-recurring jobs
2421 this . _collection
2522 . updateMany (
2623 {
27- $and : [
28- { repeatInterval : { $exists : false } } , // Ensure the job is not recurring (no repeatInterval)
29- { repeatAt : { $exists : false } } , // Ensure the job is not recurring (no repeatAt)
24+ $or : [
25+ {
26+ lockedAt : { $exists : true } ,
27+ nextRunAt : { $ne : null } ,
28+ $or : [ { $expr : { $eq : [ '$runCount' , '$finishedCount' ] } } , { lastFinishedAt : { $exists : false } } ] ,
29+ } ,
3030 {
31- $or : [
32- {
33- lockedAt : { $exists : true } , // Locked jobs (interrupted or in-progress)
34- $and : [
35- {
36- $or : [
37- { nextRunAt : { $lte : now , $ne : null } } , // Overdue jobs
38- { nextRunAt : { $exists : false } } , // Jobs missing nextRunAt
39- { nextRunAt : null } , // Jobs explicitly set to null
40- ] ,
41- } ,
42- {
43- $or : [
44- { $expr : { $eq : [ '$runCount' , '$finishedCount' ] } } , // Jobs finished but stuck due to locking
45- { $or : [ { lastFinishedAt : { $exists : false } } , { lastFinishedAt : null } ] } , // Jobs that were not finished
46- ] ,
47- } ,
48- ] ,
49- } ,
50- {
51- lockedAt : { $exists : false } , // Unlocked jobs (not in-progress)
52- $and : [
53- {
54- $or : [
55- { nextRunAt : { $lte : now , $ne : null } } , // Overdue jobs
56- { nextRunAt : { $exists : false } } , // Jobs missing nextRunAt
57- { nextRunAt : null } , // Jobs explicitly set to null
58- ] ,
59- } ,
60- { $or : [ { lastFinishedAt : { $exists : false } } , { lastFinishedAt : null } ] } , // Jobs not finished
61- ] ,
62- } ,
63- ] ,
31+ lockedAt : { $exists : false } ,
32+ lastFinishedAt : { $exists : false } ,
33+ nextRunAt : { $lte : now , $ne : null } ,
6434 } ,
6535 ] ,
6636 } ,
@@ -71,63 +41,7 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
7141 )
7242 . then ( ( result ) => {
7343 if ( result . modifiedCount > 0 ) {
74- debug ( 'Resumed %d unfinished standard jobs (%s)' , result . modifiedCount , now . toISOString ( ) ) ;
75- }
76- } ) ;
77-
78- // Recurring jobs
79- this . _collection
80- . find ( {
81- $and : [
82- { $or : [ { repeatInterval : { $exists : true } } , { repeatAt : { $exists : true } } ] } , // Recurring jobs
83- {
84- $or : [
85- { nextRunAt : { $lte : now } } , // Overdue jobs
86- { nextRunAt : { $exists : false } } , // Jobs missing nextRunAt
87- { nextRunAt : null } , // Jobs explicitly set to null
88- ] ,
89- } ,
90- {
91- $or : [
92- { lastFinishedAt : { $exists : false } } , // Jobs never run
93- { lastFinishedAt : { $lte : now } } , // Jobs finished in the past
94- { lastFinishedAt : null } , // Jobs explicitly set to null
95- ] ,
96- } ,
97- ] ,
98- } )
99- . toArray ( )
100- . then ( ( jobs ) => {
101- const updates = jobs . map ( ( jobData ) => {
102- const job = new Job ( {
103- pulse : this ,
104- name : jobData . name || '' ,
105- data : jobData . data || { } ,
106- type : jobData . type || 'normal' ,
107- priority : jobData . priority || 'normal' ,
108- shouldSaveResult : jobData . shouldSaveResult || false ,
109- attempts : jobData . attempts || 0 ,
110- backoff : jobData . backoff ,
111- ...jobData ,
112- } ) ;
113-
114- job . computeNextRunAt ( ) ;
115-
116- return this . _collection . updateOne (
117- { _id : job . attrs . _id } ,
118- {
119- $set : { nextRunAt : job . attrs . nextRunAt } ,
120- $unset : { lockedAt : undefined , lastModifiedBy : undefined , lastRunAt : undefined } ,
121- }
122- ) ;
123- } ) ;
124-
125- return Promise . all ( updates ) ;
126- } )
127- . then ( ( results ) => {
128- const modifiedCount = results . filter ( ( res ) => res . modifiedCount > 0 ) . length ;
129- if ( modifiedCount > 0 ) {
130- debug ( 'Resumed %d recurring jobs (%s)' , modifiedCount , now . toISOString ( ) ) ;
44+ debug ( 'resuming unfinished %d jobs(%s)' , result . modifiedCount , now . toISOString ( ) ) ;
13145 }
13246 } ) ;
13347 }
0 commit comments