Skip to content

Commit 58174af

Browse files
committed
Add streams.append
1 parent 01e8620 commit 58174af

File tree

13 files changed

+343
-83
lines changed

13 files changed

+343
-83
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { tryCatch } from "@trigger.dev/core/utils";
3+
import { z } from "zod";
4+
import { $replica, prisma } from "~/db.server";
5+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
6+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
7+
import { ServiceValidationError } from "~/v3/services/common.server";
8+
9+
const ParamsSchema = z.object({
10+
runId: z.string(),
11+
target: z.enum(["self", "parent", "root"]),
12+
streamId: z.string(),
13+
});
14+
15+
const { action } = createActionApiRoute(
16+
{
17+
params: ParamsSchema,
18+
},
19+
async ({ request, params, authentication }) => {
20+
const run = await $replica.taskRun.findFirst({
21+
where: {
22+
friendlyId: params.runId,
23+
runtimeEnvironmentId: authentication.environment.id,
24+
},
25+
select: {
26+
id: true,
27+
friendlyId: true,
28+
parentTaskRun: {
29+
select: {
30+
friendlyId: true,
31+
},
32+
},
33+
rootTaskRun: {
34+
select: {
35+
friendlyId: true,
36+
},
37+
},
38+
},
39+
});
40+
41+
if (!run) {
42+
return new Response("Run not found", { status: 404 });
43+
}
44+
45+
const targetId =
46+
params.target === "self"
47+
? run.friendlyId
48+
: params.target === "parent"
49+
? run.parentTaskRun?.friendlyId
50+
: run.rootTaskRun?.friendlyId;
51+
52+
if (!targetId) {
53+
return new Response("Target not found", { status: 404 });
54+
}
55+
56+
const targetRun = await prisma.taskRun.findFirst({
57+
where: {
58+
friendlyId: targetId,
59+
runtimeEnvironmentId: authentication.environment.id,
60+
},
61+
select: {
62+
realtimeStreams: true,
63+
realtimeStreamsVersion: true,
64+
completedAt: true,
65+
id: true,
66+
},
67+
});
68+
69+
if (!targetRun) {
70+
return new Response("Run not found", { status: 404 });
71+
}
72+
73+
if (targetRun.completedAt) {
74+
return new Response("Cannot append to a realtime stream on a completed run", {
75+
status: 400,
76+
});
77+
}
78+
79+
if (!targetRun.realtimeStreams.includes(params.streamId)) {
80+
await prisma.taskRun.update({
81+
where: {
82+
id: targetRun.id,
83+
},
84+
data: {
85+
realtimeStreams: {
86+
push: params.streamId,
87+
},
88+
},
89+
});
90+
}
91+
92+
const part = await request.text();
93+
94+
const realtimeStream = getRealtimeStreamInstance(
95+
authentication.environment,
96+
targetRun.realtimeStreamsVersion
97+
);
98+
99+
const [appendError] = await tryCatch(
100+
realtimeStream.appendPart(part, targetId, params.streamId)
101+
);
102+
103+
if (appendError) {
104+
if (appendError instanceof ServiceValidationError) {
105+
return json(
106+
{
107+
ok: false,
108+
error: appendError.message,
109+
},
110+
{ status: appendError.status ?? 422 }
111+
);
112+
} else {
113+
return json(
114+
{
115+
ok: false,
116+
error: appendError.message,
117+
},
118+
{ status: 500 }
119+
);
120+
}
121+
}
122+
123+
return json(
124+
{
125+
ok: true,
126+
},
127+
{ status: 200 }
128+
);
129+
}
130+
);
131+
132+
export { action };

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,30 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
334334
}
335335
}
336336

