
Commit a543cfc

fix(serverless): fix connection wf lifecycle (#3573)
1 parent 6014357 commit a543cfc

10 files changed: +652 -521 lines changed

engine/packages/api-public/src/runner_configs/upsert.rs

Lines changed: 6 additions & 0 deletions

@@ -93,6 +93,12 @@ async fn upsert_inner(
 		.iter()
 		.map(|dc| (dc.clone(), body.datacenters.remove(&dc.name)))
 		.collect::<Vec<_>>();
+
+	// Check for leftover datacenters in the body, this means those datacenters don't exist
+	if !body.datacenters.is_empty() {
+		return Err(crate::errors::Datacenter::NotFound.build());
+	}
+
 	let any_endpoint_config_changed = futures_util::stream::iter(dcs)
 		.map(|(dc, runner_config)| {
 			let ctx = ctx.clone();
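
The added check relies on a drain-and-match pattern: each known datacenter is removed from the request map while the work list is built, so any key still left in body.datacenters must name a datacenter that does not exist. A minimal standalone sketch of that pattern follows; pair_configs, DatacenterNotFound, and the HashMap<String, u32> payload shape are illustrative stand-ins, not the engine's actual types.

use std::collections::HashMap;

#[derive(Debug)]
struct DatacenterNotFound; // illustrative stand-in for the engine's real error type

fn pair_configs(
    known_dcs: &[String],
    mut requested: HashMap<String, u32>,
) -> Result<Vec<(String, Option<u32>)>, DatacenterNotFound> {
    // Pair each known datacenter with its (optional) requested config,
    // removing matched entries from the request map as we go.
    let dcs = known_dcs
        .iter()
        .map(|dc| (dc.clone(), requested.remove(dc)))
        .collect::<Vec<_>>();

    // Anything still left in the map referenced a datacenter that doesn't exist.
    if !requested.is_empty() {
        return Err(DatacenterNotFound);
    }

    Ok(dcs)
}

fn main() {
    let known = vec!["us-east".to_string(), "eu-west".to_string()];

    let mut req = HashMap::new();
    req.insert("us-east".to_string(), 3);
    req.insert("mars-1".to_string(), 1); // unknown datacenter -> rejected

    assert!(pair_configs(&known, req).is_err());
    println!("request with unknown datacenter rejected");
}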

engine/packages/pegboard/src/workflows/runner2.rs

Lines changed: 2 additions & 0 deletions

@@ -52,6 +52,8 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
 	})
 	.await?;

+	// NOTE: This is only used if there's a queue of actors. This path is not used if there is
+	// enough runner capacity; the actor wf allocates itself independently.
 	// Check for pending actors
 	let res = ctx
 		.activity(AllocatePendingActorsInput {
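
The NOTE above describes the allocation split: when capacity is free, each actor workflow allocates itself at creation time, and this activity only runs to drain a queue that built up while capacity was exhausted. A toy illustration of that decision, using made-up names (AllocationPath, allocation_path) rather than the engine's real API:

// Illustrative only: sketches the two allocation paths the NOTE describes.
#[derive(Debug, PartialEq)]
enum AllocationPath {
    /// Capacity was available at creation time; the actor workflow allocated itself.
    SelfAllocated,
    /// No capacity was available; the actor waited in a queue until a runner
    /// connected and ran the "allocate pending actors" step.
    DrainedFromQueue,
}

fn allocation_path(free_slots: usize) -> AllocationPath {
    if free_slots > 0 {
        AllocationPath::SelfAllocated
    } else {
        AllocationPath::DrainedFromQueue
    }
}

fn main() {
    assert_eq!(allocation_path(2), AllocationPath::SelfAllocated);
    assert_eq!(allocation_path(0), AllocationPath::DrainedFromQueue);
}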

engine/packages/pegboard/src/workflows/serverless/connection.rs

Lines changed: 46 additions & 60 deletions

@@ -41,7 +41,7 @@ struct RescheduleState {
 #[workflow]
 pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
 	// Run the connection activity, which will handle the full lifecycle
-	let send_drain_started = ctx
+	let drain_sent = ctx
 		.loope(RescheduleState::default(), |ctx, state| {
 			let input = input.clone();

@@ -55,8 +55,8 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input
 				})
 				.await?;

-			if let OutboundReqOutput::Done(res) = res {
-				return Ok(Loop::Break(res.send_drain_started));
+			if let OutboundReqOutput::Draining { drain_sent } = res {
+				return Ok(Loop::Break(drain_sent));
 			}

 			let mut backoff = reconnect_backoff(
@@ -87,7 +87,7 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input
 					tracing::debug!("drain received during serverless connection backoff");

 					// Notify pool that drain started
-					return Ok(Loop::Break(true));
+					return Ok(Loop::Break(false));
 				}

 			Ok(Loop::Continue)
@@ -98,7 +98,7 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input

 	// If we failed to send inline during the activity, durably ensure the
 	// signal is dispatched here
-	if send_drain_started {
+	if !drain_sent {
 		ctx.signal(pool::RunnerDrainStarted {
 			runner_wf_id: input.runner_wf_id,
 		})
@@ -141,44 +141,50 @@ struct OutboundReqInput {
 	runner_name: String,
 }

-#[derive(Debug, Serialize, Deserialize)]
-struct OutboundReqInnerOutput {
-	send_drain_started: bool,
-}
-
 #[derive(Debug, Serialize, Deserialize)]
 enum OutboundReqOutput {
-	Done(OutboundReqInnerOutput),
-	NeedsRetry,
+	Continue,
+	Draining {
+		/// Whether or not to retry sending the drain signal because it failed or was never sent.
+		drain_sent: bool,
+	},
+	Retry,
 }

 #[activity(OutboundReq)]
 #[timeout = u64::MAX]
 async fn outbound_req(ctx: &ActivityCtx, input: &OutboundReqInput) -> Result<OutboundReqOutput> {
-	match outbound_req_inner(ctx, input).await {
-		Ok(res) => Ok(OutboundReqOutput::Done(res)),
-		Err(error) => {
-			tracing::error!(?error, "outbound_req_inner failed, retrying after backoff");
-			Ok(OutboundReqOutput::NeedsRetry)
+	let mut term_signal = TermSignal::new().await;
+	let mut drain_sub = ctx
+		.subscribe::<Drain>(("workflow_id", ctx.workflow_id()))
+		.await?;
+
+	loop {
+		match outbound_req_inner(ctx, input, &mut term_signal, &mut drain_sub).await {
+			// If the outbound req exited successfully, continue with no backoff
+			Ok(OutboundReqOutput::Continue) => {}
+			Ok(OutboundReqOutput::Draining { drain_sent }) => {
+				return Ok(OutboundReqOutput::Draining { drain_sent });
+			}
+			Ok(OutboundReqOutput::Retry) => return Ok(OutboundReqOutput::Retry),
+			Err(error) => {
+				tracing::warn!(?error, "outbound_req_inner failed, retrying after backoff");
+				return Ok(OutboundReqOutput::Retry);
+			}
		}
	}
 }

 async fn outbound_req_inner(
 	ctx: &ActivityCtx,
 	input: &OutboundReqInput,
-) -> Result<OutboundReqInnerOutput> {
+	term_signal: &mut TermSignal,
+	drain_sub: &mut message::SubscriptionHandle<Drain>,
+) -> Result<OutboundReqOutput> {
 	if is_runner_draining(ctx, input.runner_wf_id).await? {
-		return Ok(OutboundReqInnerOutput {
-			send_drain_started: true,
-		});
+		return Ok(OutboundReqOutput::Draining { drain_sent: false });
 	}

-	let mut term_signal = TermSignal::new().await;
-	let mut drain_sub = ctx
-		.subscribe::<Drain>(("workflow_id", ctx.workflow_id()))
-		.await?;
-
 	let (runner_config_res, namespace_res) = tokio::try_join!(
 		ctx.op(crate::ops::runner_config::get::Input {
 			runners: vec![(input.namespace_id, input.runner_name.clone())],
@@ -190,9 +196,7 @@ async fn outbound_req_inner(
 	)?;
 	let Some(runner_config) = runner_config_res.into_iter().next() else {
 		tracing::debug!("runner config does not exist, ending outbound req");
-		return Ok(OutboundReqInnerOutput {
-			send_drain_started: true,
-		});
+		return Ok(OutboundReqOutput::Draining { drain_sent: false });
 	};

 	let RunnerConfigKind::Serverless {
@@ -204,9 +208,7 @@ async fn outbound_req_inner(
 	} = runner_config.config.kind
 	else {
 		tracing::debug!("runner config is not serverless, ending outbound req");
-		return Ok(OutboundReqInnerOutput {
-			send_drain_started: true,
-		});
+		return Ok(OutboundReqOutput::Draining { drain_sent: false });
 	};

 	let namespace = namespace_res
@@ -320,26 +322,20 @@ async fn outbound_req_inner(
 		Duration::from_secs(request_lifespan as u64).saturating_sub(DRAIN_GRACE_PERIOD);
 	tokio::select! {
 		res = stream_handler => {
-			return match res {
-				Err(e) => Err(e.into()),
-				// TODO:
-				// For unexpected closes, we don't know if the runner connected
-				// or not bc we can't correlate the runner id.
-				//
-				// Lifecycle state falls apart
-				Ok(_) => Ok(OutboundReqInnerOutput {
-					send_drain_started: false
-				})
-			};
+			match res {
+				// If the outbound req was stopped from the client side, we can just continue the loop
+				Ok(_) => return Ok(OutboundReqOutput::Continue),
+				Err(e) => return Err(e.into()),
+			}
 		},
 		_ = tokio::time::sleep(sleep_until_drain) => {}
 		_ = drain_sub.next() => {}
 		_ = term_signal.recv() => {}
 	};

-	tracing::debug!(?runner_id, "connection reached lifespan, needs draining");
+	tracing::debug!(?runner_id, "connection reached lifespan, starting drain");

-	if let Err(e) = ctx
+	if let Err(err) = ctx
 		.signal(pool::RunnerDrainStarted {
 			runner_wf_id: input.runner_wf_id,
 		})
@@ -349,17 +345,9 @@ async fn outbound_req_inner(
 		.send()
 		.await
 	{
-		tracing::warn!(
-			runner_name=%input.runner_name.clone(),
-			namespace_id=%input.namespace_id,
-			workflow_id=%ctx.workflow_id(),
-			"failed to send signal: {}", e
-		);
-
-		// If we failed to send, have the workflow send it durably
-		return Ok(OutboundReqInnerOutput {
-			send_drain_started: true,
-		});
+		tracing::debug!(?err, "failed to send drain signal");
+
+		return Ok(OutboundReqOutput::Draining { drain_sent: false });
 	}

 	// After we tell the pool we're draining, any remaining failures
@@ -371,9 +359,7 @@ async fn outbound_req_inner(
 		tracing::debug!(?err, "failed non critical draining phase");
 	}

-	Ok(OutboundReqInnerOutput {
-		send_drain_started: false,
-	})
+	Ok(OutboundReqOutput::Draining { drain_sent: true })
 }

 /// Reads from the adjacent serverless runner wf which is keeping track of signals while this workflow runs
@@ -394,7 +380,7 @@ async fn is_runner_draining(ctx: &ActivityCtx, runner_wf_id: Id) -> Result<bool>
 #[tracing::instrument(skip_all)]
 async fn finish_non_critical_draining(
 	ctx: &ActivityCtx,
-	mut term_signal: TermSignal,
+	term_signal: &mut TermSignal,
 	mut source: sse::EventSource,
 	mut runner_id: Option<Id>,
 	mut runner_protocol_version: Option<u16>,
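
The heart of this file's change is replacing the old Done/NeedsRetry result with a three-way outcome, so the activity can distinguish a clean client-side close (reconnect immediately), a drain (reporting whether the drain signal already went out), and a failure (back off and retry), while the workflow only re-sends RunnerDrainStarted durably when drain_sent is false. Below is a simplified, standalone sketch of that state machine; Outcome, run_once, and drive are illustrative names, not the engine's API.

/// Illustrative three-state outcome mirroring the Continue / Draining / Retry split
/// introduced in this commit; these are not the engine's real types.
enum Outcome {
    /// The connection closed cleanly from the client side; reconnect with no backoff.
    Continue,
    /// The runner is draining; `drain_sent` records whether the drain signal already went out.
    Draining { drain_sent: bool },
    /// Something failed; back off and retry.
    Retry,
}

/// Stand-in for one connection attempt (the role outbound_req_inner plays).
fn run_once(attempt: u32) -> Result<Outcome, String> {
    match attempt {
        0 => Ok(Outcome::Continue), // clean client-side close: loop again immediately
        _ => Ok(Outcome::Draining { drain_sent: false }),
    }
}

/// Drives run_once the way the activity drives outbound_req_inner: keep looping on
/// Continue, surface Draining/Retry to the workflow, and map errors to Retry.
fn drive() -> Outcome {
    let mut attempt = 0;
    loop {
        match run_once(attempt) {
            Ok(Outcome::Continue) => {}
            Ok(done @ Outcome::Draining { .. }) => return done,
            Ok(Outcome::Retry) => return Outcome::Retry,
            Err(err) => {
                eprintln!("attempt failed, retrying after backoff: {err}");
                return Outcome::Retry;
            }
        }
        attempt += 1;
    }
}

fn main() {
    // When the drain signal was never sent inline (drain_sent == false), the
    // workflow is the one that durably dispatches RunnerDrainStarted afterwards.
    if let Outcome::Draining { drain_sent: false } = drive() {
        println!("workflow sends the drain signal durably");
    }
}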

engine/sdks/typescript/runner/src/mod.ts

Lines changed: 21 additions & 6 deletions

@@ -302,6 +302,25 @@ export class Runner {
 		this.#sendActorStateUpdate(actorId, actor.generation, "stopped");
 	}

+	#handleLost() {
+		this.log?.info({
+			msg: "stopping all actors due to runner lost threshold",
+		});
+
+		// Remove all remaining kv requests
+		for (const [_, request] of this.#kvRequests.entries()) {
+			request.reject(
+				new Error(
+					"KV request timed out waiting for WebSocket connection",
+				),
+			);
+		}
+
+		this.#kvRequests.clear();
+
+		this.#stopAllActors();
+	}
+
 	#stopAllActors() {
 		const actorIds = Array.from(this.#actors.keys());
 		for (const actorId of actorIds) {
@@ -837,11 +856,7 @@ export class Runner {
 			seconds: this.#runnerLostThreshold / 1000,
 		});
 		this.#runnerLostTimeout = setTimeout(() => {
-			this.log?.info({
-				msg: "stopping all actors due to runner lost threshold",
-			});
-
-			this.#stopAllActors();
+			this.#handleLost();
 		}, this.#runnerLostThreshold);
 	}

@@ -897,7 +912,7 @@ export class Runner {
 			seconds: this.#runnerLostThreshold / 1000,
 		});
 		this.#runnerLostTimeout = setTimeout(() => {
-			this.#stopAllActors();
+			this.#handleLost();
 		}, this.#runnerLostThreshold);
 	}


examples/counter/scripts/connect.ts

Lines changed: 2 additions & 5 deletions
Some generated files are not rendered by default.

pnpm-lock.yaml

Lines changed: 3 additions & 25 deletions
Some generated files are not rendered by default.

rivetkit-asyncapi/asyncapi.json

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@
   "asyncapi": "3.0.0",
   "info": {
     "title": "RivetKit WebSocket Protocol",
-    "version": "2.0.25-rc.2",
+    "version": "2.0.25",
     "description": "WebSocket protocol for bidirectional communication between RivetKit clients and actors"
   },
   "channels": {

rivetkit-openapi/openapi.json

Lines changed: 2 additions & 2 deletions

@@ -1,7 +1,7 @@
 {
   "openapi": "3.0.0",
   "info": {
-    "version": "2.0.25-rc.2",
+    "version": "2.0.25",
     "title": "RivetKit API"
   },
   "components": {
@@ -666,4 +666,4 @@
       }
     }
   }
-}
+}
