Skip to content
This repository was archived by the owner on Oct 27, 2025. It is now read-only.
This repository was archived by the owner on Oct 27, 2025. It is now read-only.

Pulse re-runs finished jobs after a process restart #57

@SergChr

Description

@SergChr

Description

I have a Pulse job that's scheduled to run every midnight on week days (repeatInterval: "0 0 * * 1-5"). When it's finished, Pulse leaves lockedAt: null and lastRunAt with the last run date in the job document (inside pulseJobs MongoDB collection).

Once I restart a Node.js process, Pulse picks up this job again no matter the actual time (the job should run only at midnight). And it happens with every restart, even though the job gets completed, it's not interrupted.

Code example

Pulse client configuration

const config: PulseConfig = {
    processEvery: '30 seconds',
    resumeOnRestart: true,
    defaultLockLimit: 1,
    defaultLockLifetime: 180_000,
};

export const initPulseClient = (db: Db): void => {
    pulse = new Pulse({
        mongo: db,
        ...config,
    });
};

The job configuration

pulse.define(
        'MY_PULSE_JOB',
        async (_job, done) => {
            try {
                await myPulseJob(); // could be any function
                done();
            } catch (error) {
                const msg = 'Failed during publishing signals';
                logger.error(error, msg);
                done(new Error(msg));
            }
        },
        {
            concurrency: 1,
            shouldSaveResult: true,
            lockLifetime: 180_000,
            attempts: 3,
            backoff: {
                type: 'exponential',
                delay: 3_000,
            },
        },
    );

pulse.every('0 0 * * 1-5', 'MY_PULSE_JOB');

Pulse debug logs

pulse:db_init init database collection using name [pulseJobs] +0ms
  pulse:resumeOnRestart Pulse.resumeOnRestart() +0ms
  pulse:db_init attempting index creation +1ms
  pulse:define job [MY_PULSE_JOB] defined with following options: 
  pulse:define {
  pulse:define   fn: [AsyncFunction (anonymous)],
  pulse:define   concurrency: 1,
  pulse:define   lockLimit: 1,
  pulse:define   priority: 0,
  pulse:define   lockLifetime: 180000,
  pulse:define   running: 0,
  pulse:define   locked: 0,
  pulse:define   shouldSaveResult: true,
  pulse:define   attempts: 3,
  pulse:define   backoff: { type: 'exponential', delay: 3000 }
  pulse:define } +0ms
  pulse:resumeOnRestart resuming unfinished 1 jobs(2024-10-07T11:47:13.294Z) +98ms
  pulse:db_init index creation success +575ms
  pulse:start Pulse.start called, creating interval to call processJobs every [30000ms] +0ms
  pulse:every Pulse.every(0 0 * * 1-5, 'MY_PULSE_JOB', undefined) +0ms
  pulse:create Pulse.create(MY_PULSE_JOB, [Object]) +0ms
  pulse:job [MY_PULSE_JOB:undefined] computing next run via interval [0 0 * * 1-5] +0ms
  pulse:job [MY_PULSE_JOB:undefined] nextRunAt set to [2024-10-07T20:00:00.000Z] +9ms
  pulse:saveJob attempting to save a job into Pulse instance +0ms
  pulse:saveJob [job undefined] set job props: 
  pulse:saveJob {
  pulse:saveJob   name: 'MY_PULSE_JOB',
  pulse:saveJob   data: {},
  pulse:saveJob   priority: 0,
  pulse:saveJob   shouldSaveResult: true,
  pulse:saveJob   attempts: 3,
  pulse:saveJob   backoff: { type: 'exponential', delay: 3000 },
  pulse:saveJob   type: 'single',
  pulse:saveJob   nextRunAt: 2024-10-07T20:00:00.000Z,
  pulse:saveJob   repeatInterval: '0 0 * * 1-5',
  pulse:saveJob   repeatTimezone: null,
  pulse:saveJob   startDate: null,
  pulse:saveJob   endDate: null,
  pulse:saveJob   skipDays: null,
  pulse:saveJob   lastModifiedBy: undefined
  pulse:saveJob } +1ms
  pulse:saveJob current time stored as 2024-10-07T11:47:13.882Z +0ms
  pulse:saveJob job with type of "single" found +0ms
  pulse:saveJob calling findOneAndUpdate() with job name and type of "single" as query +0ms
  pulse:internal:processJobs starting to process jobs: [unknownName:unknownId] +0ms
  pulse:internal:processJobs queuing up job to process: [MY_PULSE_JOB] +0ms
  pulse:internal:processJobs jobQueueFilling: MY_PULSE_JOB isJobQueueFilling: false +0ms
  pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = true +0ms
  pulse:internal:_findAndLockNextJob _findAndLockNextJob(MY_PULSE_JOB, [Function]) +0ms
  pulse:internal:_findAndLockNextJob found a job available to lock, creating a new job on Pulse with id [6703c2b8f3d5afdf65e69436] +76ms
  pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = true +78ms
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] job locked while filling queue +0ms
  pulse:internal:processJobs jobQueueFilling: MY_PULSE_JOB isJobQueueFilling: true +0ms
  pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = false +0ms
  pulse:internal:processJobs lock limit reached in queue filling for [MY_PULSE_JOB] +0ms
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] about to process job +0ms
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] nextRunAt is in the past, run the job immediately +0ms

