Skip to content

Commit 9c01c20

Browse files
committed
Merge remote-tracking branch 'origin/main' into v4/helm
2 parents 6a1bab3 + aaf40c3 commit 9c01c20

File tree

13 files changed

+136
-20
lines changed

13 files changed

+136
-20
lines changed

apps/webapp/app/env.server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,9 @@ const EnvironmentSchema = z.object({
618618
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
619619
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
620620
LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
621+
LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL: z
622+
.enum(["log", "error", "warn", "info", "debug"])
623+
.default("info"),
621624

622625
LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
623626
.string()
@@ -661,6 +664,7 @@ const EnvironmentSchema = z.object({
661664
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
662665
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
663666
COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
667+
COMMON_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
664668

665669
COMMON_WORKER_REDIS_HOST: z
666670
.string()
@@ -699,6 +703,7 @@ const EnvironmentSchema = z.object({
699703
ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
700704
ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
701705
ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
706+
ALERTS_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
702707

703708
ALERTS_WORKER_REDIS_HOST: z
704709
.string()
@@ -732,8 +737,8 @@ const EnvironmentSchema = z.object({
732737

733738
SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
734739
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
735-
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
736-
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
740+
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
741+
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
737742
SCHEDULE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
738743
SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
739744
SCHEDULE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),

apps/webapp/app/v3/alertsWorker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ function initializeWorker() {
6161
pollIntervalMs: env.ALERTS_WORKER_POLL_INTERVAL,
6262
immediatePollIntervalMs: env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL,
6363
shutdownTimeoutMs: env.ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS,
64-
logger: new Logger("AlertsWorker", "debug"),
64+
logger: new Logger("AlertsWorker", env.ALERTS_WORKER_LOG_LEVEL),
6565
jobs: {
6666
"v3.deliverAlert": async ({ payload }) => {
6767
const service = new DeliverAlertService();

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ function initializeWorker() {
196196
pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
197197
immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
198198
shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS,
199-
logger: new Logger("CommonWorker", "debug"),
199+
logger: new Logger("CommonWorker", env.COMMON_WORKER_LOG_LEVEL),
200200
jobs: {
201201
scheduleEmail: async ({ payload }) => {
202202
await sendEmail(payload);

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ function initializeWorker() {
6868
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
6969
immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
7070
shutdownTimeoutMs: env.LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
71-
logger: new Logger("LegacyRunEngineWorker", "debug"),
71+
logger: new Logger("LegacyRunEngineWorker", env.LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL),
7272
jobs: {
7373
runHeartbeat: async ({ payload }) => {
7474
const service = new TaskRunHeartbeatFailedService();

apps/webapp/app/v3/scheduleEngine.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ function createScheduleEngine() {
6161
},
6262
worker: {
6363
concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
64+
workers: env.SCHEDULE_WORKER_CONCURRENCY_WORKERS,
65+
tasksPerWorker: env.SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
6466
pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
6567
shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
6668
disabled: env.SCHEDULE_WORKER_ENABLED === "0",

docs/tags.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ description: "Tags allow you to easily filter runs in the dashboard and when usi
55

66
## What are tags?
77

8-
We support up to 5 tags per run. Each one must be a string between 1 and 64 characters long.
8+
We support up to 10 tags per run. Each one must be a string between 1 and 64 characters long.
99

1010
We recommend prefixing your tags with their type and then an underscore or colon. For example, `user_123456` or `video:123`.
1111

internal-packages/clickhouse/src/taskRuns.test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,4 +344,54 @@ describe("Task Runs V2", () => {
344344
expect(result2).toEqual([]);
345345
}
346346
);
347+
348+
clickhouseTest(
349+
"should be able to insert payloads with a duplicate path",
350+
async ({ clickhouseContainer }) => {
351+
const client = new ClickhouseClient({
352+
name: "test",
353+
url: clickhouseContainer.getConnectionUrl(),
354+
});
355+
356+
const insertPayloads = insertRawTaskRunPayloads(client, {
357+
async_insert: 0, // turn off async insert for this test
358+
});
359+
360+
const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([
361+
{
362+
run_id: "run_1234",
363+
created_at: Date.now(),
364+
payload: {
365+
data: {
366+
title: {
367+
id: "123",
368+
},
369+
"title.id": 123,
370+
},
371+
},
372+
},
373+
]);
374+
375+
expect(insertPayloadsError).toBeNull();
376+
expect(insertPayloadsResult).toEqual(expect.objectContaining({ executed: true }));
377+
expect(insertPayloadsResult?.summary?.written_rows).toEqual("1");
378+
379+
const queryPayloads = client.query({
380+
name: "query-raw-task-run-payloads",
381+
query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1",
382+
schema: z.object({
383+
run_id: z.string(),
384+
created_at: z.coerce.date(),
385+
payload: z.unknown(),
386+
}),
387+
});
388+
389+
const [queryPayloadsError, resultPayloads] = await queryPayloads({ run_id: "run_1234" });
390+
391+
expect(queryPayloadsError).toBeNull();
392+
expect(resultPayloads).toEqual(
393+
expect.arrayContaining([expect.objectContaining({ run_id: "run_1234" })])
394+
);
395+
}
396+
);
347397
});

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ export function insertTaskRuns(ch: ClickhouseWriter, settings?: ClickHouseSettin
5959
async_insert_max_data_size: "1000000",
6060
async_insert_busy_timeout_ms: 1000,
6161
enable_json_type: 1,
62+
type_json_skip_duplicated_paths: 1,
6263
...settings,
6364
},
6465
});
@@ -83,6 +84,7 @@ export function insertRawTaskRunPayloads(ch: ClickhouseWriter, settings?: ClickH
8384
async_insert_max_data_size: "1000000",
8485
async_insert_busy_timeout_ms: 1000,
8586
enable_json_type: 1,
87+
type_json_skip_duplicated_paths: 1,
8688
...settings,
8789
},
8890
});

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ export class RunEngine {
152152
pollIntervalMs: options.worker.pollIntervalMs,
153153
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
154154
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
155-
logger: new Logger("RunEngineWorker", "debug"),
155+
logger: new Logger("RunEngineWorker", options.logLevel ?? "info"),
156156
jobs: {
157157
finishWaitpoint: async ({ payload }) => {
158158
await this.waitpointSystem.completeWaitpoint({

internal-packages/schedule-engine/src/engine/distributedScheduling.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,27 @@
55
*/
66
export function calculateDistributedExecutionTime(
77
exactScheduleTime: Date,
8-
distributionWindowSeconds: number = 30
8+
distributionWindowSeconds: number = 30,
9+
instanceId?: string
910
): Date {
10-
// Use the ISO string of the exact schedule time as the seed for consistency
11-
const seed = exactScheduleTime.toISOString();
11+
// Create seed by combining ISO timestamp with optional instanceId
12+
// This ensures different instances get different distributions even with same schedule time
13+
const timeSeed = exactScheduleTime.toISOString();
14+
const seed = instanceId ? `${timeSeed}:${instanceId}` : timeSeed;
15+
16+
// Use a better hash function (FNV-1a variant) for more uniform distribution
17+
let hash = 2166136261; // FNV offset basis (32-bit)
1218

13-
// Create a simple hash from the seed string
14-
let hash = 0;
1519
for (let i = 0; i < seed.length; i++) {
16-
const char = seed.charCodeAt(i);
17-
hash = (hash << 5) - hash + char;
18-
hash = hash & hash; // Convert to 32-bit integer
20+
hash ^= seed.charCodeAt(i);
21+
hash *= 16777619; // FNV prime (32-bit)
22+
// Keep it as 32-bit unsigned integer
23+
hash = hash >>> 0;
1924
}
2025

21-
// Convert hash to a value between 0 and 1
22-
const normalized = Math.abs(hash) / Math.pow(2, 31);
26+
// Convert hash to a value between 0 and 1 using better normalization
27+
// Use the full 32-bit range for better distribution
28+
const normalized = hash / 0xffffffff;
2329

2430
// Calculate offset in milliseconds (0 to distributionWindowSeconds * 1000)
2531
const offsetMs = Math.floor(normalized * distributionWindowSeconds * 1000);

0 commit comments

Comments
 (0)