diff --git a/internal-packages/tracing/src/index.ts b/internal-packages/tracing/src/index.ts
index 8d01bdaed5..d679509188 100644
--- a/internal-packages/tracing/src/index.ts
+++ b/internal-packages/tracing/src/index.ts
@@ -25,7 +25,16 @@ export type {
   ObservableResult,
 } from "@opentelemetry/api";
 
-import { trace, context, propagation, SpanKind, metrics, ValueType } from "@opentelemetry/api";
+import {
+  trace,
+  context,
+  propagation,
+  SpanKind,
+  metrics,
+  ValueType,
+  ROOT_CONTEXT,
+} from "@opentelemetry/api";
+import type { Context } from "@opentelemetry/api";
 export {
   trace,
   context,
@@ -36,6 +45,8 @@ export {
   SpanStatusCode,
   metrics,
   ValueType,
+  ROOT_CONTEXT,
+  type Context,
 };
 
 export function getTracer(name: string): Tracer {
diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts
index 8b47a79f34..d12185cdb7 100644
--- a/packages/redis-worker/src/fair-queue/index.ts
+++ b/packages/redis-worker/src/fair-queue/index.ts
@@ -1,5 +1,5 @@
 import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis";
-import { SpanKind } from "@internal/tracing";
+import { SpanKind, type Span } from "@internal/tracing";
 import { Logger } from "@trigger.dev/core/logger";
 import { nanoid } from "nanoid";
 import { setInterval } from "node:timers/promises";
@@ -7,7 +7,12 @@ import { type z } from "zod";
 import { ConcurrencyManager } from "./concurrency.js";
 import { MasterQueue } from "./masterQueue.js";
 import { type RetryStrategy, ExponentialBackoffRetry } from "./retry.js";
-import { FairQueueTelemetry, FairQueueAttributes, MessagingAttributes } from "./telemetry.js";
+import {
+  FairQueueTelemetry,
+  FairQueueAttributes,
+  MessagingAttributes,
+  BatchedSpanManager,
+} from "./telemetry.js";
 import type {
   ConcurrencyGroupConfig,
   DeadLetterMessage,
@@ -89,6 +94,11 @@ export class FairQueue {
   // Global rate limiter
   private globalRateLimiter?: GlobalRateLimiter;
 
+  // Consumer tracing
+  private consumerTraceMaxIterations: number;
+  private consumerTraceTimeoutSeconds: number;
+  private batchedSpanManager: BatchedSpanManager;
+
   // Runtime state
   private messageHandler?: MessageHandler>;
   private isRunning = false;
@@ -136,6 +146,10 @@ export class FairQueue {
     // Global rate limiter
     this.globalRateLimiter = options.globalRateLimiter;
 
+    // Consumer tracing
+    this.consumerTraceMaxIterations = options.consumerTraceMaxIterations ?? 500;
+    this.consumerTraceTimeoutSeconds = options.consumerTraceTimeoutSeconds ?? 60;
+
     // Initialize telemetry
     this.telemetry = new FairQueueTelemetry({
       tracer: options.tracer,
@@ -143,6 +157,14 @@
       name: options.name ?? "fairqueue",
     });
 
+    // Initialize batched span manager for consumer tracing
+    this.batchedSpanManager = new BatchedSpanManager({
+      tracer: options.tracer,
+      name: options.name ?? "fairqueue",
+      maxIterations: this.consumerTraceMaxIterations,
+      timeoutSeconds: this.consumerTraceTimeoutSeconds,
+    });
+
     // Initialize components
     this.masterQueue = new MasterQueue({
       redis: options.redis,
@@ -653,6 +675,10 @@ export class FairQueue {
    */
   async close(): Promise<void> {
     await this.stop();
+
+    // Clean up any remaining batched spans
+    this.batchedSpanManager.cleanupAll();
+
     await Promise.all([
       this.masterQueue.close(),
       this.concurrencyManager?.close(),
@@ -703,56 +729,123 @@ export class FairQueue {
   async #runMasterQueueConsumerLoop(shardId: number): Promise<void> {
     const loopId = `master-shard-${shardId}`;
 
+    // Initialize batched span tracking for this loop
+    this.batchedSpanManager.initializeLoop(loopId);
+
     try {
       for await (const _ of setInterval(this.consumerIntervalMs, null, {
         signal: this.abortController.signal,
       })) {
         try {
-          await this.#processMasterQueueShard(loopId, shardId);
+          await this.batchedSpanManager.withBatchedSpan(
+            loopId,
+            async (span) => {
+              span.setAttribute("shard_id", shardId);
+              await this.#processMasterQueueShard(loopId, shardId, span);
+            },
+            {
+              iterationSpanName: "processMasterQueueShard",
+              attributes: { shard_id: shardId },
+            }
+          );
         } catch (error) {
           this.logger.error("Master queue consumer error", {
             loopId,
             shardId,
             error: error instanceof Error ? error.message : String(error),
           });
+          this.batchedSpanManager.markForRotation(loopId);
         }
       }
     } catch (error) {
       if (error instanceof Error && error.name === "AbortError") {
         this.logger.debug("Master queue consumer aborted", { loopId });
+        this.batchedSpanManager.cleanup(loopId);
         return;
       }
       throw error;
+    } finally {
+      this.batchedSpanManager.cleanup(loopId);
     }
   }
 
-  async #processMasterQueueShard(loopId: string, shardId: number): Promise<void> {
+  async #processMasterQueueShard(
+    loopId: string,
+    shardId: number,
+    parentSpan?: Span
+  ): Promise<void> {
     const masterQueueKey = this.keys.masterQueueKey(shardId);
 
     // Create scheduler context
-    const context = this.#createSchedulerContext();
+    const schedulerContext = this.#createSchedulerContext();
 
     // Get queues to process from scheduler
-    const tenantQueues = await this.scheduler.selectQueues(masterQueueKey, loopId, context);
+    const tenantQueues = await this.telemetry.trace(
+      "selectQueues",
+      async (span) => {
+        span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString());
+        span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId);
+        const result = await this.scheduler.selectQueues(masterQueueKey, loopId, schedulerContext);
+        span.setAttribute("tenant_count", result.length);
+        span.setAttribute(
+          "queue_count",
+          result.reduce((acc, t) => acc + t.queues.length, 0)
+        );
+        return result;
+      },
+      { kind: SpanKind.INTERNAL }
+    );
 
     if (tenantQueues.length === 0) {
+      this.batchedSpanManager.incrementStat(loopId, "empty_iterations");
       return;
     }
 
+    // Track stats
+    this.batchedSpanManager.incrementStat(loopId, "tenants_selected", tenantQueues.length);
+    this.batchedSpanManager.incrementStat(
+      loopId,
+      "queues_selected",
+      tenantQueues.reduce((acc, t) => acc + t.queues.length, 0)
+    );
+
     // Process queues and push to worker queues
     for (const { tenantId, queues } of tenantQueues) {
       for (const queueId of queues) {
         // Check cooloff
         if (this.cooloffEnabled && this.#isInCooloff(queueId)) {
+          this.batchedSpanManager.incrementStat(loopId, "cooloff_skipped");
          continue;
         }
 
-        const processed = await this.#claimAndPushToWorkerQueue(loopId, queueId, tenantId, shardId);
+        const processed = await this.telemetry.trace(
+          "claimAndPushToWorkerQueue",
+          async (span) => {
+            span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId);
+            span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId);
+            span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString());
+            return this.#claimAndPushToWorkerQueue(loopId, queueId, tenantId, shardId);
+          },
+          { kind: SpanKind.INTERNAL }
+        );
 
         if (processed) {
-          await this.scheduler.recordProcessed?.(tenantId, queueId);
+          this.batchedSpanManager.incrementStat(loopId, "messages_claimed");
+
+          if (this.scheduler.recordProcessed) {
+            await this.telemetry.trace(
+              "recordProcessed",
+              async (span) => {
+                span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId);
+                span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId);
+                await this.scheduler.recordProcessed!(tenantId, queueId);
+              },
+              { kind: SpanKind.INTERNAL }
+            );
+          }
           this.#resetCooloff(queueId);
         } else {
+          this.batchedSpanManager.incrementStat(loopId, "claim_failures");
           this.#incrementCooloff(queueId);
         }
       }
@@ -839,6 +932,9 @@ export class FairQueue {
     const loopId = `worker-${consumerId}`;
     const workerQueueId = loopId; // Each consumer has its own worker queue by default
 
+    // Initialize batched span tracking for this loop
+    this.batchedSpanManager.initializeLoop(loopId);
+
     try {
       while (this.isRunning) {
         if (!this.messageHandler) {
@@ -855,6 +951,7 @@
         );
 
         if (!messageKey) {
+          this.batchedSpanManager.incrementStat(loopId, "blocking_pop_timeouts");
           continue; // Timeout, loop again
         }
 
@@ -862,13 +959,31 @@
         const colonIndex = messageKey.indexOf(":");
         if (colonIndex === -1) {
           this.logger.error("Invalid message key format", { messageKey });
+          this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
           continue;
         }
 
         const messageId = messageKey.substring(0, colonIndex);
         const queueId = messageKey.substring(colonIndex + 1);
 
-        await this.#processMessageFromWorkerQueue(loopId, messageId, queueId);
+        await this.batchedSpanManager.withBatchedSpan(
+          loopId,
+          async (span) => {
+            span.setAttribute("consumer_id", consumerId);
+            span.setAttribute(FairQueueAttributes.MESSAGE_ID, messageId);
+            span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId);
+            await this.#processMessageFromWorkerQueue(loopId, messageId, queueId);
+            this.batchedSpanManager.incrementStat(loopId, "messages_processed");
+          },
+          {
+            iterationSpanName: "processMessageFromWorkerQueue",
+            attributes: {
+              consumer_id: consumerId,
+              [FairQueueAttributes.MESSAGE_ID]: messageId,
+              [FairQueueAttributes.QUEUE_ID]: queueId,
+            },
+          }
+        );
       } catch (error) {
         if (this.abortController.signal.aborted) {
           break;
@@ -877,14 +992,18 @@
           loopId,
           error: error instanceof Error ? error.message : String(error),
         });
+        this.batchedSpanManager.markForRotation(loopId);
       }
     }
     } catch (error) {
       if (error instanceof Error && error.name === "AbortError") {
         this.logger.debug("Worker queue consumer aborted", { loopId });
+        this.batchedSpanManager.cleanup(loopId);
         return;
       }
       throw error;
+    } finally {
+      this.batchedSpanManager.cleanup(loopId);
     }
   }
 
@@ -921,6 +1040,9 @@ export class FairQueue {
   async #runDirectConsumerLoop(consumerId: number, shardId: number): Promise<void> {
     const loopId = `consumer-${consumerId}-shard-${shardId}`;
 
+    // Initialize batched span tracking for this loop
+    this.batchedSpanManager.initializeLoop(loopId);
+
     try {
       for await (const _ of setInterval(this.consumerIntervalMs, null, {
         signal: this.abortController.signal,
@@ -930,36 +1052,74 @@
         }
 
         try {
-          await this.#processDirectIteration(loopId, shardId);
+          await this.batchedSpanManager.withBatchedSpan(
+            loopId,
+            async (span) => {
+              span.setAttribute("consumer_id", consumerId);
+              span.setAttribute("shard_id", shardId);
+              await this.#processDirectIteration(loopId, shardId, span);
+            },
+            {
+              iterationSpanName: "processDirectIteration",
+              attributes: { consumer_id: consumerId, shard_id: shardId },
+            }
+          );
         } catch (error) {
           this.logger.error("Direct consumer iteration error", {
             loopId,
             error: error instanceof Error ? error.message : String(error),
           });
+          this.batchedSpanManager.markForRotation(loopId);
         }
       }
     } catch (error) {
       if (error instanceof Error && error.name === "AbortError") {
         this.logger.debug("Direct consumer loop aborted", { loopId });
+        this.batchedSpanManager.cleanup(loopId);
         return;
       }
       throw error;
+    } finally {
+      this.batchedSpanManager.cleanup(loopId);
     }
   }
 
-  async #processDirectIteration(loopId: string, shardId: number): Promise<void> {
+  async #processDirectIteration(loopId: string, shardId: number, parentSpan?: Span): Promise<void> {
     const masterQueueKey = this.keys.masterQueueKey(shardId);
 
     // Create scheduler context
-    const context = this.#createSchedulerContext();
+    const schedulerContext = this.#createSchedulerContext();
 
     // Get queues to process from scheduler
-    const tenantQueues = await this.scheduler.selectQueues(masterQueueKey, loopId, context);
+    const tenantQueues = await this.telemetry.trace(
+      "selectQueues",
+      async (span) => {
+        span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString());
+        span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId);
+        const result = await this.scheduler.selectQueues(masterQueueKey, loopId, schedulerContext);
+        span.setAttribute("tenant_count", result.length);
+        span.setAttribute(
+          "queue_count",
+          result.reduce((acc, t) => acc + t.queues.length, 0)
+        );
+        return result;
+      },
+      { kind: SpanKind.INTERNAL }
+    );
 
     if (tenantQueues.length === 0) {
+      this.batchedSpanManager.incrementStat(loopId, "empty_iterations");
       return;
     }
 
+    // Track stats
+    this.batchedSpanManager.incrementStat(loopId, "tenants_selected", tenantQueues.length);
+    this.batchedSpanManager.incrementStat(
+      loopId,
+      "queues_selected",
+      tenantQueues.reduce((acc, t) => acc + t.queues.length, 0)
+    );
+
     // Process messages from each selected tenant
     // For fairness, process up to available concurrency slots per tenant
     for (const { tenantId, queues } of tenantQueues) {
@@ -979,16 +1139,39 @@
         while (slotsUsed < availableSlots) {
           // Check cooloff
           if (this.cooloffEnabled && this.#isInCooloff(queueId)) {
+            this.batchedSpanManager.incrementStat(loopId, "cooloff_skipped");
            break; // Try next queue
           }
 
-          const processed = await this.#processOneMessage(loopId, queueId, tenantId, shardId);
+          const processed = await this.telemetry.trace(
+            "processOneMessage",
+            async (span) => {
+              span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId);
+              span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId);
+              span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString());
+              return this.#processOneMessage(loopId, queueId, tenantId, shardId);
+            },
+            { kind: SpanKind.INTERNAL }
+          );
 
           if (processed) {
-            await this.scheduler.recordProcessed?.(tenantId, queueId);
+            this.batchedSpanManager.incrementStat(loopId, "messages_processed");
+
+            if (this.scheduler.recordProcessed) {
+              await this.telemetry.trace(
+                "recordProcessed",
+                async (span) => {
+                  span.setAttribute(FairQueueAttributes.QUEUE_ID, queueId);
+                  span.setAttribute(FairQueueAttributes.TENANT_ID, tenantId);
+                  await this.scheduler.recordProcessed!(tenantId, queueId);
+                },
+                { kind: SpanKind.INTERNAL }
+              );
+            }
             this.#resetCooloff(queueId);
             slotsUsed++;
           } else {
+            this.batchedSpanManager.incrementStat(loopId, "process_failures");
            this.#incrementCooloff(queueId);
             break; // Queue empty or blocked, try next queue
           }
diff --git a/packages/redis-worker/src/fair-queue/telemetry.ts b/packages/redis-worker/src/fair-queue/telemetry.ts
index abf4f78a0e..5304f1699f 100644
--- a/packages/redis-worker/src/fair-queue/telemetry.ts
+++ b/packages/redis-worker/src/fair-queue/telemetry.ts
@@ -8,7 +8,9 @@ import type {
   SpanKind,
   SpanOptions,
   Tracer,
+  Context,
 } from "@internal/tracing";
+import { context, trace, SpanStatusCode, ROOT_CONTEXT } from "@internal/tracing";
 
 /**
  * Semantic attributes for fair queue messaging operations.
@@ -426,6 +428,286 @@ export class FairQueueTelemetry {
   }
 }
 
+// ============================================================================
+// Batched Span Manager
+// ============================================================================
+
+/**
+ * State for tracking a consumer loop's batched span.
+ */
+export interface ConsumerLoopState {
+  /** Countdown of iterations before starting a new span */
+  perTraceCountdown: number;
+  /** When the current trace started */
+  traceStartedAt: Date;
+  /** The current batched span */
+  currentSpan?: Span;
+  /** The context for the current batched span */
+  currentSpanContext?: Context;
+  /** Number of iterations in the current span */
+  iterationsCount: number;
+  /** Total iterations across all spans */
+  totalIterationsCount: number;
+  /** Running duration in milliseconds for the current span */
+  runningDurationInMs: number;
+  /** Stats counters for the current span */
+  stats: Record<string, number>;
+  /** Flag to force span end on next iteration */
+  endSpanInNextIteration: boolean;
+}
+
+/**
+ * Configuration for the BatchedSpanManager.
+ */
+export interface BatchedSpanManagerOptions {
+  /** The tracer to use for creating spans */
+  tracer?: Tracer;
+  /** Name prefix for spans */
+  name: string;
+  /** Maximum iterations before rotating the span */
+  maxIterations: number;
+  /** Maximum seconds before rotating the span */
+  timeoutSeconds: number;
+}
+
+/**
+ * Manages batched spans for consumer loops.
+ *
+ * This allows multiple iterations to be grouped into a single parent span,
+ * reducing the volume of spans while maintaining observability.
+ */
+export class BatchedSpanManager {
+  private tracer?: Tracer;
+  private name: string;
+  private maxIterations: number;
+  private timeoutSeconds: number;
+  private loopStates = new Map<string, ConsumerLoopState>();
+
+  constructor(options: BatchedSpanManagerOptions) {
+    this.tracer = options.tracer;
+    this.name = options.name;
+    this.maxIterations = options.maxIterations;
+    this.timeoutSeconds = options.timeoutSeconds;
+  }
+
+  /**
+   * Initialize state for a consumer loop.
+   */
+  initializeLoop(loopId: string): void {
+    this.loopStates.set(loopId, {
+      perTraceCountdown: this.maxIterations,
+      traceStartedAt: new Date(),
+      iterationsCount: 0,
+      totalIterationsCount: 0,
+      runningDurationInMs: 0,
+      stats: {},
+      endSpanInNextIteration: false,
+    });
+  }
+
+  /**
+   * Get the state for a consumer loop.
+   */
+  getState(loopId: string): ConsumerLoopState | undefined {
+    return this.loopStates.get(loopId);
+  }
+
+  /**
+   * Increment a stat counter for a loop.
+   */
+  incrementStat(loopId: string, statName: string, value: number = 1): void {
+    const state = this.loopStates.get(loopId);
+    if (state) {
+      state.stats[statName] = (state.stats[statName] ?? 0) + value;
+    }
+  }
+
+  /**
+   * Mark that the span should end on the next iteration.
+   */
+  markForRotation(loopId: string): void {
+    const state = this.loopStates.get(loopId);
+    if (state) {
+      state.endSpanInNextIteration = true;
+    }
+  }
+
+  /**
+   * Check if the span should be rotated (ended and a new one started).
+   */
+  shouldRotate(loopId: string): boolean {
+    const state = this.loopStates.get(loopId);
+    if (!state) return true;
+
+    return (
+      state.perTraceCountdown <= 0 ||
+      Date.now() - state.traceStartedAt.getTime() > this.timeoutSeconds * 1000 ||
+      state.currentSpanContext === undefined ||
+      state.endSpanInNextIteration
+    );
+  }
+
+  /**
+   * End the current span for a loop and record stats.
+   */
+  endCurrentSpan(loopId: string): void {
+    const state = this.loopStates.get(loopId);
+    if (!state?.currentSpan) return;
+
+    // Record stats as span attributes
+    for (const [statName, count] of Object.entries(state.stats)) {
+      state.currentSpan.setAttribute(`stats.${statName}`, count);
+    }
+
+    state.currentSpan.end();
+    state.currentSpan = undefined;
+    state.currentSpanContext = undefined;
+  }
+
+  /**
+   * Start a new batched span for a loop.
+   */
+  startNewSpan(loopId: string, attributes?: Attributes): void {
+    if (!this.tracer) return;
+
+    const state = this.loopStates.get(loopId);
+    if (!state) return;
+
+    // End any existing span first
+    this.endCurrentSpan(loopId);
+
+    // Calculate metrics from previous span period
+    const traceDurationInMs = state.traceStartedAt
+      ? Date.now() - state.traceStartedAt.getTime()
+      : undefined;
+    const iterationsPerSecond =
+      traceDurationInMs && traceDurationInMs > 0
+        ? state.iterationsCount / (traceDurationInMs / 1000)
+        : undefined;
+
+    // Start new span
+    state.currentSpan = this.tracer.startSpan(
+      `${this.name}.consumerLoop`,
+      {
+        kind: 1, // SpanKind.CONSUMER
+        attributes: {
+          loop_id: loopId,
+          max_iterations: this.maxIterations,
+          timeout_seconds: this.timeoutSeconds,
+          previous_iterations: state.iterationsCount,
+          previous_duration_ms: traceDurationInMs,
+          previous_iterations_per_second: iterationsPerSecond,
+          total_iterations: state.totalIterationsCount,
+          ...attributes,
+        },
+      },
+      ROOT_CONTEXT
+    );
+
+    // Set up context for child spans
+    state.currentSpanContext = trace.setSpan(ROOT_CONTEXT, state.currentSpan);
+
+    // Reset counters
+    state.perTraceCountdown = this.maxIterations;
+    state.traceStartedAt = new Date();
+    state.iterationsCount = 0;
+    state.runningDurationInMs = 0;
+    state.stats = {};
+    state.endSpanInNextIteration = false;
+  }
+
+  /**
+   * Execute a function within the batched span context.
+   * Automatically handles span rotation and iteration tracking.
+   */
+  async withBatchedSpan<T>(
+    loopId: string,
+    fn: (span: Span) => Promise<T>,
+    options?: {
+      iterationSpanName?: string;
+      attributes?: Attributes;
+    }
+  ): Promise<T> {
+    let state = this.loopStates.get(loopId);
+
+    // Initialize state if not present
+    if (!state) {
+      this.initializeLoop(loopId);
+      state = this.loopStates.get(loopId)!;
+    }
+
+    // Check if we need to rotate the span
+    if (this.shouldRotate(loopId)) {
+      this.startNewSpan(loopId);
+    }
+
+    const startTime = performance.now();
+
+    try {
+      // If no tracer, just execute the function
+      if (!this.tracer || !state.currentSpanContext) {
+        return await fn(noopSpan);
+      }
+
+      // Execute within the batched span context
+      return await context.with(state.currentSpanContext, async () => {
+        // Create an iteration span within the batched span
+        const iterationSpanName = options?.iterationSpanName ?? "iteration";
+
+        return await this.tracer!.startActiveSpan(
+          `${this.name}.${iterationSpanName}`,
+          {
+            attributes: {
+              loop_id: loopId,
+              iteration: state.iterationsCount,
+              ...options?.attributes,
+            },
+          },
+          async (iterationSpan) => {
+            try {
+              return await fn(iterationSpan);
+            } catch (error) {
+              if (error instanceof Error) {
+                iterationSpan.recordException(error);
+                state.currentSpan?.recordException(error);
+              }
+              iterationSpan.setStatus({ code: SpanStatusCode.ERROR });
+              state.endSpanInNextIteration = true;
+              throw error;
+            } finally {
+              iterationSpan.end();
+            }
+          }
+        );
+      });
+    } finally {
+      // Update iteration tracking
+      const duration = performance.now() - startTime;
+      state.runningDurationInMs += duration;
+      state.iterationsCount++;
+      state.totalIterationsCount++;
+      state.perTraceCountdown--;
+    }
+  }
+
+  /**
+   * Clean up state for a loop when it's stopped.
+   */
+  cleanup(loopId: string): void {
+    this.endCurrentSpan(loopId);
+    this.loopStates.delete(loopId);
+  }
+
+  /**
+   * Clean up all loop states.
+   */
+  cleanupAll(): void {
+    for (const loopId of this.loopStates.keys()) {
+      this.cleanup(loopId);
+    }
+  }
+}
+
 /**
  * No-op span implementation for when telemetry is disabled.
  */
diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts
index 107d82dbcb..90ea138f38 100644
--- a/packages/redis-worker/src/fair-queue/types.ts
+++ b/packages/redis-worker/src/fair-queue/types.ts
@@ -364,6 +364,12 @@ export interface FairQueueOptions
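Usage note (not part of the diff): a minimal sketch of how a consumer loop is expected to drive the `BatchedSpanManager` added in `telemetry.ts` above, assuming the exported `getTracer` from `@internal/tracing`. The tracer name, loop id, and stat names here are illustrative assumptions, not values from the PR.

```ts
import { getTracer } from "@internal/tracing";
import { BatchedSpanManager } from "./telemetry.js";

// One manager per queue instance; each consumer loop is tracked by its own loopId.
const spans = new BatchedSpanManager({
  tracer: getTracer("fairqueue-example"), // assumed tracer name
  name: "fairqueue",
  maxIterations: 500, // rotate the parent span after 500 iterations...
  timeoutSeconds: 60, // ...or after 60 seconds, whichever comes first
});

const loopId = "worker-0"; // illustrative loop id
spans.initializeLoop(loopId);

async function runOneIteration() {
  // Each call becomes an iteration span nested under the current batched
  // parent span; rotation happens automatically when the limits are hit
  // or after markForRotation(loopId) was called on a previous error.
  await spans.withBatchedSpan(
    loopId,
    async (span) => {
      span.setAttribute("example", true);
      spans.incrementStat(loopId, "messages_processed");
    },
    { iterationSpanName: "processMessage" }
  );
}

function shutdown() {
  // Ends any open parent span (flushing its stats.* attributes) and drops state.
  spans.cleanup(loopId);
}
```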