337+
async appendPart(part: string, runId: string, streamId: string): Promise<void> {
338+
const redis = new Redis(this.options.redis ?? {});
339+
const streamKey = `stream:${runId}:${streamId}`;
340+
341+
await redis.xadd(
342+
streamKey,
343+
"MAXLEN",
344+
"~",
345+
String(env.REALTIME_STREAM_MAX_LENGTH),
346+
"*",
347+
"clientId",
348+
"",
349+
"chunkIndex",
350+
"0",
351+
"data",
352+
part
353+
);
354+
355+
// Set TTL for cleanup when stream is done
356+
await redis.expire(streamKey, env.REALTIME_STREAM_TTL);
357+
358+
await redis.quit();
359+
}
360+
337361
async getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number> {
338362
const redis = new Redis(this.options.redis ?? {});
339363
const streamKey = `stream:${runId}:${streamId}`;

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 31 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@ export type S2RealtimeStreamsOptions = {
1919
logLevel?: LogLevel;
2020
};
2121

22-
type S2Record = {
23-
headers?: [string, string][];
24-
body: string;
25-
seq_num?: number;
26-
timestamp?: number;
27-
};
28-
29-
type S2ReadResponse = { records: S2Record[] };
3022
type S2IssueAccessTokenResponse = { access_token: string };
23+
type S2AppendInput = { records: { body: string }[] };
24+
type S2AppendAck = {
25+
start: { seq_num: number; timestamp: number };
26+
end: { seq_num: number; timestamp: number };
27+
tail: { seq_num: number; timestamp: number };
28+
};
3129

3230
export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
3331
private readonly basin: string;
@@ -94,6 +92,14 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
9492
throw new Error("S2 streams are written to S2 via the client, not from the server");
9593
}
9694

95+
async appendPart(part: string, runId: string, streamId: string): Promise<void> {
96+
const s2Stream = this.toStreamName(runId, streamId);
97+
98+
await this.s2Append(s2Stream, {
99+
records: [{ body: part }],
100+
});
101+
}
102+
97103
getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number> {
98104
throw new Error("S2 streams are written to S2 via the client, not from the server");
99105
}
@@ -123,6 +129,23 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
123129
}
124130

125131
// ---------- Internals: S2 REST ----------
132+
private async s2Append(stream: string, body: S2AppendInput): Promise<S2AppendAck> {
133+
// POST /v1/streams/{stream}/records (JSON)
134+
const res = await fetch(`${this.baseUrl}/streams/${encodeURIComponent(stream)}/records`, {
135+
method: "POST",
136+
headers: {
137+
Authorization: `Bearer ${this.token}`,
138+
"Content-Type": "application/json",
139+
"S2-Format": "raw", // UTF-8 JSON encoding (no base64 overhead) when your data is text. :contentReference[oaicite:8]{index=8}
140+
},
141+
body: JSON.stringify(body),
142+
});
143+
if (!res.ok) {
144+
const text = await res.text().catch(() => "");
145+
throw new Error(`S2 append failed: ${res.status} ${res.statusText} ${text}`);
146+
}
147+
return (await res.json()) as S2AppendAck;
148+
}
126149

127150
private async s2IssueAccessToken(id: string, runId: string, streamId: string): Promise<string> {
128151
// POST /v1/access-tokens
@@ -197,45 +220,6 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
197220
});
198221
}
199222

