Skip to content

Commit 3436d68

Browse files
authored
fix fal timeout (#113)
* fix logos * fix fal timeout
1 parent a0f3872 commit 3436d68

33 files changed

+361
-6
lines changed

lib/steps/condition.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ export async function conditionStep(
2626
Promise.resolve(evaluateCondition(input))
2727
);
2828
}
29+
conditionStep.maxRetries = 0;

lib/steps/database-query.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,4 @@ export async function databaseQueryStep(
139139
"use step";
140140
return withStepLogging(input, () => databaseQuery(input));
141141
}
142+
databaseQueryStep.maxRetries = 0;

lib/steps/http-request.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,4 @@ export async function httpRequestStep(
101101
"use step";
102102
return withStepLogging(input, () => httpRequest(input));
103103
}
104+
httpRequestStep.maxRetries = 0;

lib/steps/trigger.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,4 @@ export async function triggerStep(input: TriggerInput): Promise<TriggerResult> {
5454
// Normal trigger execution with logging
5555
return withStepLogging(input, () => Promise.resolve(executeTrigger(input)));
5656
}
57+
triggerStep.maxRetries = 0;

plugins/ai-gateway/steps/generate-image.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,6 @@ export async function generateImageStep(
8787

8888
return withStepLogging(input, () => stepHandler(input, credentials));
8989
}
90+
generateImageStep.maxRetries = 0;
9091

9192
export const _integrationType = "ai-gateway";

plugins/ai-gateway/steps/generate-text.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,5 +143,6 @@ export async function generateTextStep(
143143

144144
return withStepLogging(input, () => stepHandler(input, credentials));
145145
}
146+
generateTextStep.maxRetries = 0;
146147

147148
export const _integrationType = "ai-gateway";

plugins/blob/steps/list.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export async function listBlobsStep(
119119

120120
return withStepLogging(input, () => stepHandler(input, credentials));
121121
}
122+
listBlobsStep.maxRetries = 0;
122123

123124
export const _integrationType = "blob";
124125

plugins/blob/steps/put.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ export async function putBlobStep(input: PutBlobInput): Promise<PutBlobResult> {
123123

124124
return withStepLogging(input, () => stepHandler(input, credentials));
125125
}
126+
putBlobStep.maxRetries = 0;
126127

127128
export const _integrationType = "blob";
128129

plugins/fal/steps/generate-image.ts

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ import { getErrorMessage } from "@/lib/utils";
66
import type { FalCredentials } from "../credentials";
77

88
const FAL_API_URL = "https://queue.fal.run";
9+
const POLL_INTERVAL_MS = 1000;
10+
const MAX_POLL_ATTEMPTS = 600; // 10 minutes max
11+
12+
type FalQueueResponse = {
13+
status: "IN_QUEUE" | "IN_PROGRESS" | "COMPLETED";
14+
request_id: string;
15+
response_url: string;
16+
status_url: string;
17+
};
18+
19+
type FalStatusResponse = {
20+
status: "IN_QUEUE" | "IN_PROGRESS" | "COMPLETED";
21+
response_url?: string;
22+
};
923

