Skip to content
This repository was archived by the owner on Oct 27, 2025. It is now read-only.

Commit 24af00a

Browse files
author
-
committed
Fix the resumeOnRestart function to handle correctly also the recurring jobs + add unit tests
1 parent b04171d commit 24af00a

File tree

2 files changed

+83
-5
lines changed

2 files changed

+83
-5
lines changed

src/pulse/resume-on-restart.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import createDebugger from 'debug';
22
import { Pulse } from '.';
3+
import { Job } from '../job';
34

45
const debug = createDebugger('pulse:resumeOnRestart');
56

@@ -18,6 +19,8 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
1819

1920
if (this._collection && this._resumeOnRestart) {
2021
const now = new Date();
22+
23+
// Non-recurring jobs
2124
this._collection
2225
.updateMany(
2326
{
@@ -41,7 +44,48 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
4144
)
4245
.then((result) => {
4346
if (result.modifiedCount > 0) {
44-
debug('resuming unfinished %d jobs(%s)', result.modifiedCount, now.toISOString());
47+
debug('Resumed %d unfinished standard jobs (%s)', result.modifiedCount, now.toISOString());
48+
}
49+
});
50+
51+
// Handling for recurring jobs using repeatInterval or repeatAt
52+
this._collection
53+
.find({
54+
$or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }],
55+
nextRunAt: { $exists: false },
56+
})
57+
.toArray()
58+
.then((jobs) => {
59+
const updates = jobs.map((jobData) => {
60+
const job = new Job({
61+
pulse: this,
62+
name: jobData.name || '',
63+
data: jobData.data || {},
64+
type: jobData.type || 'normal',
65+
priority: jobData.priority || 'normal',
66+
shouldSaveResult: jobData.shouldSaveResult || false,
67+
attempts: jobData.attempts || 0,
68+
backoff: jobData.backoff,
69+
...jobData,
70+
});
71+
72+
job.computeNextRunAt();
73+
74+
return this._collection.updateOne(
75+
{ _id: job.attrs._id },
76+
{
77+
$set: { nextRunAt: job.attrs.nextRunAt },
78+
$unset: { lockedAt: undefined, lastModifiedBy: undefined, lastRunAt: undefined },
79+
}
80+
);
81+
});
82+
83+
return Promise.all(updates);
84+
})
85+
.then((results) => {
86+
const modifiedCount = results.filter((res) => res.modifiedCount > 0).length;
87+
if (modifiedCount > 0) {
88+
debug('Resumed %d recurring jobs (%s)', modifiedCount, now.toISOString());
4589
}
4690
});
4791
}

test/unit/pulse.spec.ts

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,51 @@ describe('Test Pulse', () => {
206206
});
207207

208208
describe('Test resumeOnRestart', () => {
209-
test('sets the default resumeOnRestart', () => {
209+
test('should enable resumeOnRestart by default', () => {
210210
expect(globalPulseInstance._resumeOnRestart).toBeTruthy();
211211
});
212212

213-
test('sets the custom resumeOnRestart', () => {
213+
test('should disable resumeOnRestart when set to false', () => {
214214
globalPulseInstance.resumeOnRestart(false);
215215
expect(globalPulseInstance._resumeOnRestart).toBeFalsy();
216216
});
217217

218-
test('returns itself', () => {
219-
expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance);
218+
test('should resume non-recurring jobs on restart', async () => {
219+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
220+
job.attrs.nextRunAt = new Date(Date.now() - 1000); // Set nextRunAt in the past
221+
await job.save();
222+
223+
await globalPulseInstance.resumeOnRestart();
224+
225+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
226+
const now = Date.now();
227+
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer
228+
});
229+
230+
test('should resume recurring jobs on restart', async () => {
231+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
232+
job.attrs.repeatInterval = '5 minutes';
233+
job.attrs.nextRunAt = null;
234+
await job.save();
235+
236+
await globalPulseInstance.resumeOnRestart();
237+
238+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
239+
const now = Date.now();
240+
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
241+
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer
242+
});
243+
244+
test('should not modify jobs with existing nextRunAt', async () => {
245+
const futureDate = new Date(Date.now() + 60 * 60 * 1000);
246+
const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' });
247+
job.attrs.nextRunAt = futureDate;
248+
await job.save();
249+
250+
await globalPulseInstance.resumeOnRestart();
251+
252+
const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
253+
expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime());
220254
});
221255
});
222256
});

0 commit comments

Comments
 (0)