This repository was archived by the owner on Oct 27, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 29
This repository was archived by the owner on Oct 27, 2025. It is now read-only.
Pulse re-runs finished jobs after a process restart #57
Copy link
Copy link
Open
Labels
bugSomething isn't workingSomething isn't working
Description
Description
I have a Pulse job that's scheduled to run every midnight on week days (repeatInterval: "0 0 * * 1-5"). When it's finished, Pulse leaves lockedAt: null and lastRunAt with the last run date in the job document (inside pulseJobs MongoDB collection).
Once I restart a Node.js process, Pulse picks up this job again no matter the actual time (the job should run only at midnight). And it happens with every restart, even though the job gets completed, it's not interrupted.
Code example
Pulse client configuration
const config: PulseConfig = {
processEvery: '30 seconds',
resumeOnRestart: true,
defaultLockLimit: 1,
defaultLockLifetime: 180_000,
};
export const initPulseClient = (db: Db): void => {
pulse = new Pulse({
mongo: db,
...config,
});
};The job configuration
pulse.define(
'MY_PULSE_JOB',
async (_job, done) => {
try {
await myPulseJob(); // could be any function
done();
} catch (error) {
const msg = 'Failed during publishing signals';
logger.error(error, msg);
done(new Error(msg));
}
},
{
concurrency: 1,
shouldSaveResult: true,
lockLifetime: 180_000,
attempts: 3,
backoff: {
type: 'exponential',
delay: 3_000,
},
},
);
pulse.every('0 0 * * 1-5', 'MY_PULSE_JOB');Pulse debug logs
pulse:db_init init database collection using name [pulseJobs] +0ms
pulse:resumeOnRestart Pulse.resumeOnRestart() +0ms
pulse:db_init attempting index creation +1ms
pulse:define job [MY_PULSE_JOB] defined with following options:
pulse:define {
pulse:define fn: [AsyncFunction (anonymous)],
pulse:define concurrency: 1,
pulse:define lockLimit: 1,
pulse:define priority: 0,
pulse:define lockLifetime: 180000,
pulse:define running: 0,
pulse:define locked: 0,
pulse:define shouldSaveResult: true,
pulse:define attempts: 3,
pulse:define backoff: { type: 'exponential', delay: 3000 }
pulse:define } +0ms
pulse:resumeOnRestart resuming unfinished 1 jobs(2024-10-07T11:47:13.294Z) +98ms
pulse:db_init index creation success +575ms
pulse:start Pulse.start called, creating interval to call processJobs every [30000ms] +0ms
pulse:every Pulse.every(0 0 * * 1-5, 'MY_PULSE_JOB', undefined) +0ms
pulse:create Pulse.create(MY_PULSE_JOB, [Object]) +0ms
pulse:job [MY_PULSE_JOB:undefined] computing next run via interval [0 0 * * 1-5] +0ms
pulse:job [MY_PULSE_JOB:undefined] nextRunAt set to [2024-10-07T20:00:00.000Z] +9ms
pulse:saveJob attempting to save a job into Pulse instance +0ms
pulse:saveJob [job undefined] set job props:
pulse:saveJob {
pulse:saveJob name: 'MY_PULSE_JOB',
pulse:saveJob data: {},
pulse:saveJob priority: 0,
pulse:saveJob shouldSaveResult: true,
pulse:saveJob attempts: 3,
pulse:saveJob backoff: { type: 'exponential', delay: 3000 },
pulse:saveJob type: 'single',
pulse:saveJob nextRunAt: 2024-10-07T20:00:00.000Z,
pulse:saveJob repeatInterval: '0 0 * * 1-5',
pulse:saveJob repeatTimezone: null,
pulse:saveJob startDate: null,
pulse:saveJob endDate: null,
pulse:saveJob skipDays: null,
pulse:saveJob lastModifiedBy: undefined
pulse:saveJob } +1ms
pulse:saveJob current time stored as 2024-10-07T11:47:13.882Z +0ms
pulse:saveJob job with type of "single" found +0ms
pulse:saveJob calling findOneAndUpdate() with job name and type of "single" as query +0ms
pulse:internal:processJobs starting to process jobs: [unknownName:unknownId] +0ms
pulse:internal:processJobs queuing up job to process: [MY_PULSE_JOB] +0ms
pulse:internal:processJobs jobQueueFilling: MY_PULSE_JOB isJobQueueFilling: false +0ms
pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = true +0ms
pulse:internal:_findAndLockNextJob _findAndLockNextJob(MY_PULSE_JOB, [Function]) +0ms
pulse:internal:_findAndLockNextJob found a job available to lock, creating a new job on Pulse with id [6703c2b8f3d5afdf65e69436] +76ms
pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = true +78ms
pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] job locked while filling queue +0ms
pulse:internal:processJobs jobQueueFilling: MY_PULSE_JOB isJobQueueFilling: true +0ms
pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = false +0ms
pulse:internal:processJobs lock limit reached in queue filling for [MY_PULSE_JOB] +0ms
pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] about to process job +0ms
pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] nextRunAt is in the past, run the job immediately +0ms
# Custom logs to see the `job` object
# src/utils/process-jobs.ts:246
# {
# 'job.attrs.nextRunAt': 2024-10-07T11:47:13.294Z,
# now: 2024-10-07T11:47:13.961Z
# }
# ^ at this point, Pulse overrides `nextRunAt`, so it replaces the midnight rule to `now`
# job.attrs
# {
# _id: new ObjectId('6703c2b8f3d5afdf65e69436'),
# name: 'MY_PULSE_JOB',
# attempts: 3,
# backoff: { type: 'exponential', delay: 3000 },
# data: {},
# endDate: null,
# priority: 0,
# repeatInterval: '0 0 * * 1-5',
# repeatTimezone: null,
# shouldSaveResult: true,
# skipDays: null,
# startDate: null,
# finishedCount: 4,
# lastFinishedAt: 2024-10-07T11:46:41.174Z,
# runCount: 4,
# lockedAt: 2024-10-07T11:47:13.884Z,
# type: 'single',
# nextRunAt: 2024-10-07T11:47:13.294Z
# }
pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] processing job +2ms
pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] setting lastRunAt to: 2024-10-07T11:47:13.964Z +0ms
pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] computing next run via interval [0 0 * * 1-5] +83ms
pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] nextRunAt set to [2024-10-07T20:00:00.000Z] +1ms
pulse:saveJob attempting to save a job into Pulse instance +83ms
pulse:saveJob [job 6703c2b8f3d5afdf65e69436] set job props:
pulse:saveJob {
pulse:saveJob name: 'MY_PULSE_JOB',
pulse:saveJob attempts: 3,
pulse:saveJob backoff: { type: 'exponential', delay: 3000 },
pulse:saveJob data: {},
pulse:saveJob endDate: null,
pulse:saveJob priority: 0,
pulse:saveJob repeatInterval: '0 0 * * 1-5',
pulse:saveJob repeatTimezone: null,
pulse:saveJob shouldSaveResult: true,
pulse:saveJob skipDays: null,
pulse:saveJob startDate: null,
pulse:saveJob finishedCount: 4,
pulse:saveJob lastFinishedAt: 2024-10-07T11:46:41.174Z,
pulse:saveJob runCount: 4,
pulse:saveJob lockedAt: 2024-10-07T11:47:13.884Z,
pulse:saveJob type: 'single',
pulse:saveJob nextRunAt: 2024-10-07T20:00:00.000Z,
pulse:saveJob lastRunAt: 2024-10-07T11:47:13.964Z,
pulse:saveJob lastModifiedBy: undefined
pulse:saveJob } +0ms
pulse:saveJob current time stored as 2024-10-07T11:47:13.965Z +0ms
pulse:saveJob job already has _id, calling findOneAndUpdate() using _id as query +0ms
pulse:saveJob processDbResult() called with success, checking whether to process job immediately or not +2ms
pulse:saveJob processDbResult() called with success, checking whether to process job immediately or not +73ms
pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] starting job +76ms
pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] process function being called +0ms
Additional context
Node.js version: 20.12.2 (also happens on 22.x).
MongoDB version: 7.0.
MongoDB Node driver: 6.5.0.
juniorUsca
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't working