1- import {
2- BeforeApplicationShutdown ,
3- Injectable ,
4- Logger ,
5- OnApplicationBootstrap ,
6- } from "@nestjs/common" ;
7- import { ModuleRef } from "@nestjs/core" ;
1+ import { BeforeApplicationShutdown , Injectable , Logger , OnApplicationBootstrap } from '@nestjs/common' ;
2+ import { ModuleRef } from '@nestjs/core' ;
83import Pulse , {
94 PulseConfig ,
105 Job ,
116 Processor ,
127 JobAttributes ,
13- } from "@pulsecron/pulse" ;
14- import { NO_QUEUE_FOUND } from "../pulse.messages" ;
15- import {
16- PulseModuleJobOptions ,
17- NonRepeatableJobOptions ,
18- RepeatableJobOptions ,
19- } from "../decorators" ;
20- import { JobProcessorType } from "../enums" ;
21- import { PulseQueueConfig } from "../interfaces" ;
22- import { DatabaseService } from "./database.service" ;
8+ PulseOnEventType ,
9+ JobAttributesData ,
10+ } from '@pulsecron/pulse' ;
11+ import { NO_QUEUE_FOUND } from '../pulse.messages' ;
12+ import { PulseModuleJobOptions , NonRepeatableJobOptions , RepeatableJobOptions } from '../decorators' ;
13+ import { JobProcessorType } from '../enums' ;
14+ import { PulseQueueConfig } from '../interfaces' ;
15+ import { DatabaseService } from './database.service' ;
2316
2417type JobProcessorConfig = {
25- handler : Processor < JobAttributes > ;
18+ handler : Processor < JobAttributesData > ;
2619 type : JobProcessorType ;
2720 options : RepeatableJobOptions | NonRepeatableJobOptions ;
2821 useCallback : boolean ;
@@ -38,17 +31,12 @@ type QueueRegistry = {
3831} ;
3932
4033@Injectable ( )
41- export class PulseOrchestrator
42- implements OnApplicationBootstrap , BeforeApplicationShutdown
43- {
44- private readonly logger = new Logger ( "Pulse" ) ;
34+ export class PulseOrchestrator implements OnApplicationBootstrap , BeforeApplicationShutdown {
35+ private readonly logger = new Logger ( 'Pulse' ) ;
4536
4637 private readonly queues : Map < string , QueueRegistry > = new Map ( ) ;
4738
48- constructor (
49- private readonly moduleRef : ModuleRef ,
50- private readonly database : DatabaseService
51- ) { }
39+ constructor ( private readonly moduleRef : ModuleRef , private readonly database : DatabaseService ) { }
5240
5341 async onApplicationBootstrap ( ) {
5442 await this . database . connect ( ) ;
@@ -60,10 +48,7 @@ export class PulseOrchestrator
6048
6149 this . attachEventListeners ( queue , registry ) ;
6250
63- queue . mongo (
64- this . database . getConnection ( ) ,
65- config . collection || queueToken
66- ) ;
51+ queue . mongo ( this . database . getConnection ( ) , config . collection || queueToken ) ;
6752
6853 if ( config . autoStart ) {
6954 await queue . start ( ) ;
@@ -99,7 +84,7 @@ export class PulseOrchestrator
9984
10085 addJobProcessor (
10186 queueToken : string ,
102- processor : Processor < JobAttributes > & Record < " _name" , string > ,
87+ processor : Processor < JobAttributesData > & Record < ' _name' , string > ,
10388 options : PulseModuleJobOptions ,
10489 type : JobProcessorType ,
10590 useCallback : boolean
@@ -114,39 +99,28 @@ export class PulseOrchestrator
11499 } ) ;
115100 }
116101
117- addEventListener (
118- queueToken : string ,
119- listener : EventListener ,
120- eventName : string ,
121- jobName ?: string
122- ) {
102+ addEventListener ( queueToken : string , listener : EventListener , eventName : PulseOnEventType , jobName ?: string ) {
123103 const key = jobName ? `${ eventName } :${ jobName } ` : eventName ;
124104
125105 this . queues . get ( queueToken ) ?. listeners . set ( key , listener ) ;
126106 }
127107
128108 private attachEventListeners ( pulse : Pulse , registry : QueueRegistry ) {
129109 registry . listeners . forEach ( ( listener : EventListener , eventName : string ) => {
130- pulse . on ( eventName , listener ) ;
110+ pulse . on ( eventName as PulseOnEventType , listener ) ;
131111 } ) ;
132112 }
133113
134114 private defineJobProcessors ( pulse : Pulse , registry : QueueRegistry ) {
135- registry . processors . forEach (
136- ( jobConfig : JobProcessorConfig , jobName : string ) => {
137- const { options, handler, useCallback } = jobConfig ;
138-
139- if ( useCallback ) {
140- pulse . define (
141- jobName ,
142- ( job : Job , done ?: ( ) => void ) => handler ( job , done ) ,
143- options
144- ) ;
145- } else {
146- pulse . define ( jobName , handler , options ) ;
147- }
115+ registry . processors . forEach ( ( jobConfig : JobProcessorConfig , jobName : string ) => {
116+ const { options, handler, useCallback } = jobConfig ;
117+
118+ if ( useCallback ) {
119+ pulse . define ( jobName , ( job : Job , done : ( ) => void = ( ) => { } ) => handler ( job , done ) , options ) ;
120+ } else {
121+ pulse . define ( jobName , handler , options ) ;
148122 }
149- ) ;
123+ } ) ;
150124 }
151125
152126 private async scheduleJobs ( pulse : Pulse , registry : QueueRegistry ) {
@@ -156,18 +130,9 @@ export class PulseOrchestrator
156130 const { type, options } = jobConfig ;
157131
158132 if ( type === JobProcessorType . EVERY ) {
159- await pulse . every (
160- ( options as RepeatableJobOptions ) . interval ,
161- jobName ,
162- { } ,
163- options
164- ) ;
133+ await pulse . every ( ( options as RepeatableJobOptions ) . interval , jobName , { } , options ) ;
165134 } else if ( type === JobProcessorType . SCHEDULE ) {
166- await pulse . schedule (
167- ( options as NonRepeatableJobOptions ) . when ,
168- jobName ,
169- { }
170- ) ;
135+ await pulse . schedule ( ( options as NonRepeatableJobOptions ) . when , jobName , { } ) ;
171136 } else if ( type === JobProcessorType . NOW ) {
172137 await pulse . now ( jobName , { } ) ;
173138 }
0 commit comments