Skip to content
This repository was archived by the owner on Oct 27, 2025. It is now read-only.

Commit 8d66861

Browse files
author
b0dea
committed
Do better checks when searching for jobs to resume on restart + add more unit tests
1 parent 24af00a commit 8d66861

File tree

3 files changed

+108
-12
lines changed

3 files changed

+108
-12
lines changed

src/job/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ class Job<T extends JobAttributesData = JobAttributesData> {
186186
attrs: JobAttributes<T>;
187187

188188
constructor(options: Modify<JobAttributes<T>, { _id?: mongodb.ObjectId }>) {
189-
const { pulse, type, nextRunAt, ...args } = options ?? {};
189+
const { pulse, type, nextRunAt, repeatAt, repeatInterval, lastFinishedAt, ...args } = options ?? {};
190190

191191
// Save Pulse instance
192192
this.pulse = pulse;
@@ -213,7 +213,10 @@ class Job<T extends JobAttributesData = JobAttributesData> {
213213
name: attrs.name || '',
214214
priority: attrs.priority,
215215
type: type || 'once',
216-
nextRunAt: nextRunAt || new Date(),
216+
// if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now
217+
// only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt
218+
nextRunAt:
219+
repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt,
217220
};
218221
}
219222

src/pulse/resume-on-restart.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
2828
{
2929
lockedAt: { $exists: true },
3030
nextRunAt: { $ne: null },
31-
$or: [{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, { lastFinishedAt: { $exists: false } }],
31+
$or: [
32+
{ $expr: { $eq: ['$runCount', '$finishedCount'] } },
33+
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] },
34+
],
3235
},
3336
{
3437
lockedAt: { $exists: false },
35-
lastFinishedAt: { $exists: false },
38+
$or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }],
3639
nextRunAt: { $lte: now, $ne: null },
3740
},
3841
],
@@ -51,8 +54,10 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
5154
// Handling for recurring jobs using repeatInterval or repeatAt
5255
this._collection
5356
.find({
54-
$or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }],
55-
nextRunAt: { $exists: false },
57+
$and: [
58+
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] },
59+
{ $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] },
60+
],
5661
})
5762
.toArray()
5863
.then((jobs) => {

test/unit/pulse.spec.ts

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,19 +215,30 @@ describe('Test Pulse', () => {
215215
expect(globalPulseInstance._resumeOnRestart).toBeFalsy();
216216
});
217217

218+
test('should not reschedule successfully finished non-recurring jobs', async () => {
219+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
220+
job.attrs.lastFinishedAt = new Date();
221+
job.attrs.nextRunAt = null;
222+
await job.save();
223+
224+
await globalPulseInstance.resumeOnRestart();
225+
226+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
227+
expect(updatedJob.attrs.nextRunAt).toBeNull();
228+
});
229+
218230
test('should resume non-recurring jobs on restart', async () => {
219231
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
220-
job.attrs.nextRunAt = new Date(Date.now() - 1000); // Set nextRunAt in the past
232+
job.attrs.nextRunAt = new Date(Date.now() - 1000);
221233
await job.save();
222234

223235
await globalPulseInstance.resumeOnRestart();
224236

225237
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
226-
const now = Date.now();
227-
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer
238+
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100);
228239
});
229240

230-
test('should resume recurring jobs on restart', async () => {
241+
test('should resume recurring jobs on restart - interval', async () => {
231242
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
232243
job.attrs.repeatInterval = '5 minutes';
233244
job.attrs.nextRunAt = null;
@@ -236,9 +247,31 @@ describe('Test Pulse', () => {
236247
await globalPulseInstance.resumeOnRestart();
237248

238249
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
239-
const now = Date.now();
240250
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
241-
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer
251+
});
252+
253+
test('should resume recurring jobs on restart - cron', async () => {
254+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
255+
job.attrs.repeatInterval = '*/5 * * * *';
256+
job.attrs.nextRunAt = null;
257+
await job.save();
258+
259+
await globalPulseInstance.resumeOnRestart();
260+
261+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
262+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
263+
});
264+
265+
test('should resume recurring jobs on restart - repeatAt', async () => {
266+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
267+
job.attrs.repeatAt = '1:00 am';
268+
job.attrs.nextRunAt = null;
269+
await job.save();
270+
271+
await globalPulseInstance.resumeOnRestart();
272+
273+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
274+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
242275
});
243276

244277
test('should not modify jobs with existing nextRunAt', async () => {
@@ -252,6 +285,61 @@ describe('Test Pulse', () => {
252285
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
253286
expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime());
254287
});
288+
289+
test('should handle jobs that started but have not finished (non-recurring)', async () => {
290+
const job = globalPulseInstance.create('processData', { data: 'sample' });
291+
job.attrs.nextRunAt = null;
292+
job.attrs.lockedAt = new Date();
293+
await job.save();
294+
295+
await globalPulseInstance.resumeOnRestart();
296+
297+
const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
298+
299+
const now = Date.now();
300+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
301+
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100);
302+
});
303+
304+
test('should handle recurring jobs that started but have not finished', async () => {
305+
const job = globalPulseInstance.create('processData', { data: 'sample' });
306+
job.attrs.repeatInterval = '10 minutes';
307+
job.attrs.lockedAt = new Date();
308+
job.attrs.nextRunAt = new Date(Date.now() + 10000); // Next run in 10 seconds
309+
await job.save();
310+
311+
await globalPulseInstance.resumeOnRestart();
312+
313+
const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
314+
expect(updatedJob.attrs.lockedAt).not.toBeNull(); // Job remains locked
315+
expect(updatedJob.attrs.nextRunAt).not.toBeNull(); // Scheduling intact
316+
});
317+
318+
test('should handle interrupted recurring jobs after server recovery', async () => {
319+
const job = globalPulseInstance.create('processData', { data: 'sample' });
320+
job.attrs.repeatInterval = '5 minutes';
321+
job.attrs.lastModifiedBy = 'server_crash';
322+
job.attrs.nextRunAt = null;
323+
await job.save();
324+
325+
await globalPulseInstance.resumeOnRestart();
326+
327+
const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
328+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
329+
expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash');
330+
});
331+
332+
test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
333+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
334+
job.attrs.lastFinishedAt = new Date(Date.now() - 10000); // Finished 10 seconds ago
335+
job.attrs.nextRunAt = null;
336+
await job.save();
337+
338+
await globalPulseInstance.resumeOnRestart();
339+
340+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
341+
expect(updatedJob.attrs.nextRunAt).toBeNull(); // Job remains finished
342+
});
255343
});
256344
});
257345

0 commit comments

Comments
 (0)