From e42fdcb93c40c4a98c31a1f285a190c120e4e71a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20Mu=C5=A1tar?= Date: Mon, 13 Oct 2025 22:59:01 +0200 Subject: [PATCH 1/3] Add eviction limits to AbortRegistry and AbortedGenerations Introduces maximum limits for tracked conversations and controllers in AbortRegistry, evicting the oldest entries when limits are reached. Evicted controllers that have not already been aborted are aborted, so any generation still running for them is cancelled. Also adds a cap to the AbortedGenerations cache, switching to a Map for storage and trimming oldest entries when the limit is hit. These changes help prevent unbounded memory growth in long-running processes. --- src/lib/server/abortRegistry.ts | 39 +++++++++++++++++++++++++++- src/lib/server/abortedGenerations.ts | 25 +++++++++++++----- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/src/lib/server/abortRegistry.ts b/src/lib/server/abortRegistry.ts index fc6de8a4413..5eed5e5c9e3 100644 --- a/src/lib/server/abortRegistry.ts +++ b/src/lib/server/abortRegistry.ts @@ -1,5 +1,8 @@ import { logger } from "$lib/server/logger"; +const MAX_TRACKED_CONVERSATIONS = 1000; +const MAX_CONTROLLERS_PER_CONVERSATION = 16; + /** * Tracks active upstream generation requests so they can be cancelled on demand. * Multiple controllers can be registered per conversation (for threaded/background runs). 
@@ -21,9 +24,24 @@ export class AbortRegistry { let set = this.controllers.get(key); if (!set) { set = new Set(); - this.controllers.set(key, set); + } + if (set.size >= MAX_CONTROLLERS_PER_CONVERSATION) { + const oldestController = set.values().next().value as AbortController | undefined; + if (oldestController) { + if (!oldestController.signal.aborted) { + logger.warn( + { conversationId: key }, + "Evicting oldest AbortController after reaching per-conversation limit" + ); + oldestController.abort(); + } + set.delete(oldestController); + } } set.add(controller); + // Refresh insertion order for LRU-style eviction + this.controllers.delete(key); + this.controllers.set(key, set); controller.signal.addEventListener( "abort", () => { @@ -31,6 +49,25 @@ export class AbortRegistry { }, { once: true } ); + + if (this.controllers.size > MAX_TRACKED_CONVERSATIONS) { + const oldestKey = this.controllers.keys().next().value as string | undefined; + if (oldestKey) { + const controllers = this.controllers.get(oldestKey); + if (controllers) { + logger.warn( + { conversationId: oldestKey }, + "Evicting AbortRegistry entry after reaching capacity" + ); + for (const ctrl of controllers) { + if (!ctrl.signal.aborted) { + ctrl.abort(); + } + } + } + this.controllers.delete(oldestKey); + } + } } public abort(conversationId: string) { diff --git a/src/lib/server/abortedGenerations.ts b/src/lib/server/abortedGenerations.ts index 57b5f738b91..86c7edc2325 100644 --- a/src/lib/server/abortedGenerations.ts +++ b/src/lib/server/abortedGenerations.ts @@ -4,10 +4,12 @@ import { logger } from "$lib/server/logger"; import { collections } from "$lib/server/database"; import { onExit } from "./exitHandler"; +const MAX_ABORTED_GENERATIONS = 1000; + export class AbortedGenerations { private static instance: AbortedGenerations; - private abortedGenerations: Record = {}; + private abortedGenerations = new Map(); private constructor() { const interval = setInterval(() => this.updateList(), 1000); @@ 
-25,15 +27,26 @@ export class AbortedGenerations { } public getAbortTime(conversationId: string): Date | undefined { - return this.abortedGenerations[conversationId]; + return this.abortedGenerations.get(conversationId); } private async updateList() { try { - const aborts = await collections.abortedGenerations.find({}).sort({ createdAt: 1 }).toArray(); - - this.abortedGenerations = Object.fromEntries( - aborts.map((abort) => [abort.conversationId.toString(), abort.createdAt]) + const aborts = await collections.abortedGenerations + .find({}) + .sort({ createdAt: -1 }) + .limit(MAX_ABORTED_GENERATIONS) + .toArray(); + + if (aborts.length === MAX_ABORTED_GENERATIONS) { + logger.debug( + { count: aborts.length }, + "AbortedGenerations cache reached configured capacity; trimming oldest entries" + ); + } + + this.abortedGenerations = new Map( + aborts.reverse().map((abort) => [abort.conversationId.toString(), abort.createdAt] as const) ); } catch (err) { logger.error(err); From ca1bffb527a3e249daf7d1e96fcae1d0b7b0eea0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20Mu=C5=A1tar?= Date: Mon, 13 Oct 2025 22:59:30 +0200 Subject: [PATCH 2/3] Improve file stream cleanup in upload and download Enhances resource management in downloadFile and uploadFile: the download stream is now always destroyed once it has been read (or has failed), and the upload stream is destroyed when the upload times out or errors, with the timeout timer cleared once the upload settles. This prevents potential memory leaks and ensures timeouts and errors are handled more robustly during file operations. 
--- src/lib/server/files/downloadFile.ts | 28 +++++++++++++++++++++------- src/lib/server/files/uploadFile.ts | 23 ++++++++++++++++++----- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/lib/server/files/downloadFile.ts b/src/lib/server/files/downloadFile.ts index d289fc10c85..e0b14c1a71f 100644 --- a/src/lib/server/files/downloadFile.ts +++ b/src/lib/server/files/downloadFile.ts @@ -23,12 +23,26 @@ export async function downloadFile( const fileStream = collections.bucket.openDownloadStream(file._id); - const buffer = await new Promise((resolve, reject) => { - const chunks: Uint8Array[] = []; - fileStream.on("data", (chunk) => chunks.push(chunk)); - fileStream.on("error", reject); - fileStream.on("end", () => resolve(Buffer.concat(chunks))); - }); + try { + const buffer = await new Promise((resolve, reject) => { + const chunks: Uint8Array[] = []; + const onData = (chunk: Uint8Array) => chunks.push(chunk); + const onError = (err: unknown) => { + fileStream.removeListener("data", onData); + reject(err instanceof Error ? 
err : new Error("Failed to read file stream")); + }; + const onEnd = () => { + fileStream.removeListener("data", onData); + resolve(Buffer.concat(chunks)); + }; - return { type: "base64", name, value: buffer.toString("base64"), mime }; + fileStream.on("data", onData); + fileStream.once("error", onError); + fileStream.once("end", onEnd); + }); + + return { type: "base64", name, value: buffer.toString("base64"), mime }; + } finally { + fileStream.destroy(); + } } diff --git a/src/lib/server/files/uploadFile.ts b/src/lib/server/files/uploadFile.ts index 97b335beaf0..15af68b4836 100644 --- a/src/lib/server/files/uploadFile.ts +++ b/src/lib/server/files/uploadFile.ts @@ -20,10 +20,23 @@ export async function uploadFile(file: File, conv: Conversation): Promise { - upload.once("finish", () => - resolve({ type: "hash", value: sha, mime: file.type, name: file.name }) - ); - upload.once("error", reject); - setTimeout(() => reject(new Error("Upload timed out")), 20_000); + const timeout = setTimeout(() => { + upload.destroy(new Error("Upload timed out")); + }, 20_000); + + const resolveOnce = () => { + clearTimeout(timeout); + resolve({ type: "hash", value: sha, mime: file.type, name: file.name }); + }; + const rejectOnce = (err: unknown) => { + clearTimeout(timeout); + if (typeof upload.destroy === "function" && !upload.destroyed) { + upload.destroy(); + } + reject(err); + }; + + upload.once("finish", resolveOnce); + upload.once("error", rejectOnce); }); } From 5f9b4f8f3b0fd73c92b626708cd16887bc4aa4ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20Mu=C5=A1tar?= Date: Mon, 13 Oct 2025 23:00:04 +0200 Subject: [PATCH 3/3] Optimize conversation queries with aggregation pipelines Refactored conversation and misc routes to use MongoDB aggregation pipelines for counting and paginating conversations, and for efficiently counting assistant messages. 
This reduces the number of database queries and improves performance, especially for endpoints that require both paginated data and total counts. --- .../server/api/routes/groups/conversations.ts | 42 +++++++++++----- src/lib/server/api/routes/groups/misc.ts | 48 ++++++++++++------- 2 files changed, 60 insertions(+), 30 deletions(-) diff --git a/src/lib/server/api/routes/groups/conversations.ts b/src/lib/server/api/routes/groups/conversations.ts index 0663110e0ce..96e2d860016 100644 --- a/src/lib/server/api/routes/groups/conversations.ts +++ b/src/lib/server/api/routes/groups/conversations.ts @@ -23,21 +23,37 @@ export const conversationGroup = new Elysia().use(authPlugin).group("/conversati .get( "", async ({ locals, query }) => { - const convs = await collections.conversations - .find(authCondition(locals)) - .project>({ - title: 1, - updatedAt: 1, - model: 1, - }) - .sort({ updatedAt: -1 }) - .skip((query.p ?? 0) * CONV_NUM_PER_PAGE) - .limit(CONV_NUM_PER_PAGE) + const page = query.p ?? 0; + const skip = page * CONV_NUM_PER_PAGE; + const [aggregated] = await collections.conversations + .aggregate<{ + conversations: Pick[]; + totals: { count: number }[]; + }>([ + { $match: authCondition(locals) }, + { + $facet: { + conversations: [ + { $sort: { updatedAt: -1 } }, + { $skip: skip }, + { $limit: CONV_NUM_PER_PAGE }, + { + $project: { + _id: 1, + title: 1, + updatedAt: 1, + model: 1, + }, + }, + ], + totals: [{ $count: "count" }], + }, + }, + ]) .toArray(); - const nConversations = await collections.conversations.countDocuments( - authCondition(locals) - ); + const convs = aggregated?.conversations ?? []; + const nConversations = aggregated?.totals?.[0]?.count ?? 
0; const res = convs.map((conv) => ({ _id: conv._id, diff --git a/src/lib/server/api/routes/groups/misc.ts b/src/lib/server/api/routes/groups/misc.ts index 7ddc05efc87..c72cb666fb9 100644 --- a/src/lib/server/api/routes/groups/misc.ts +++ b/src/lib/server/api/routes/groups/misc.ts @@ -9,6 +9,7 @@ import yazl from "yazl"; import { downloadFile } from "$lib/server/files/downloadFile"; import mimeTypes from "mime-types"; import { logger } from "$lib/server/logger"; +import type { Document } from "mongodb"; export interface FeatureFlags { enableAssistants: boolean; @@ -28,7 +29,35 @@ export const misc = new Elysia() const messagesBeforeLogin = config.MESSAGES_BEFORE_LOGIN ? parseInt(config.MESSAGES_BEFORE_LOGIN) : 0; - const nConversations = await collections.conversations.countDocuments(authCondition(locals)); + let nConversations = 0; + let assistantMessages = 0; + const matchCondition = authCondition(locals); + + if (requiresUser) { + const facetPipelines: Record = { + conversationCount: [{ $count: "count" }], + }; + + if (!locals.user && messagesBeforeLogin > 0) { + facetPipelines.assistantMessages = [ + { $project: { messages: 1 } }, + { $unwind: "$messages" }, + { $match: { "messages.from": "assistant" } }, + { $limit: messagesBeforeLogin + 1 }, + { $count: "count" }, + ]; + } + + const aggregation = await collections.conversations + .aggregate<{ + conversationCount: { count: number }[]; + assistantMessages?: { count: number }[]; + }>([{ $match: matchCondition }, { $facet: facetPipelines }]) + .next(); + + nConversations = aggregation?.conversationCount?.[0]?.count ?? 0; + assistantMessages = aggregation?.assistantMessages?.[0]?.count ?? 0; + } if (requiresUser && !locals.user) { if (messagesBeforeLogin === 0) { @@ -36,22 +65,7 @@ export const misc = new Elysia() } else if (nConversations >= messagesBeforeLogin) { loginRequired = true; } else { - // get the number of messages where `from === "assistant"` across all conversations. 
- const totalMessages = - ( - await collections.conversations - .aggregate([ - { $match: { ...authCondition(locals), "messages.from": "assistant" } }, - { $project: { messages: 1 } }, - { $limit: messagesBeforeLogin + 1 }, - { $unwind: "$messages" }, - { $match: { "messages.from": "assistant" } }, - { $count: "messages" }, - ]) - .toArray() - )[0]?.messages ?? 0; - - loginRequired = totalMessages >= messagesBeforeLogin; + loginRequired = assistantMessages >= messagesBeforeLogin; } }