200-
private async s2ReadOnce(
201-
stream: string,
202-
opts: {
203-
seq_num?: number;
204-
timestamp?: number;
205-
tail_offset?: number;
206-
clamp?: boolean;
207-
count?: number;
208-
bytes?: number;
209-
until?: number;
210-
wait?: number;
211-
}
212-
): Promise<S2ReadResponse> {
213-
// GET /v1/streams/{stream}/records?... (supports wait= for long-poll; linearizable reads). :contentReference[oaicite:9]{index=9}
214-
const qs = new URLSearchParams();
215-
if (opts.seq_num != null) qs.set("seq_num", String(opts.seq_num));
216-
if (opts.timestamp != null) qs.set("timestamp", String(opts.timestamp));
217-
if (opts.tail_offset != null) qs.set("tail_offset", String(opts.tail_offset));
218-
if (opts.clamp != null) qs.set("clamp", String(opts.clamp));
219-
if (opts.count != null) qs.set("count", String(opts.count));
220-
if (opts.bytes != null) qs.set("bytes", String(opts.bytes));
221-
if (opts.until != null) qs.set("until", String(opts.until));
222-
if (opts.wait != null) qs.set("wait", String(opts.wait));
223-
224-
const res = await fetch(`${this.baseUrl}/streams/${encodeURIComponent(stream)}/records?${qs}`, {
225-
method: "GET",
226-
headers: {
227-
Authorization: `Bearer ${this.token}`,
228-
Accept: "application/json",
229-
"S2-Format": "raw",
230-
},
231-
});
232-
if (!res.ok) {
233-
const text = await res.text().catch(() => "");
234-
throw new Error(`S2 read failed: ${res.status} ${res.statusText} ${text}`);
235-
}
236-
return (await res.json()) as S2ReadResponse;
237-
}
238-
239223
private parseLastEventId(lastEventId?: string): number | undefined {
240224
if (!lastEventId) return undefined;
241225
// tolerate formats like "1699999999999-5" (take leading digits)

apps/webapp/app/services/realtime/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ export interface StreamIngestor {
1313
resumeFromChunk?: number
1414
): Promise<Response>;
1515

16+
appendPart(part: string, runId: string, streamId: string): Promise<void>;
17+
1618
getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number>;
1719
}
1820

packages/core/src/v3/apiClient/index.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
ApiDeploymentListOptions,
77
ApiDeploymentListResponseItem,
88
ApiDeploymentListSearchParams,
9+
AppendToStreamResponseBody,
910
BatchTaskRunExecutionResult,
1011
BatchTriggerTaskV3RequestBody,
1112
BatchTriggerTaskV3Response,
@@ -1131,6 +1132,25 @@ export class ApiClient {
11311132
});
11321133
}
11331134

1135+
async appendToStream<TBody extends BodyInit>(
1136+
runId: string,
1137+
target: string,
1138+
streamId: string,
1139+
part: TBody,
1140+
requestOptions?: ZodFetchOptions
1141+
) {
1142+
return zodfetch(
1143+
AppendToStreamResponseBody,
1144+
`${this.baseUrl}/realtime/v1/streams/${runId}/${target}/${streamId}/append`,
1145+
{
1146+
method: "POST",
1147+
headers: this.#getHeaders(false),
1148+
body: part,
1149+
},
1150+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1151+
);
1152+
}
1153+
11341154
async generateJWTClaims(requestOptions?: ZodFetchOptions): Promise<Record<string, any>> {
11351155
return zodfetch(
11361156
z.record(z.any()),

packages/core/src/v3/realtimeStreams/index.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getGlobal, registerGlobal } from "../utils/globals.js";
22
import { NoopRealtimeStreamsManager } from "./noopManager.js";
33
import {
4-
RealtimePipeStreamOptions,
4+
RealtimeStreamOperationOptions,
55
RealtimeStreamInstance,
66
RealtimeStreamsManager,
77
} from "./types.js";
@@ -34,8 +34,16 @@ export class RealtimeStreamsAPI implements RealtimeStreamsManager {
3434
public pipe<T>(
3535
key: string,
3636
source: AsyncIterable<T> | ReadableStream<T>,
37-
options?: RealtimePipeStreamOptions
37+
options?: RealtimeStreamOperationOptions
3838
): RealtimeStreamInstance<T> {
3939
return this.#getManager().pipe(key, source, options);
4040
}
41+
42+
public append<TPart extends BodyInit>(
43+
key: string,
44+
part: TPart,
45+
options?: RealtimeStreamOperationOptions
46+
): Promise<void> {
47+
return this.#getManager().append(key, part, options);
48+
}
4149
}

0 commit comments

Comments
 (0)