Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
/**
* Message throttler for controlling the speed of streaming updates.
* Non-React version that can be used in Zustand stores or other non-component code.
*
* Automatically speeds up after a configurable duration to drain buffers faster.
*/

const DEFAULT_THROTTLE_DELAY_MS = 25
const DEFAULT_FAST_DELAY_MS = 5
const DEFAULT_SPEEDUP_AFTER_MS = 5_000

export interface MessageThrottlerOptions<T> {
delayInMs?: number
fastDelayInMs?: number
speedupAfterMs?: number
onMessage: (message: T) => void
}

Expand All @@ -15,16 +21,42 @@ export class MessageThrottler<T> {
private timer: ReturnType<typeof setTimeout> | null = null
private isProcessing = false
private delayInMs: number
private fastDelayInMs: number
private speedupAfterMs: number
private startTime: number | null = null
private onMessage: (message: T) => void

constructor({
delayInMs = DEFAULT_THROTTLE_DELAY_MS,
fastDelayInMs = DEFAULT_FAST_DELAY_MS,
speedupAfterMs = DEFAULT_SPEEDUP_AFTER_MS,
onMessage,
}: MessageThrottlerOptions<T>) {
this.delayInMs = delayInMs
this.fastDelayInMs = fastDelayInMs
this.speedupAfterMs = speedupAfterMs
this.onMessage = onMessage
}

/**
* Start the speedup timer. Call this when the first meaningful content arrives.
*/
startSpeedupTimer(): void {
if (this.startTime === null) {
this.startTime = Date.now()
}
}

private getCurrentDelay(): number {
if (this.startTime === null) {
return this.delayInMs
}
const elapsed = Date.now() - this.startTime
return elapsed >= this.speedupAfterMs
? this.fastDelayInMs
: this.delayInMs
}

private processNext = () => {
if (this.queue.length === 0) {
this.isProcessing = false
Expand All @@ -36,7 +68,7 @@ export class MessageThrottler<T> {
this.onMessage(nextMessage)

if (this.queue.length > 0) {
this.timer = setTimeout(this.processNext, this.delayInMs)
this.timer = setTimeout(this.processNext, this.getCurrentDelay())
} else {
this.isProcessing = false
this.timer = null
Expand All @@ -47,7 +79,7 @@ export class MessageThrottler<T> {
this.queue.push(message)
if (!this.isProcessing) {
this.isProcessing = true
this.timer = setTimeout(this.processNext, this.delayInMs)
this.timer = setTimeout(this.processNext, this.getCurrentDelay())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export interface ChatMessage {
}

const API_ENDPOINT = '/docs/_api/v1/ask-ai/stream'
const MESSAGE_THROTTLE_MS = 25

interface ActiveStream {
controller: AbortController
Expand Down Expand Up @@ -244,8 +243,8 @@ async function startStream(
const controller = new AbortController()

// Create a throttler for this stream to control update frequency
// Throttler automatically speeds up after 10 seconds to drain buffers faster
const throttler = new MessageThrottler<AskAiEvent>({
delayInMs: MESSAGE_THROTTLE_MS,
onMessage: (event) => handleStreamEvent(messageId, event),
})

Expand Down Expand Up @@ -368,6 +367,8 @@ function handleStreamEvent(messageId: string, event: AskAiEvent): void {
if (event.type === 'conversation_start' && event.conversationId) {
actions.setConversationId(event.conversationId)
} else if (event.type === 'message_chunk') {
// Start speedup timer on first content chunk
stream.throttler.startSpeedupTimer()
stream.content += event.content
actions.updateAiMessage(messageId, stream.content, 'streaming')
} else if (event.type === 'error') {
Expand Down
Loading