Skip to content

Commit 0621bc6

Browse files
committed
fix: don't wait if reclaimed task > 0
1 parent 0732006 commit 0621bc6

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

src/api.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ export class Api {
280280
})
281281
}
282282
tasks.length > 0 && logWorker('Accepted tasks ', { tasks })
283+
let reclaimCounts = 0
283284
await promise.all(tasks.map(async task => {
284285
const streamlen = await this.redis.xLen(task.stream)
285286
if (streamlen === 0) {
@@ -289,6 +290,7 @@ export class Api {
289290
.exec()
290291
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })
291292
} else {
293+
reclaimCounts++
292294
const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix)
293295
const { ydoc, storeReferences, redisLastId } = await this.getDoc(room, docid)
294296
const lastId = math.max(number.parseInt(redisLastId.split('-')[0]), number.parseInt(task.id.split('-')[0]))
@@ -321,7 +323,7 @@ export class Api {
321323
ydoc.destroy()
322324
}
323325
}))
324-
return tasks
326+
return { tasks, reclaimCounts }
325327
}
326328

327329
async destroy () {
@@ -353,12 +355,10 @@ export class Worker {
353355
this.client = client
354356
logWorker('Created worker process ', { id: client.consumername, prefix: client.prefix, minMessageLifetime: client.redisMinMessageLifetime })
355357
;(async () => {
356-
const startRedisTime = await client.redis.time()
357-
const timeDiff = startRedisTime.getTime() - time.getUnixTime()
358358
while (!client._destroyed) {
359359
try {
360-
const tasks = await client.consumeWorkerQueue(opts)
361-
if (tasks.length === 0 || (client.redisMinMessageLifetime > time.getUnixTime() + timeDiff - number.parseInt(tasks[0].id.split('-')[0]))) {
360+
const { reclaimCounts } = await client.consumeWorkerQueue(opts)
361+
if (reclaimCounts === 0) {
362362
await promise.wait(client.redisWorkerTimeout)
363363
}
364364
} catch (e) {

0 commit comments

Comments
 (0)