# Custom logs to see the `job` object
# src/utils/process-jobs.ts:246
# {
#  'job.attrs.nextRunAt': 2024-10-07T11:47:13.294Z,
#  now: 2024-10-07T11:47:13.961Z
# }
# ^ at this point, Pulse overrides `nextRunAt`, so it replaces the midnight rule to `now` 

# job.attrs
# {
#     _id: new ObjectId('6703c2b8f3d5afdf65e69436'),
#     name: 'MY_PULSE_JOB',
#     attempts: 3,
#     backoff: { type: 'exponential', delay: 3000 },
#     data: {},
#     endDate: null,
#     priority: 0,
#     repeatInterval: '0 0 * * 1-5',
#     repeatTimezone: null,
#     shouldSaveResult: true,
#     skipDays: null,
#     startDate: null,
#     finishedCount: 4,
#     lastFinishedAt: 2024-10-07T11:46:41.174Z,
#     runCount: 4,
#     lockedAt: 2024-10-07T11:47:13.884Z,
#     type: 'single',
#     nextRunAt: 2024-10-07T11:47:13.294Z
#   }
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] processing job +2ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] setting lastRunAt to: 2024-10-07T11:47:13.964Z +0ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] computing next run via interval [0 0 * * 1-5] +83ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] nextRunAt set to [2024-10-07T20:00:00.000Z] +1ms
  pulse:saveJob attempting to save a job into Pulse instance +83ms
  pulse:saveJob [job 6703c2b8f3d5afdf65e69436] set job props: 
  pulse:saveJob {
  pulse:saveJob   name: 'MY_PULSE_JOB',
  pulse:saveJob   attempts: 3,
  pulse:saveJob   backoff: { type: 'exponential', delay: 3000 },
  pulse:saveJob   data: {},
  pulse:saveJob   endDate: null,
  pulse:saveJob   priority: 0,
  pulse:saveJob   repeatInterval: '0 0 * * 1-5',
  pulse:saveJob   repeatTimezone: null,
  pulse:saveJob   shouldSaveResult: true,
  pulse:saveJob   skipDays: null,
  pulse:saveJob   startDate: null,
  pulse:saveJob   finishedCount: 4,
  pulse:saveJob   lastFinishedAt: 2024-10-07T11:46:41.174Z,
  pulse:saveJob   runCount: 4,
  pulse:saveJob   lockedAt: 2024-10-07T11:47:13.884Z,
  pulse:saveJob   type: 'single',
  pulse:saveJob   nextRunAt: 2024-10-07T20:00:00.000Z,
  pulse:saveJob   lastRunAt: 2024-10-07T11:47:13.964Z,
  pulse:saveJob   lastModifiedBy: undefined
  pulse:saveJob } +0ms
  pulse:saveJob current time stored as 2024-10-07T11:47:13.965Z +0ms
  pulse:saveJob job already has _id, calling findOneAndUpdate() using _id as query +0ms
  pulse:saveJob processDbResult() called with success, checking whether to process job immediately or not +2ms
  pulse:saveJob processDbResult() called with success, checking whether to process job immediately or not +73ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] starting job +76ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] process function being called +0ms

Additional context

Node.js version: 20.12.2 (also happens on 22.x).
MongoDB version: 7.0.
MongoDB Node driver: 6.5.0.

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions