From 0621bc6bbd7eba5d4b3ed088520c10a78e98d6dc Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Thu, 16 Jan 2025 19:14:22 +0800 Subject: [PATCH 1/3] fix: don't wait if reclaimed task > 0 --- src/api.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/api.js b/src/api.js index aaffa7a..b07986d 100644 --- a/src/api.js +++ b/src/api.js @@ -280,6 +280,7 @@ export class Api { }) } tasks.length > 0 && logWorker('Accepted tasks ', { tasks }) + let reclaimCounts = 0 await promise.all(tasks.map(async task => { const streamlen = await this.redis.xLen(task.stream) if (streamlen === 0) { @@ -289,6 +290,7 @@ export class Api { .exec() logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) } else { + reclaimCounts++ const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) const { ydoc, storeReferences, redisLastId } = await this.getDoc(room, docid) const lastId = math.max(number.parseInt(redisLastId.split('-')[0]), number.parseInt(task.id.split('-')[0])) @@ -321,7 +323,7 @@ export class Api { ydoc.destroy() } })) - return tasks + return { tasks, reclaimCounts } } async destroy () { @@ -353,12 +355,10 @@ export class Worker { this.client = client logWorker('Created worker process ', { id: client.consumername, prefix: client.prefix, minMessageLifetime: client.redisMinMessageLifetime }) ;(async () => { - const startRedisTime = await client.redis.time() - const timeDiff = startRedisTime.getTime() - time.getUnixTime() while (!client._destroyed) { try { - const tasks = await client.consumeWorkerQueue(opts) - if (tasks.length === 0 || (client.redisMinMessageLifetime > time.getUnixTime() + timeDiff - number.parseInt(tasks[0].id.split('-')[0]))) { + const { reclaimCounts } = await client.consumeWorkerQueue(opts) + if (reclaimCounts === 0) { await promise.wait(client.redisWorkerTimeout) } } catch (e) { From 33f856a1b8767e13cbe0e6021727e769384e6866 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Thu, 16 Jan 2025 19:17:05 +0800 Subject: [PATCH 2/3] fix: remove unused --- src/api.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/api.js b/src/api.js index b07986d..87adc0b 100644 --- a/src/api.js +++ b/src/api.js @@ -11,7 +11,6 @@ import * as math from 'lib0/math' import * as protocol from './protocol.js' import * as env from 'lib0/environment' import * as logging from 'lib0/logging' -import * as time from 'lib0/time' const logWorker = logging.createModuleLogger('@y/redis/api/worker') // const logApi = logging.createModuleLogger('@y/redis/api') From 83d2126eaffa414d6b9d3464ddae3894441e8ef2 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Thu, 16 Jan 2025 19:52:11 +0800 Subject: [PATCH 3/3] fix: worker should process at least wait time --- src/api.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/api.js b/src/api.js index 87adc0b..51efe3f 100644 --- a/src/api.js +++ b/src/api.js @@ -354,12 +354,17 @@ export class Worker { this.client = client logWorker('Created worker process ', { id: client.consumername, prefix: client.prefix, minMessageLifetime: client.redisMinMessageLifetime }) ;(async () => { + let prev = performance.now() while (!client._destroyed) { try { const { reclaimCounts } = await client.consumeWorkerQueue(opts) + const now = performance.now() if (reclaimCounts === 0) { await promise.wait(client.redisWorkerTimeout) + } else if (now - prev < client.redisWorkerTimeout) { + await promise.wait(client.redisWorkerTimeout - (now - prev)) } + prev = now } catch (e) { console.error(e) }