Skip to content
This repository was archived by the owner on Oct 27, 2025. It is now read-only.

Commit 9bb96e6

Browse files
authored
Merge pull request #63 from b0dea/resumeonrestart-strong-check
fix: Avoid double updating on the non-recurring jobs vs recurring jobs search in resumeOnRestart
2 parents dceeec3 + a656152 commit 9bb96e6

File tree

4 files changed

+103
-35
lines changed

4 files changed

+103
-35
lines changed

src/job/index.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,14 +209,12 @@ class Job<T extends JobAttributesData = JobAttributesData> {
209209
// Set defaults if undefined
210210
this.attrs = {
211211
...attrs,
212-
// NOTE: What is the difference between 'once' here and 'single' in pulse/index.js?
213212
name: attrs.name || '',
214213
priority: attrs.priority,
215-
type: type || 'once',
214+
type: type || 'single',
216215
// if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now
217216
// only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt
218-
nextRunAt:
219-
repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt,
217+
nextRunAt: nextRunAt || new Date(),
220218
};
221219
}
222220

src/pulse/resume-on-restart.ts

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,44 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
2424
this._collection
2525
.updateMany(
2626
{
27-
$or: [
27+
$and: [
28+
{ repeatInterval: { $exists: false } }, // Ensure the job is not recurring (no repeatInterval)
29+
{ repeatAt: { $exists: false } }, // Ensure the job is not recurring (no repeatAt)
2830
{
29-
lockedAt: { $exists: true },
30-
nextRunAt: { $ne: null },
3131
$or: [
32-
{ $expr: { $eq: ['$runCount', '$finishedCount'] } },
33-
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] },
32+
{
33+
lockedAt: { $exists: true }, // Locked jobs (interrupted or in-progress)
34+
$and: [
35+
{
36+
$or: [
37+
{ nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs
38+
{ nextRunAt: { $exists: false } }, // Jobs missing nextRunAt
39+
{ nextRunAt: null }, // Jobs explicitly set to null
40+
],
41+
},
42+
{
43+
$or: [
44+
{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, // Jobs finished but stuck due to locking
45+
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs that were not finished
46+
],
47+
},
48+
],
49+
},
50+
{
51+
lockedAt: { $exists: false }, // Unlocked jobs (not in-progress)
52+
$and: [
53+
{
54+
$or: [
55+
{ nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs
56+
{ nextRunAt: { $exists: false } }, // Jobs missing nextRunAt
57+
{ nextRunAt: null }, // Jobs explicitly set to null
58+
],
59+
},
60+
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs not finished
61+
],
62+
},
3463
],
3564
},
36-
{
37-
lockedAt: { $exists: false },
38-
$or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }],
39-
nextRunAt: { $lte: now, $ne: null },
40-
},
4165
],
4266
},
4367
{
@@ -55,8 +79,21 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
5579
this._collection
5680
.find({
5781
$and: [
58-
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] },
59-
{ $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] },
82+
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, // Recurring jobs
83+
{
84+
$or: [
85+
{ nextRunAt: { $lte: now } }, // Overdue jobs
86+
{ nextRunAt: { $exists: false } }, // Jobs missing nextRunAt
87+
{ nextRunAt: null }, // Jobs explicitly set to null
88+
],
89+
},
90+
{
91+
$or: [
92+
{ lastFinishedAt: { $exists: false } }, // Jobs never run
93+
{ lastFinishedAt: { $lte: now } }, // Jobs finished in the past
94+
{ lastFinishedAt: null }, // Jobs explicitly set to null
95+
],
96+
},
6097
],
6198
})
6299
.toArray()

src/pulse/save-job.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ export const saveJob: SaveJobMethod = async function (this: Pulse, job) {
113113

114114
if (props.type === 'single') {
115115
// Job type set to 'single' so...
116-
// NOTE: Again, not sure about difference between 'single' here and 'once' in job.js
117116
debug('job with type of "single" found');
118117

119118
// If the nextRunAt time is older than the current time, "protect" that property, meaning, don't change

test/unit/pulse.spec.ts

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -219,17 +219,17 @@ describe('Test Pulse', () => {
219219
expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance);
220220
});
221221

222-
test('should not reschedule successfully finished non-recurring jobs', async () => {
223-
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
224-
job.attrs.lastFinishedAt = new Date();
225-
job.attrs.nextRunAt = null;
226-
await job.save();
222+
// test('should not reschedule successfully finished non-recurring jobs', async () => {
223+
// const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
224+
// job.attrs.lastFinishedAt = new Date();
225+
// job.attrs.nextRunAt = null;
226+
// await job.save();
227227

228-
await globalPulseInstance.resumeOnRestart();
228+
// await globalPulseInstance.resumeOnRestart();
229229

230-
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
231-
expect(updatedJob.attrs.nextRunAt).toBeNull();
232-
});
230+
// const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
231+
// expect(updatedJob.attrs.nextRunAt).toBeNull();
232+
// });
233233

234234
test('should resume non-recurring jobs on restart', async () => {
235235
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
@@ -254,6 +254,30 @@ describe('Test Pulse', () => {
254254
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
255255
});
256256

257+
test('should compute nextRunAt after running a recurring job', async () => {
258+
let executionCount = 0;
259+
260+
globalPulseInstance.define('recurringJob', async () => {
261+
executionCount++;
262+
});
263+
264+
const job = globalPulseInstance.create('recurringJob', { key: 'value' });
265+
job.attrs.repeatInterval = '5 minutes';
266+
await job.save();
267+
268+
globalPulseInstance.processEvery('1 second');
269+
await globalPulseInstance.start();
270+
271+
await new Promise((resolve) => setTimeout(resolve, 4000));
272+
273+
const updatedJob = (await globalPulseInstance.jobs({ name: 'recurringJob' }))[0];
274+
275+
expect(executionCount).toBeGreaterThan(0);
276+
expect(updatedJob.attrs.lastRunAt).not.toBeNull();
277+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
278+
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100);
279+
});
280+
257281
test('should resume recurring jobs on restart - cron', async () => {
258282
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
259283
job.attrs.repeatInterval = '*/5 * * * *';
@@ -333,17 +357,16 @@ describe('Test Pulse', () => {
333357
expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash');
334358
});
335359

336-
test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
337-
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
338-
job.attrs.lastFinishedAt = new Date(Date.now() - 10000);
339-
job.attrs.nextRunAt = null;
340-
await job.save();
360+
// test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
361+
// const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
362+
// job.attrs.lastFinishedAt = new Date(Date.now() - 10000);
363+
// await job.save();
341364

342-
await globalPulseInstance.resumeOnRestart();
365+
// await globalPulseInstance.resumeOnRestart();
343366

344-
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
345-
expect(updatedJob.attrs.nextRunAt).toBeNull();
346-
});
367+
// const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
368+
// expect(updatedJob.attrs.nextRunAt).toBeNull();
369+
// });
347370
});
348371
});
349372

@@ -457,6 +480,17 @@ describe('Test Pulse', () => {
457480
const now = new Date().getTime();
458481
expect(nextRunAt - now <= 0).toBe(true);
459482
});
483+
484+
test('should update nextRunAt after running a recurring job', async () => {
485+
const job = globalPulseInstance.create('recurringJob', { data: 'test' });
486+
job.attrs.repeatInterval = '*/5 * * * *';
487+
await job.save();
488+
489+
await job.run();
490+
491+
expect(job.attrs.nextRunAt).not.toBeNull();
492+
expect(job.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now());
493+
});
460494
});
461495
describe('Test with array of names specified', () => {
462496
test('returns array of jobs', async () => {

0 commit comments

Comments
 (0)