Skip to content
This repository was archived by the owner on Oct 27, 2025. It is now read-only.

Commit ec10759

Browse files
committed
fix: Fix the resumeOnRestart function
Fix the resumeOnRestart function to handle correctl the recurring jobs + do not run finished non-recurrent jobs on restart + add unit tests
2 parents b04171d + 5a3e1fa commit ec10759

File tree

3 files changed

+183
-5
lines changed

3 files changed

+183
-5
lines changed

src/job/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ class Job<T extends JobAttributesData = JobAttributesData> {
186186
attrs: JobAttributes<T>;
187187

188188
constructor(options: Modify<JobAttributes<T>, { _id?: mongodb.ObjectId }>) {
189-
const { pulse, type, nextRunAt, ...args } = options ?? {};
189+
const { pulse, type, nextRunAt, repeatAt, repeatInterval, lastFinishedAt, ...args } = options ?? {};
190190

191191
// Save Pulse instance
192192
this.pulse = pulse;
@@ -213,7 +213,10 @@ class Job<T extends JobAttributesData = JobAttributesData> {
213213
name: attrs.name || '',
214214
priority: attrs.priority,
215215
type: type || 'once',
216-
nextRunAt: nextRunAt || new Date(),
216+
// if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now
217+
// only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt
218+
nextRunAt:
219+
repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt,
217220
};
218221
}
219222

src/pulse/resume-on-restart.ts

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import createDebugger from 'debug';
22
import { Pulse } from '.';
3+
import { Job } from '../job';
34

45
const debug = createDebugger('pulse:resumeOnRestart');
56

@@ -18,18 +19,23 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
1819

1920
if (this._collection && this._resumeOnRestart) {
2021
const now = new Date();
22+
23+
// Non-recurring jobs
2124
this._collection
2225
.updateMany(
2326
{
2427
$or: [
2528
{
2629
lockedAt: { $exists: true },
2730
nextRunAt: { $ne: null },
28-
$or: [{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, { lastFinishedAt: { $exists: false } }],
31+
$or: [
32+
{ $expr: { $eq: ['$runCount', '$finishedCount'] } },
33+
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] },
34+
],
2935
},
3036
{
3137
lockedAt: { $exists: false },
32-
lastFinishedAt: { $exists: false },
38+
$or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }],
3339
nextRunAt: { $lte: now, $ne: null },
3440
},
3541
],
@@ -41,7 +47,50 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
4147
)
4248
.then((result) => {
4349
if (result.modifiedCount > 0) {
44-
debug('resuming unfinished %d jobs(%s)', result.modifiedCount, now.toISOString());
50+
debug('Resumed %d unfinished standard jobs (%s)', result.modifiedCount, now.toISOString());
51+
}
52+
});
53+
54+
// Recurring jobs
55+
this._collection
56+
.find({
57+
$and: [
58+
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] },
59+
{ $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] },
60+
],
61+
})
62+
.toArray()
63+
.then((jobs) => {
64+
const updates = jobs.map((jobData) => {
65+
const job = new Job({
66+
pulse: this,
67+
name: jobData.name || '',
68+
data: jobData.data || {},
69+
type: jobData.type || 'normal',
70+
priority: jobData.priority || 'normal',
71+
shouldSaveResult: jobData.shouldSaveResult || false,
72+
attempts: jobData.attempts || 0,
73+
backoff: jobData.backoff,
74+
...jobData,
75+
});
76+
77+
job.computeNextRunAt();
78+
79+
return this._collection.updateOne(
80+
{ _id: job.attrs._id },
81+
{
82+
$set: { nextRunAt: job.attrs.nextRunAt },
83+
$unset: { lockedAt: undefined, lastModifiedBy: undefined, lastRunAt: undefined },
84+
}
85+
);
86+
});
87+
88+
return Promise.all(updates);
89+
})
90+
.then((results) => {
91+
const modifiedCount = results.filter((res) => res.modifiedCount > 0).length;
92+
if (modifiedCount > 0) {
93+
debug('Resumed %d recurring jobs (%s)', modifiedCount, now.toISOString());
4594
}
4695
});
4796
}