1024
type FalImageResponse = {
1125
images?: Array<{
@@ -35,6 +49,51 @@ export type FalGenerateImageInput = StepInput &
3549
integrationId?: string;
3650
};
3751

52+
/**
53+
* Poll fal.ai queue until the request is completed
54+
*/
55+
async function pollForResult(
56+
statusUrl: string,
57+
responseUrl: string,
58+
apiKey: string
59+
): Promise<FalImageResponse> {
60+
for (let attempt = 0; attempt < MAX_POLL_ATTEMPTS; attempt++) {
61+
const statusResponse = await fetch(statusUrl, {
62+
method: "GET",
63+
headers: {
64+
Authorization: `Key ${apiKey}`,
65+
},
66+
});
67+
68+
if (!statusResponse.ok) {
69+
throw new Error(`Failed to check status: HTTP ${statusResponse.status}`);
70+
}
71+
72+
const status = (await statusResponse.json()) as FalStatusResponse;
73+
74+
if (status.status === "COMPLETED") {
75+
// Fetch the actual result
76+
const resultResponse = await fetch(responseUrl, {
77+
method: "GET",
78+
headers: {
79+
Authorization: `Key ${apiKey}`,
80+
},
81+
});
82+
83+
if (!resultResponse.ok) {
84+
throw new Error(`Failed to fetch result: HTTP ${resultResponse.status}`);
85+
}
86+
87+
return (await resultResponse.json()) as FalImageResponse;
88+
}
89+
90+
// Wait before polling again
91+
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
92+
}
93+
94+
throw new Error("Request timed out waiting for fal.ai to complete");
95+
}
96+
3897
/**
3998
* Core logic - portable between app and export
4099
*/
@@ -68,7 +127,20 @@ async function stepHandler(
68127
throw new Error(`HTTP ${response.status}: ${errorText}`);
69128
}
70129

71-
const result = (await response.json()) as FalImageResponse;
130+
const queueResponse = (await response.json()) as FalQueueResponse;
131+
132+
// If the response is queued, poll for the result
133+
let result: FalImageResponse;
134+
if (queueResponse.status === "IN_QUEUE" || queueResponse.status === "IN_PROGRESS") {
135+
result = await pollForResult(
136+
queueResponse.status_url,
137+
queueResponse.response_url,
138+
apiKey
139+
);
140+
} else {
141+
// Immediate response (shouldn't happen with queue endpoint, but handle it)
142+
result = queueResponse as unknown as FalImageResponse;
143+
}
72144

73145
if (result.error) {
74146
throw new Error(result.error);
@@ -103,5 +175,6 @@ export async function falGenerateImageStep(
103175

104176
return withStepLogging(input, () => stepHandler(input, credentials));
105177
}
178+
falGenerateImageStep.maxRetries = 0;
106179

107180
export const _integrationType = "fal";

plugins/fal/steps/generate-video.ts

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ import { getErrorMessage } from "@/lib/utils";
66
import type { FalCredentials } from "../credentials";
77

88
const FAL_API_URL = "https://queue.fal.run";
9+
const POLL_INTERVAL_MS = 2000;
10+
const MAX_POLL_ATTEMPTS = 300; // 10 minutes max for video
11+
12+
type FalQueueResponse = {
13+
status: "IN_QUEUE" | "IN_PROGRESS" | "COMPLETED";
14+
request_id: string;
15+
response_url: string;
16+
status_url: string;
17+
};
18+
19+
type FalStatusResponse = {
20+
status: "IN_QUEUE" | "IN_PROGRESS" | "COMPLETED";
21+
response_url?: string;
22+
};
923

1024
type FalVideoResponse = {
1125
video?: {
@@ -29,6 +43,49 @@ export type FalGenerateVideoInput = StepInput &
2943
integrationId?: string;
3044
};
3145

46+
/**
47+
* Poll fal.ai queue until the request is completed
48+
*/
49+
async function pollForResult(
50+
statusUrl: string,
51+
responseUrl: string,
52+
apiKey: string
53+
): Promise<FalVideoResponse> {
54+
for (let attempt = 0; attempt < MAX_POLL_ATTEMPTS; attempt++) {
55+
const statusResponse = await fetch(statusUrl, {
56+
method: "GET",
57+
headers: {
58+
Authorization: `Key ${apiKey}`,
59+
},
60+
});
61+
62+
if (!statusResponse.ok) {
63+
throw new Error(`Failed to check status: HTTP ${statusResponse.status}`);
64+
}
65+
66+
const status = (await statusResponse.json()) as FalStatusResponse;
67+
68+
if (status.status === "COMPLETED") {
69+
const resultResponse = await fetch(responseUrl, {
70+
method: "GET",
71+
headers: {
72+
Authorization: `Key ${apiKey}`,
73+
},
74+
});
75+
76+
if (!resultResponse.ok) {
77+
throw new Error(`Failed to fetch result: HTTP ${resultResponse.status}`);
78+
}
79+
80+
return (await resultResponse.json()) as FalVideoResponse;
81+
}
82+
83+
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
84+
}
85+
86+
throw new Error("Request timed out waiting for fal.ai to complete");
87+
}
88+
3289
/**
3390
* Core logic - portable between app and export
3491
*/
@@ -45,7 +102,6 @@ async function stepHandler(
45102
try {
46103
const model = input.model || "fal-ai/minimax-video";
47104

48-
// Build request body based on whether it's text-to-video or image-to-video
49105
const requestBody: Record<string, unknown> = {
50106
prompt: input.prompt,
51107
};
@@ -68,7 +124,18 @@ async function stepHandler(
68124
throw new Error(`HTTP ${response.status}: ${errorText}`);
69125
}
70126

71-
const result = (await response.json()) as FalVideoResponse;
127+
const queueResponse = (await response.json()) as FalQueueResponse;
128+
129+
let result: FalVideoResponse;
130+
if (queueResponse.status === "IN_QUEUE" || queueResponse.status === "IN_PROGRESS") {
131+
result = await pollForResult(
132+
queueResponse.status_url,
133+
queueResponse.response_url,
134+
apiKey
135+
);
136+
} else {
137+
result = queueResponse as unknown as FalVideoResponse;
138+
}
72139

73140
if (result.error) {
74141
throw new Error(result.error);
@@ -100,5 +167,6 @@ export async function falGenerateVideoStep(
100167

101168
return withStepLogging(input, () => stepHandler(input, credentials));
102169
}
170+
falGenerateVideoStep.maxRetries = 0;
103171

104172
export const _integrationType = "fal";

0 commit comments

Comments
 (0)