test/unit/pulse.spec.ts

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,132 @@ describe('Test Pulse', () => {
218218
test('returns itself', () => {
219219
expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance);
220220
});
221+
222+
test('should not reschedule successfully finished non-recurring jobs', async () => {
223+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
224+
job.attrs.lastFinishedAt = new Date();
225+
job.attrs.nextRunAt = null;
226+
await job.save();
227+
228+
await globalPulseInstance.resumeOnRestart();
229+
230+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
231+
expect(updatedJob.attrs.nextRunAt).toBeNull();
232+
});
233+
234+
test('should resume non-recurring jobs on restart', async () => {
235+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
236+
job.attrs.nextRunAt = new Date(Date.now() - 1000);
237+
await job.save();
238+
239+
await globalPulseInstance.resumeOnRestart();
240+
241+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
242+
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100);
243+
});
244+
245+
test('should resume recurring jobs on restart - interval', async () => {
246+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
247+
job.attrs.repeatInterval = '5 minutes';
248+
job.attrs.nextRunAt = null;
249+
await job.save();
250+
251+
await globalPulseInstance.resumeOnRestart();
252+
253+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
254+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
255+
});
256+
257+
test('should resume recurring jobs on restart - cron', async () => {
258+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
259+
job.attrs.repeatInterval = '*/5 * * * *';
260+
job.attrs.nextRunAt = null;
261+
await job.save();
262+
263+
await globalPulseInstance.resumeOnRestart();
264+
265+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
266+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
267+
});
268+
269+
test('should resume recurring jobs on restart - repeatAt', async () => {
270+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
271+
job.attrs.repeatAt = '1:00 am';
272+
job.attrs.nextRunAt = null;
273+
await job.save();
274+
275+
await globalPulseInstance.resumeOnRestart();
276+
277+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
278+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
279+
});
280+
281+
test('should not modify jobs with existing nextRunAt', async () => {
282+
const futureDate = new Date(Date.now() + 60 * 60 * 1000);
283+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
284+
job.attrs.nextRunAt = futureDate;
285+
await job.save();
286+
287+
await globalPulseInstance.resumeOnRestart();
288+
289+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
290+
expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime());
291+
});
292+
293+
test('should handle jobs that started but have not finished (non-recurring)', async () => {
294+
const job = globalPulseInstance.create('processData', { data: 'sample' });
295+
job.attrs.nextRunAt = null;
296+
job.attrs.lockedAt = new Date();
297+
await job.save();
298+
299+
await globalPulseInstance.resumeOnRestart();
300+
301+
const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
302+
303+
const now = Date.now();
304+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
305+
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100);
306+
});
307+
308+
test('should handle recurring jobs that started but have not finished', async () => {
309+
const job = globalPulseInstance.create('processData', { data: 'sample' });
310+
job.attrs.repeatInterval = '10 minutes';
311+
job.attrs.lockedAt = new Date();
312+
job.attrs.nextRunAt = new Date(Date.now() + 10000);
313+
await job.save();
314+
315+
await globalPulseInstance.resumeOnRestart();
316+
317+
const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
318+
expect(updatedJob.attrs.lockedAt).not.toBeNull();
319+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
320+
});
321+
322+
test('should handle interrupted recurring jobs after server recovery', async () => {
323+
const job = globalPulseInstance.create('processData', { data: 'sample' });
324+
job.attrs.repeatInterval = '5 minutes';
325+
job.attrs.lastModifiedBy = 'server_crash';
326+
job.attrs.nextRunAt = null;
327+
await job.save();
328+
329+
await globalPulseInstance.resumeOnRestart();
330+
331+
const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
332+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
333+
expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash');
334+
});
335+
336+
test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
337+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
338+
job.attrs.lastFinishedAt = new Date(Date.now() - 10000);
339+
job.attrs.nextRunAt = null;
340+
await job.save();
341+
342+
await globalPulseInstance.resumeOnRestart();
343+
344+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
345+
expect(updatedJob.attrs.nextRunAt).toBeNull();
346+
});
221347
});
222348
});
223349

0 commit comments

Comments
 (0)